强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

HTTP/2 与 RPC 精讲教程 / 06 - 流量控制机制

第 06 章:流量控制机制

用窗口机制掌控数据流——HTTP/2 的背压与流控设计


6.1 流量控制概述

HTTP/2 的流量控制(Flow Control)是一种基于**信用(Credit)**的机制,接收方通过 WINDOW_UPDATE 帧告知发送方自己愿意接收多少数据,从而避免接收方被数据淹没。

6.1.1 为什么需要流量控制

问题场景:服务器推送大量数据

客户端(移动端,内存有限)          服务器(高速推送)
  |                                    |
  |<---- DATA 1MB ---------------------|  接收缓冲区溢出!
  |<---- DATA 1MB ---------------------|  内存不足!
  |<---- DATA 1MB ---------------------|  OOM 崩溃!

流量控制解决方案:

客户端(限制窗口)                   服务器(遵守窗口)
  |                                    |
  |<---- WINDOW_UPDATE (16KB) ---------|  先给 16KB 窗口
  |<---- DATA 16KB -------------------|  服务器只发 16KB
  |                                    |
  |---- WINDOW_UPDATE (16KB) --------->|  处理完,再要 16KB
  |<---- DATA 16KB -------------------|
  ...

6.1.2 流量控制的核心概念

概念说明
窗口(Window)接收方允许发送方发送的最大字节数
初始窗口(Initial Window)连接/流建立时的窗口大小(默认 65535 字节)
窗口更新(Window Update)接收方通过 WINDOW_UPDATE 帧增加窗口大小
背压(Backpressure)接收方通过缩小窗口来减缓发送速率

⚠️ 重要特性

  • 流量控制是逐跳的(Hop-by-Hop),不是端到端的
  • 代理服务器需要独立管理上游和下游的流量控制
  • DATA 帧和 HEADERS 帧的负载计入流量控制
  • 帧头部(9 字节)不计入流量控制

6.2 两级流量控制

HTTP/2 有两个独立的流量控制级别:

┌─────────────────────────────────────────────────┐
│                 TCP 连接级别                      │
│  ┌────────────────────────────────────────────┐ │
│  │           连接级窗口 (Connection Window)    │ │
│  │                                            │ │
│  │  ┌──────┐  ┌──────┐  ┌──────┐  ┌──────┐  │ │
│  │  │ 流 1 │  │ 流 3 │  │ 流 5 │  │ 流 7 │  │ │
│  │  │窗口w1│  │窗口w3│  │窗口w5│  │窗口w7│  │ │
│  │  └──────┘  └──────┘  └──────┘  └──────┘  │ │
│  └────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘

两层窗口必须同时满足:
1. 流级窗口 > 0(该流允许接收数据)
2. 连接级窗口 > 0(连接总体允许接收数据)
级别作用域默认大小控制范围
连接级整个 TCP 连接65535 字节所有流共享
流级单个流65535 字节独立控制

6.3 WINDOW_UPDATE 帧

6.3.1 帧格式

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
|                 Length (24)                                   |
+---------------+---------------+-------------------------------+
|   Type (8)    |   Flags (8)   |
+-+-------------+---------------+-------------------------------+
|R|                 Stream Identifier (31)                      |
+-+-------------------------------------------------------------+
|R|              Window Size Increment (31)                     |
+-+-------------------------------------------------------------+
字段说明
Stream Identifier0 表示连接级更新,非 0 表示流级更新
Window Size Increment窗口增量(1 到 2^31 - 1)

6.3.2 窗口更新规则

class FlowControlWindow:
    def __init__(self, initial_size: int = 65535):
        self.size = initial_size
        self.max_size = (1 << 31) - 1  # 2^31 - 1
        self.consumed = 0
    
    def consume(self, bytes_count: int) -> bool:
        """发送方消耗窗口"""
        if bytes_count > self.size:
            return False  # 超出窗口,不可发送
        self.size -= bytes_count
        self.consumed += bytes_count
        return True
    
    def update(self, increment: int):
        """接收方增加窗口"""
        new_size = self.size + increment
        if new_size > self.max_size:
            raise FlowControlError("窗口大小溢出")
        self.size = new_size
    
    def available(self) -> int:
        """可用窗口大小"""
        return self.size

