强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

03 - 多路复用与流控制

第 03 章:多路复用与流控制

一根连接,万流并发——HTTP/2 最核心的性能特性


3.1 什么是多路复用

多路复用(Multiplexing)是 HTTP/2 最具革命性的特性。它允许在单一 TCP 连接上同时发送和接收多个请求/响应,且互不干扰。

3.1.1 HTTP/1.1 的串行困境

HTTP/1.1 连接模型(并发受限):

连接 1: [请求A]────[响应A]────[请求C]────[响应C]
连接 2: [请求B]────[响应B]────[请求D]────[响应D]
连接 3: [请求E]────[响应E]

问题:
- 浏览器限制 6 个并发连接
- 每个连接同一时刻只能处理一个请求
- 管线化(Pipelining)要求响应严格按序返回

3.1.2 HTTP/2 的多路复用

HTTP/2 单连接模型(完全并行):

连接: [流1 HEADERS][流3 HEADERS][流1 DATA][流5 HEADERS][流3 DATA][流1 DATA]...

         ┌──────────────────────────────────┐
         │         TCP 连接 (1个)            │
         │                                  │
         │  流1: ──H──D──D──D──            │
         │  流3: ──H──D──D────────         │
         │  流5: ──H──────────────D──      │
         │  流7: ──H──D──D────────         │
         │                                  │
         └──────────────────────────────────┘

优势:
- 无需建立多个 TCP 连接
- 帧可以交错发送
- 流之间完全独立

3.2 并发流管理

3.2.1 最大并发流限制

参数默认值建议范围说明
SETTINGS_MAX_CONCURRENT_STREAMS无限 (建议 100)100-256客户端/服务器同时活跃的流数量上限
# 模拟并发流管理器
import threading
from collections import deque

class ConcurrencyManager:
    def __init__(self, max_concurrent: int = 100):
        self.max_concurrent = max_concurrent
        self.active_streams: set = set()
        self.waiting_queue: deque = deque()
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)
    
    def acquire(self, stream_id: int) -> bool:
        """获取流的并发许可"""
        with self.condition:
            if len(self.active_streams) < self.max_concurrent:
                self.active_streams.add(stream_id)
                return True
            else:
                self.waiting_queue.append(stream_id)
                return False
    
    def release(self, stream_id: int):
        """释放流的并发占用"""
        with self.condition:
            self.active_streams.discard(stream_id)
            self.condition.notify()
    
    def wait_for_slot(self, stream_id: int):
        """等待可用的并发槽位"""
        with self.condition:
            while len(self.active_streams) >= self.max_concurrent:
                self.condition.wait()
            self.active_streams.add(stream_id)

# 使用示例
manager = ConcurrencyManager(max_concurrent=100)
manager.acquire(1)   # stream 1 获得许可
manager.acquire(3)   # stream 3 获得许可
manager.release(1)   # stream 1 释放

3.2.2 流状态与并发

package main

import (
	"fmt"
	"sync"
)

// Stream 表示一个 HTTP/2 流
type Stream struct {
	ID           uint32
	State        string
	HeadersSent  bool
	DataComplete bool
	mu           sync.Mutex
}

// Connection 管理多个流
type Connection struct {
	streams      map[uint32]*Stream
	maxStreams   int
	mu           sync.RWMutex
}

func NewConnection(maxStreams int) *Connection {
	return &Connection{
		streams:    make(map[uint32]*Stream),
		maxStreams: maxStreams,
	}
}

func (c *Connection) CreateStream(id uint32) (*Stream, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	
	if len(c.streams) >= c.maxStreams {
		return nil, fmt.Errorf("超出最大并发流限制 (%d)", c.maxStreams)
	}
	
	if _, exists := c.streams[id]; exists {
		return nil, fmt.Errorf("流 %d 已存在", id)
	}
	
	stream := &Stream{ID: id, State: "open"}
	c.streams[id] = stream
	return stream, nil
}

func (c *Connection) CloseStream(id uint32) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.streams, id)
}

func (c *Connection) ActiveStreams() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return len(c.streams)
}

func main() {
	conn := NewConnection(100)
	
	// 模拟并发请求
	for i := uint32(1); i <= 10; i += 2 {
		stream, err := conn.CreateStream(i)
		if err != nil {
			fmt.Printf("创建流失败: %v\n", err)
			continue
		}
		fmt.Printf("创建流 %d,当前活跃: %d\n", stream.ID, conn.ActiveStreams())
	}
}

3.3 队头阻塞(Head-of-Line Blocking)

3.3.1 HTTP/1.1 的队头阻塞

队头阻塞是 HTTP/1.1 最严重的性能瓶颈之一:

问题场景:请求 A 的响应耗时较长

HTTP/1.1 管线化:
客户端发送: [A][B][C][D]
服务器响应: [响应A]----→[响应B][响应C][响应D]
                ↑
         B、C、D 必须等待 A 完成

结果:即使 B、C、D 已处理完毕,也必须排队等待

3.3.2 HTTP/2 如何解决应用层队头阻塞

HTTP/2 多路复用:
客户端发送: 流1-H 流3-H 流5-H 流7-H
服务器响应: 流5-D 流3-D 流1-D 流7-D
              ↑
         谁先处理完谁先返回

关键:帧级别的交错发送,流之间互不影响
# 模拟对比实验
import time
import concurrent.futures
from dataclasses import dataclass
from typing import List

@dataclass
class Request:
    id: int
    path: str
    delay_ms: int  # 服务器处理耗时

def simulate_http11(requests: List[Request], max_conn: int = 6) -> dict:
    """模拟 HTTP/1.1 请求(受限于连接数和队头阻塞)"""
    results = {}
    start = time.time()
    
    # 分批处理(每批最多 max_conn 个)
    for i in range(0, len(requests), max_conn):
        batch = requests[i:i+max_conn]
        # 每个连接串行处理
        for req in batch:
            time.sleep(req.delay_ms / 1000)
            results[req.id] = time.time() - start
    
    return results

def simulate_http2(requests: List[Request]) -> dict:
    """模拟 HTTP/2 请求(真正的多路复用)"""
    results = {}
    start = time.time()
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
        futures = {}
        for req in requests:
            future = executor.submit(lambda r: time.sleep(r.delay_ms/1000), req)
            futures[future] = req
        
        for future in concurrent.futures.as_completed(futures):
            req = futures[future]
            results[req.id] = time.time() - start
    
    return results

# 测试场景:10 个请求,处理时间各异
requests = [
    Request(1, "/css/style.css", 50),
    Request(2, "/js/app.js", 100),
    Request(3, "/api/users", 200),
    Request(4, "/api/orders", 300),  # 最慢
    Request(5, "/image/logo.png", 30),
    Request(6, "/js/utils.js", 60),
    Request(7, "/api/products", 150),
    Request(8, "/css/theme.css", 40),
    Request(9, "/image/bg.jpg", 80),
    Request(10, "/api/config", 20),
]

print("=== HTTP/1.1 (6 并发连接) ===")
h1_results = simulate_http11(requests, max_conn=6)
for rid, t in sorted(h1_results.items()):
    print(f"  请求 {rid}: {t:.3f}s")

print("\n=== HTTP/2 (多路复用) ===")
h2_results = simulate_http2(requests)
for rid, t in sorted(h2_results.items()):
    print(f"  请求 {rid}: {t:.3f}s")

3.3.3 TCP 层队头阻塞

⚠️ HTTP/2 并未完全消除队头阻塞

TCP 层的队头阻塞仍然存在:

应用层(HTTP/2):
  流1: 帧A1, 帧A2, 帧A3
  流2: 帧B1, 帧B2, 帧B3

TCP 层(单一字节流):
  发送: [A1][B1][A2][B2][A3][B3]
  
如果 B1 的 TCP 段丢失:
  TCP 必须等待 B1 重传后才能交付后续数据
  即使 A2、A3 已经到达,也必须等待
  
解决方案 → HTTP/3 (QUIC) 基于 UDP,流级别独立重传

3.4 流优先级(Stream Priority)

3.4.1 优先级模型

HTTP/2 使用**依赖树(Dependency Tree)权重(Weight)**来表达流之间的优先级关系。