class FlowControlError(Exception):
    pass

6.3.3 Go 实现流控

package main

import (
	"fmt"
	"sync"
)

type FlowControlWindow struct {
	mu        sync.Mutex
	size      int32
	maxSize   int32
	consumed  int64
}

func NewFlowControlWindow(initialSize int32) *FlowControlWindow {
	return &FlowControlWindow{
		size:    initialSize,
		maxSize: (1 << 31) - 1,
	}
}

func (w *FlowControlWindow) Consume(n int32) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	
	if n > w.size {
		return fmt.Errorf("流量控制错误: 需要 %d,可用 %d", n, w.size)
	}
	w.size -= n
	w.consumed += int64(n)
	return nil
}

func (w *FlowControlWindow) Update(increment int32) error {
	w.mu.Lock()
	defer w.mu.Unlock()
	
	newSize := w.size + increment
	if newSize > w.maxSize {
		return fmt.Errorf("窗口大小溢出: %d + %d > %d", w.size, increment, w.maxSize)
	}
	w.size = newSize
	return nil
}

func (w *FlowControlWindow) Available() int32 {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.size
}

func main() {
	// 连接级窗口
	connWindow := NewFlowControlWindow(65535)
	// 流级窗口
	streamWindow := NewFlowControlWindow(65535)
	
	// 模拟发送 16KB 数据
	dataSize := int32(16384)
	
	if err := connWindow.Consume(dataSize); err != nil {
		fmt.Println("连接级错误:", err)
		return
	}
	if err := streamWindow.Consume(dataSize); err != nil {
		fmt.Println("流级错误:", err)
		return
	}
	
	fmt.Printf("连接窗口剩余: %d\n", connWindow.Available())
	fmt.Printf("流窗口剩余: %d\n", streamWindow.Available())
	
	// 接收方处理完数据后,更新窗口
	connWindow.Update(dataSize)
	streamWindow.Update(dataSize)
	
	fmt.Printf("更新后连接窗口: %d\n", connWindow.Available())
	fmt.Printf("更新后流窗口: %d\n", streamWindow.Available())
}

6.4 初始窗口大小设置

6.4.1 SETTINGS_INITIAL_WINDOW_SIZE

# 连接建立时的窗口大小协商
class ConnectionSetup:
    def __init__(self):
        self.initial_window_size = 65535  # 默认值
        self.max_frame_size = 16384       # 默认值
    
    def process_settings(self, settings: dict):
        """处理 SETTINGS 帧"""
        if 0x4 in settings:  # SETTINGS_INITIAL_WINDOW_SIZE
            new_size = settings[0x4]
            if new_size > (1 << 31) - 1:
                raise ValueError("窗口大小超出上限")
            
            # 计算增量并更新所有现有流
            delta = new_size - self.initial_window_size
            self.initial_window_size = new_size
            
            # 更新所有活跃流的窗口
            # 注意:仅影响已存在的流
            return delta
    
    def create_settings_frame(self) -> dict:
        """创建初始 SETTINGS"""
        return {
            0x1: 4096,       # HEADER_TABLE_SIZE
            0x2: 1,          # ENABLE_PUSH
            0x3: 100,        # MAX_CONCURRENT_STREAMS
            0x4: self.initial_window_size,
            0x5: self.max_frame_size,
        }

6.4.2 窗口大小建议

场景建议初始窗口理由
通用 Web 服务65535 (默认)平衡内存与吞吐
大文件传输1048576 (1MB)减少 WINDOW_UPDATE 频率
实时小消息16384 (16KB)快速反馈,减少缓冲
微服务通信262144 (256KB)适中,支持中等负载
流媒体服务1048576+高吞吐优先

6.5 背压机制实现

6.5.1 生产者-消费者模型

import asyncio
from typing import Optional