依赖树示例(Web 页面资源加载):

         根节点 (流 0)
            │
     ┌──────┼──────┐
     │      │      │
  流 1    流 3    流 5
  HTML   CSS    JS
  w=256  w=256  w=256
          │      │
        ┌─┘      └─┐
        │          │
     流 7         流 9
     字体         图片
     w=128       w=64

说明:
- w = weight(权重,1-256)
- 子节点共享父节点的带宽
- 同级节点按权重比例分配带宽
- 依赖关系:被依赖的流优先获得资源

3.4.2 优先级类型

策略说明适用资源
最高优先级 + 独占关键资源独占连接首屏 CSS、关键 JS
高优先级 + 依赖依赖于关键资源字体、图片
低优先级 + 依赖后台加载预取资源、懒加载图片
最低优先级可延迟加载分析脚本、广告

3.4.3 优先级代码示例

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Go 的 HTTP/2 支持通过 ResponseWriter 设置优先级
	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// 获取流的优先级信息(来自客户端的 PRIORITY 帧)
		priority := r.Header.Get("Priority")
		fmt.Printf("流优先级: %s\n", priority)
		
		// 不同资源类型返回不同的缓存策略
		switch r.URL.Path {
		case "/critical.css":
			// CSS 关键资源,高优先级
			w.Header().Set("Content-Type", "text/css")
			w.Header().Set("Cache-Control", "public, max-age=31536000")
			w.Write([]byte("body { margin: 0; }"))
			
		case "/analytics.js":
			// 分析脚本,低优先级
			w.Header().Set("Content-Type", "application/javascript")
			w.Header().Set("Cache-Control", "public, max-age=3600")
			w.Write([]byte("// analytics code"))
			
		default:
			w.Write([]byte("Hello"))
		}
	})
}

3.4.4 优先级的局限性

⚠️ 优先级并非强制

  • 服务器可以忽略客户端的优先级建议
  • Chrome 和 Firefox 的优先级实现不完全相同
  • 某些 CDN 对优先级的支持有限

💡 实用建议

  • 关键资源使用 <link rel="preload"> 提示
  • 不要过度依赖优先级,使用资源加载策略(如懒加载)更可靠
  • HTTP/3 使用了改进的优先级方案(Extensible Priorities)

3.5 多路复用的性能优化

3.5.1 并发流数量调优

场景建议并发流数理由
静态资源服务100-256资源多但单个请求小
API 网关50-100请求多样,处理时间差异大
微服务通信100-200高频调用,需要高并发
实时推送50-100长时间占用流资源
// 服务端并发流配置
package main

import (
	"log"
	"net/http"
)

func main() {
	server := &http.Server{
		Addr: ":8443",
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			w.Write([]byte("Hello"))
		}),
		// 最大并发流数(通过 SETTINGS 帧告知客户端)
		MaxConcurrentStreams: 100,
	}
	
	log.Printf("启动 HTTP/2 服务器,最大并发流: %d", 100)
	log.Fatal(server.ListenAndServeTLS("cert.pem", "key.pem"))
}

3.5.2 流级别的资源分配

# 根据流优先级分配服务器资源
import asyncio
from enum import IntEnum
from dataclasses import dataclass

class Priority(IntEnum):
    CRITICAL = 0    # 关键渲染路径
    HIGH = 1        # 重要资源
    MEDIUM = 2      # 普通资源
    LOW = 3         # 非关键资源
    BACKGROUND = 4  # 后台任务

@dataclass
class StreamRequest:
    stream_id: int
    path: str
    priority: Priority

class StreamScheduler:
    def __init__(self, max_workers: int = 10):
        self.max_workers = max_workers
        self.queues = {p: asyncio.Queue() for p in Priority}
        self.workers = []
    
    async def submit(self, request: StreamRequest):
        await self.queues[request.priority].put(request)
    
    async def process(self):
        """按优先级顺序处理请求"""
        while True:
            # 优先处理高优先级队列
            for priority in Priority:
                if not self.queues[priority].empty():
                    request = await self.queues[priority].get()
                    await self.handle_request(request)
                    return
            await asyncio.sleep(0.001)
    
    async def handle_request(self, request: StreamRequest):
        # 模拟处理
        await asyncio.sleep(0.01)
        print(f"处理流 {request.stream_id}: {request.path} (优先级: {request.priority.name})")