class StreamBuffer:
    """流级别缓冲区,实现背压"""
    
    def __init__(self, stream_id: int, window_size: int = 65535):
        self.stream_id = stream_id
        self.window_size = window_size
        self.buffer = bytearray()
        self.high_water = window_size * 0.8  # 高水位线
        self.low_water = window_size * 0.2   # 低水位线
        self.paused = False
        self._event = asyncio.Event()
    
    async def write(self, data: bytes):
        """生产者写入数据"""
        while len(self.buffer) + len(data) > self.window_size:
            # 等待消费者消费
            self.paused = True
            await self._event.wait()
            self._event.clear()
        
        self.buffer.extend(data)
        
        if len(self.buffer) > self.high_water:
            self.paused = True
    
    async def read(self, size: int) -> bytes:
        """消费者读取数据"""
        while len(self.buffer) == 0:
            await asyncio._get_running_loop().run_in_executor(None, lambda: None)
        
        chunk = bytes(self.buffer[:size])
        self.buffer = self.buffer[size:]
        
        if self.paused and len(self.buffer) < self.low_water:
            self.paused = False
            self._event.set()
        
        return chunk
    
    def window_update(self, increment: int):
        """接收方增加窗口"""
        self.window_size += increment

# 使用示例
async def producer(buffer: StreamBuffer):
    for i in range(100):
        data = f"消息 {i}\n".encode()
        await buffer.write(data)
        print(f"发送: 消息 {i} (缓冲: {len(buffer.buffer)} 字节)")

async def consumer(buffer: StreamBuffer):
    for _ in range(100):
        data = await buffer.read(1024)
        print(f"接收: {data.decode().strip()}")
        await asyncio.sleep(0.1)  # 模拟处理延迟

async def main():
    buffer = StreamBuffer(stream_id=1, window_size=4096)
    await asyncio.gather(producer(buffer), consumer(buffer))

asyncio.run(main())

6.5.2 连接级背压

package main

import (
	"fmt"
	"sync"
	"time"
)

// ConnectionFlowControl 管理连接级别的流量控制
type ConnectionFlowControl struct {
	mu              sync.Mutex
	connWindow      int32
	streamWindows   map[uint32]int32
	maxWindow       int32
	pendingUpdate   int32
	onWindowUpdate  func(streamID uint32, increment int32)
}

func NewConnectionFlowControl(initialWindow int32) *ConnectionFlowControl {
	return &ConnectionFlowControl{
		connWindow:    initialWindow,
		streamWindows: make(map[uint32]int32),
		maxWindow:     1 << 24, // 16MB
	}
}

func (fc *ConnectionFlowControl) RegisterStream(streamID uint32, windowSize int32) {
	fc.mu.Lock()
	defer fc.mu.Unlock()
	fc.streamWindows[streamID] = windowSize
}

func (fc *ConnectionFlowControl) RemoveStream(streamID uint32) {
	fc.mu.Lock()
	defer fc.mu.Unlock()
	delete(fc.streamWindows, streamID)
}

func (fc *ConnectionFlowControl) CanSend(streamID uint32, size int32) bool {
	fc.mu.Lock()
	defer fc.mu.Unlock()
	
	streamWindow := fc.streamWindows[streamID]
	return fc.connWindow >= size && streamWindow >= size
}

func (fc *ConnectionFlowControl) Consume(streamID uint32, size int32) {
	fc.mu.Lock()
	defer fc.mu.Unlock()
	
	fc.connWindow -= size
	fc.streamWindows[streamID] -= size
}

func (fc *ConnectionFlowControl) OnDataReceived(streamID uint32, size int32) {
	fc.mu.Lock()
	fc.pendingUpdate += size
	
	// 累积到一定量再发送 WINDOW_UPDATE
	if fc.pendingUpdate >= fc.maxWindow/2 {
		increment := fc.pendingUpdate
		fc.pendingUpdate = 0
		fc.mu.Unlock()
		
		if fc.onWindowUpdate != nil {
			fc.onWindowUpdate(0, increment) // 0 = 连接级
			fc.onWindowUpdate(streamID, increment)
		}
	} else {
		fc.mu.Unlock()
	}
}

func main() {
	fc := NewConnectionFlowControl(65535)
	
	fc.RegisterStream(1, 65535)
	fc.RegisterStream(3, 65535)
	
	// 模拟发送
	dataSize := int32(1024)
	if fc.CanSend(1, dataSize) {
		fc.Consume(1, dataSize)
		fmt.Printf("流 1 发送 %d 字节,连接窗口剩余: %d\n", dataSize, fc.connWindow)
	}
	
	// 模拟接收并自动更新窗口
	fc.OnDataReceived(1, dataSize)
	fmt.Printf("处理接收数据后,待更新: %d\n", fc.pendingUpdate)
}

6.6 流量控制与流优先级

流优先级影响带宽分配,但不影响流量控制:

高优先级流(权重 256):获得 80% 带宽
低优先级流(权重 64):获得 20% 带宽

但如果低优先级流的窗口为 0:
- 该流无法发送任何数据
- 空闲带宽分配给其他流

实际分配 = min(可用窗口, 优先级带宽份额)

6.7 业务场景:视频流媒体

场景:HTTP/2 传输视频流

问题:
- 视频流持续传输大量数据
- 客户端播放缓冲区有限(通常 30-60 秒)
- 如果不控制,服务器会无限推送

解决方案:
1. 客户端维护播放缓冲区大小
2. 缓冲区满时停止发送 WINDOW_UPDATE
3. 缓冲区降到阈值以下时恢复 WINDOW_UPDATE
4. 实现平滑播放,避免卡顿
import asyncio

class VideoStreamReceiver:
    def __init__(self, stream_id: int, buffer_limit: int = 5 * 1024 * 1024):
        self.stream_id = stream_id
        self.buffer_limit = buffer_limit  # 5MB 缓冲限制
        self.buffer = bytearray()
        self.window_size = 65535
        self.paused = False
    
    async def on_data(self, data: bytes):
        """接收视频数据"""
        self.buffer.extend(data)
        
        # 缓冲区接近满,暂停接收
        if len(self.buffer) > self.buffer_limit * 0.9:
            self.paused = True
            print(f"缓冲区 {len(self.buffer)}/{self.buffer_limit},暂停接收")
    
    def consume_for_playback(self, size: int) -> bytes:
        """播放器消费数据"""
        chunk = bytes(self.buffer[:size])
        self.buffer = self.buffer[size:]
        
        # 缓冲区降低,恢复接收
        if self.paused and len(self.buffer) < self.buffer_limit * 0.5:
            self.paused = False
            increment = self.buffer_limit - len(self.buffer)
            print(f"缓冲区恢复,请求 {increment} 字节窗口更新")
            return chunk, increment
        
        return chunk, 0
    
    def should_send_window_update(self) -> tuple[bool, int]:
        """判断是否需要发送 WINDOW_UPDATE"""
        if self.paused:
            return False, 0
        
        available = self.buffer_limit - len(self.buffer)
        if available > self.window_size:
            return True, available - self.window_size
        return False, 0

6.8 注意事项

⚠️ 窗口耗尽处理

  • 发送方在窗口为 0 时不得发送 DATA 帧
  • 必须等待 WINDOW_UPDATE 帧才能继续发送
  • 如果等待时间过长,应发送 PING 检测连接活性

⚠️ 窗口溢出

  • WINDOW_UPDATE 帧可能导致窗口大小超过 2^31 - 1
  • 这是 FLOW_CONTROL_ERROR,必须终止连接
  • 实现时应检查每次更新后的窗口大小

⚠️ 代理服务器

  • 代理需要独立管理上下游的流控窗口
  • 上游窗口耗尽时应停止向下游推送
  • 下游窗口耗尽时应停止请求上游数据

💡 调优建议

  • 根据网络带宽延迟积(BDP)设置窗口大小
  • BDP = 带宽 × RTT,窗口应不小于 BDP
  • 高延迟网络(如跨洋)需要更大的窗口

6.9 扩展阅读


第 05 章 - 服务器推送 | 第 07 章 - HTTP/3 与 QUIC