# 使用示例
async def main():
    scheduler = StreamScheduler()
    
    requests = [
        StreamRequest(1, "/css/critical.css", Priority.CRITICAL),
        StreamRequest(3, "/api/users", Priority.HIGH),
        StreamRequest(5, "/image/logo.png", Priority.MEDIUM),
        StreamRequest(7, "/analytics.js", Priority.LOW),
    ]
    
    for req in requests:
        await scheduler.submit(req)
    
    for _ in requests:
        await scheduler.process()

asyncio.run(main())

3.6 业务场景:微服务间的多路复用

场景:电商系统中,订单服务需要调用多个下游服务

传统 HTTP/1.1:
  订单服务 → 用户服务 (10ms)
  订单服务 → 库存服务 (15ms)
  订单服务 → 价格服务 (20ms)
  订单服务 → 支付服务 (25ms)
  总耗时: 串行 70ms / 并行 25ms(需要 4 个连接)

HTTP/2 多路复用:
  订单服务 → 用户服务 ┐
              库存服务 ├─ 同一 TCP 连接
              价格服务 │
              支付服务 ┘
  总耗时: 并行 25ms(仅 1 个连接)

优势:
- 减少 TCP/TLS 握手开销
- 降低文件描述符消耗
- 更好的拥塞控制
- 统一的连接管理
// 微服务间 HTTP/2 通信示例
package main

import (
	"context"
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"
)

type ServiceClient struct {
	client *http.Client
	baseURL string
}

func NewServiceClient(baseURL string) *ServiceClient {
	transport := &http.Transport{
		MaxIdleConns:        100,
		MaxIdleConnsPerHost: 100,
		IdleConnTimeout:     90 * time.Second,
	}
	
	return &ServiceClient{
		client:  &http.Client{Transport: transport, Timeout: 10 * time.Second},
		baseURL: baseURL,
	}
}

func (c *ServiceClient) Call(ctx context.Context, path string) ([]byte, error) {
	req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL+path, nil)
	if err != nil {
		return nil, err
	}
	
	resp, err := c.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	
	return io.ReadAll(resp.Body)
}

// 并行调用多个微服务
func CallServices(ctx context.Context, client *ServiceClient, paths []string) (map[string][]byte, error) {
	results := make(map[string][]byte)
	var mu sync.Mutex
	var wg sync.WaitGroup
	errs := make(chan error, len(paths))
	
	for _, path := range paths {
		wg.Add(1)
		go func(p string) {
			defer wg.Done()
			data, err := client.Call(ctx, p)
			if err != nil {
				errs <- err
				return
			}
			mu.Lock()
			results[p] = data
			mu.Unlock()
		}(path)
	}
	
	wg.Wait()
	close(errs)
	
	for err := range errs {
		return nil, err
	}
	return results, nil
}

func main() {
	client := NewServiceClient("https://api.example.com")
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	
	services := []string{
		"/users/123",
		"/orders/recent",
		"/products/search?q=shoes",
	}
	
	start := time.Now()
	results, err := CallServices(ctx, client, services)
	if err != nil {
		fmt.Printf("调用失败: %v\n", err)
		return
	}
	
	fmt.Printf("调用完成,耗时: %v\n", time.Since(start))
	for path, data := range results {
		fmt.Printf("  %s: %d bytes\n", path, len(data))
	}
}

3.7 注意事项

⚠️ 连接管理

  • 单连接故障会影响所有流,需实现连接健康检查
  • 连接空闲时应发送 PING 帧保持活性
  • 服务器应实现 GOAWAY 处理,优雅迁移流

⚠️ 并发流上限

  • 设置过低会限制吞吐量,过高会增加内存消耗
  • 建议根据服务器资源动态调整
  • 监控活跃流数量,避免资源耗尽

⚠️ 流优先级

  • 不同浏览器/客户端的优先级策略不同
  • 服务器可能忽略优先级建议
  • 关键资源建议使用 preload 显式提示

💡 性能调优

  • 合并小请求减少流数量
  • 使用流优先级确保关键资源先加载
  • 监控流创建/关闭频率,避免流风暴

3.8 扩展阅读


第 02 章 - 二进制分帧层 | 第 04 章 - 头部压缩 HPACK