HTTP/2 与 RPC 精讲教程 / 06 - 流量控制机制
第 06 章:流量控制机制
用窗口机制掌控数据流——HTTP/2 的背压与流控设计
6.1 流量控制概述
HTTP/2 的流量控制(Flow Control)是一种基于**信用(Credit)**的机制,接收方通过 WINDOW_UPDATE 帧告知发送方自己愿意接收多少数据,从而避免接收方被数据淹没。
6.1.1 为什么需要流量控制
问题场景:服务器推送大量数据
客户端(移动端,内存有限) 服务器(高速推送)
| |
|<---- DATA 1MB ---------------------| 接收缓冲区溢出!
|<---- DATA 1MB ---------------------| 内存不足!
|<---- DATA 1MB ---------------------| OOM 崩溃!
流量控制解决方案:
客户端(限制窗口) 服务器(遵守窗口)
| |
|<---- WINDOW_UPDATE (16KB) ---------| 先给 16KB 窗口
|<---- DATA 16KB -------------------| 服务器只发 16KB
| |
|---- WINDOW_UPDATE (16KB) --------->| 处理完,再要 16KB
|<---- DATA 16KB -------------------|
...
6.1.2 流量控制的核心概念
| 概念 | 说明 |
|---|---|
| 窗口(Window) | 接收方允许发送方发送的最大字节数 |
| 初始窗口(Initial Window) | 连接/流建立时的窗口大小(默认 65535 字节) |
| 窗口更新(Window Update) | 接收方通过 WINDOW_UPDATE 帧增加窗口大小 |
| 背压(Backpressure) | 接收方通过缩小窗口来减缓发送速率 |
⚠️ 重要特性:
- 流量控制是逐跳的(Hop-by-Hop),不是端到端的
- 代理服务器需要独立管理上游和下游的流量控制
- DATA 帧和 HEADERS 帧的负载计入流量控制
- 帧头部(9 字节)不计入流量控制
6.2 两级流量控制
HTTP/2 有两个独立的流量控制级别:
┌─────────────────────────────────────────────────┐
│ TCP 连接级别 │
│ ┌────────────────────────────────────────────┐ │
│ │ 连接级窗口 (Connection Window) │ │
│ │ │ │
│ │ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │ │
│ │ │ 流 1 │ │ 流 3 │ │ 流 5 │ │ 流 7 │ │ │
│ │ │窗口w1│ │窗口w3│ │窗口w5│ │窗口w7│ │ │
│ │ └──────┘ └──────┘ └──────┘ └──────┘ │ │
│ └────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
两层窗口必须同时满足:
1. 流级窗口 > 0(该流允许接收数据)
2. 连接级窗口 > 0(连接总体允许接收数据)
| 级别 | 作用域 | 默认大小 | 控制范围 |
|---|---|---|---|
| 连接级 | 整个 TCP 连接 | 65535 字节 | 所有流共享 |
| 流级 | 单个流 | 65535 字节 | 独立控制 |
6.3 WINDOW_UPDATE 帧
6.3.1 帧格式
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length (24) |
+---------------+---------------+-------------------------------+
| Type (8) | Flags (8) |
+-+-------------+---------------+-------------------------------+
|R| Stream Identifier (31) |
+-+-------------------------------------------------------------+
|R| Window Size Increment (31) |
+-+-------------------------------------------------------------+
| 字段 | 说明 |
|---|---|
| Stream Identifier | 0 表示连接级更新,非 0 表示流级更新 |
| Window Size Increment | 窗口增量(1 到 2^31 - 1) |
6.3.2 窗口更新规则
class FlowControlWindow:
def __init__(self, initial_size: int = 65535):
self.size = initial_size
self.max_size = (1 << 31) - 1 # 2^31 - 1
self.consumed = 0
def consume(self, bytes_count: int) -> bool:
"""发送方消耗窗口"""
if bytes_count > self.size:
return False # 超出窗口,不可发送
self.size -= bytes_count
self.consumed += bytes_count
return True
def update(self, increment: int):
"""接收方增加窗口"""
new_size = self.size + increment
if new_size > self.max_size:
raise FlowControlError("窗口大小溢出")
self.size = new_size
def available(self) -> int:
"""可用窗口大小"""
return self.size
class FlowControlError(Exception):
pass
6.3.3 Go 实现流控
package main
import (
"fmt"
"sync"
)
type FlowControlWindow struct {
mu sync.Mutex
size int32
maxSize int32
consumed int64
}
func NewFlowControlWindow(initialSize int32) *FlowControlWindow {
return &FlowControlWindow{
size: initialSize,
maxSize: (1 << 31) - 1,
}
}
func (w *FlowControlWindow) Consume(n int32) error {
w.mu.Lock()
defer w.mu.Unlock()
if n > w.size {
return fmt.Errorf("流量控制错误: 需要 %d,可用 %d", n, w.size)
}
w.size -= n
w.consumed += int64(n)
return nil
}
func (w *FlowControlWindow) Update(increment int32) error {
w.mu.Lock()
defer w.mu.Unlock()
newSize := w.size + increment
if newSize > w.maxSize {
return fmt.Errorf("窗口大小溢出: %d + %d > %d", w.size, increment, w.maxSize)
}
w.size = newSize
return nil
}
func (w *FlowControlWindow) Available() int32 {
w.mu.Lock()
defer w.mu.Unlock()
return w.size
}
func main() {
// 连接级窗口
connWindow := NewFlowControlWindow(65535)
// 流级窗口
streamWindow := NewFlowControlWindow(65535)
// 模拟发送 16KB 数据
dataSize := int32(16384)
if err := connWindow.Consume(dataSize); err != nil {
fmt.Println("连接级错误:", err)
return
}
if err := streamWindow.Consume(dataSize); err != nil {
fmt.Println("流级错误:", err)
return
}
fmt.Printf("连接窗口剩余: %d\n", connWindow.Available())
fmt.Printf("流窗口剩余: %d\n", streamWindow.Available())
// 接收方处理完数据后,更新窗口
connWindow.Update(dataSize)
streamWindow.Update(dataSize)
fmt.Printf("更新后连接窗口: %d\n", connWindow.Available())
fmt.Printf("更新后流窗口: %d\n", streamWindow.Available())
}
6.4 初始窗口大小设置
6.4.1 SETTINGS_INITIAL_WINDOW_SIZE
# 连接建立时的窗口大小协商
class ConnectionSetup:
def __init__(self):
self.initial_window_size = 65535 # 默认值
self.max_frame_size = 16384 # 默认值
def process_settings(self, settings: dict):
"""处理 SETTINGS 帧"""
if 0x4 in settings: # SETTINGS_INITIAL_WINDOW_SIZE
new_size = settings[0x4]
if new_size > (1 << 31) - 1:
raise ValueError("窗口大小超出上限")
# 计算增量并更新所有现有流
delta = new_size - self.initial_window_size
self.initial_window_size = new_size
# 更新所有活跃流的窗口
# 注意:仅影响已存在的流
return delta
def create_settings_frame(self) -> dict:
"""创建初始 SETTINGS"""
return {
0x1: 4096, # HEADER_TABLE_SIZE
0x2: 1, # ENABLE_PUSH
0x3: 100, # MAX_CONCURRENT_STREAMS
0x4: self.initial_window_size,
0x5: self.max_frame_size,
}
6.4.2 窗口大小建议
| 场景 | 建议初始窗口 | 理由 |
|---|---|---|
| 通用 Web 服务 | 65535 (默认) | 平衡内存与吞吐 |
| 大文件传输 | 1048576 (1MB) | 减少 WINDOW_UPDATE 频率 |
| 实时小消息 | 16384 (16KB) | 快速反馈,减少缓冲 |
| 微服务通信 | 262144 (256KB) | 适中,支持中等负载 |
| 流媒体服务 | 1048576+ | 高吞吐优先 |
6.5 背压机制实现
6.5.1 生产者-消费者模型
import asyncio
from typing import Optional
class StreamBuffer:
"""流级别缓冲区,实现背压"""
def __init__(self, stream_id: int, window_size: int = 65535):
self.stream_id = stream_id
self.window_size = window_size
self.buffer = bytearray()
self.high_water = window_size * 0.8 # 高水位线
self.low_water = window_size * 0.2 # 低水位线
self.paused = False
self._event = asyncio.Event()
async def write(self, data: bytes):
"""生产者写入数据"""
while len(self.buffer) + len(data) > self.window_size:
# 等待消费者消费
self.paused = True
await self._event.wait()
self._event.clear()
self.buffer.extend(data)
if len(self.buffer) > self.high_water:
self.paused = True
async def read(self, size: int) -> bytes:
"""消费者读取数据"""
while len(self.buffer) == 0:
await asyncio._get_running_loop().run_in_executor(None, lambda: None)
chunk = bytes(self.buffer[:size])
self.buffer = self.buffer[size:]
if self.paused and len(self.buffer) < self.low_water:
self.paused = False
self._event.set()
return chunk
def window_update(self, increment: int):
"""接收方增加窗口"""
self.window_size += increment
# 使用示例
async def producer(buffer: StreamBuffer):
for i in range(100):
data = f"消息 {i}\n".encode()
await buffer.write(data)
print(f"发送: 消息 {i} (缓冲: {len(buffer.buffer)} 字节)")
async def consumer(buffer: StreamBuffer):
for _ in range(100):
data = await buffer.read(1024)
print(f"接收: {data.decode().strip()}")
await asyncio.sleep(0.1) # 模拟处理延迟
async def main():
buffer = StreamBuffer(stream_id=1, window_size=4096)
await asyncio.gather(producer(buffer), consumer(buffer))
asyncio.run(main())
6.5.2 连接级背压
package main
import (
"fmt"
"sync"
"time"
)
// ConnectionFlowControl 管理连接级别的流量控制
type ConnectionFlowControl struct {
mu sync.Mutex
connWindow int32
streamWindows map[uint32]int32
maxWindow int32
pendingUpdate int32
onWindowUpdate func(streamID uint32, increment int32)
}
func NewConnectionFlowControl(initialWindow int32) *ConnectionFlowControl {
return &ConnectionFlowControl{
connWindow: initialWindow,
streamWindows: make(map[uint32]int32),
maxWindow: 1 << 24, // 16MB
}
}
func (fc *ConnectionFlowControl) RegisterStream(streamID uint32, windowSize int32) {
fc.mu.Lock()
defer fc.mu.Unlock()
fc.streamWindows[streamID] = windowSize
}
func (fc *ConnectionFlowControl) RemoveStream(streamID uint32) {
fc.mu.Lock()
defer fc.mu.Unlock()
delete(fc.streamWindows, streamID)
}
func (fc *ConnectionFlowControl) CanSend(streamID uint32, size int32) bool {
fc.mu.Lock()
defer fc.mu.Unlock()
streamWindow := fc.streamWindows[streamID]
return fc.connWindow >= size && streamWindow >= size
}
func (fc *ConnectionFlowControl) Consume(streamID uint32, size int32) {
fc.mu.Lock()
defer fc.mu.Unlock()
fc.connWindow -= size
fc.streamWindows[streamID] -= size
}
func (fc *ConnectionFlowControl) OnDataReceived(streamID uint32, size int32) {
fc.mu.Lock()
fc.pendingUpdate += size
// 累积到一定量再发送 WINDOW_UPDATE
if fc.pendingUpdate >= fc.maxWindow/2 {
increment := fc.pendingUpdate
fc.pendingUpdate = 0
fc.mu.Unlock()
if fc.onWindowUpdate != nil {
fc.onWindowUpdate(0, increment) // 0 = 连接级
fc.onWindowUpdate(streamID, increment)
}
} else {
fc.mu.Unlock()
}
}
func main() {
fc := NewConnectionFlowControl(65535)
fc.RegisterStream(1, 65535)
fc.RegisterStream(3, 65535)
// 模拟发送
dataSize := int32(1024)
if fc.CanSend(1, dataSize) {
fc.Consume(1, dataSize)
fmt.Printf("流 1 发送 %d 字节,连接窗口剩余: %d\n", dataSize, fc.connWindow)
}
// 模拟接收并自动更新窗口
fc.OnDataReceived(1, dataSize)
fmt.Printf("处理接收数据后,待更新: %d\n", fc.pendingUpdate)
}
6.6 流量控制与流优先级
流优先级影响带宽分配,但不影响流量控制:
高优先级流(权重 256):获得 80% 带宽
低优先级流(权重 64):获得 20% 带宽
但如果低优先级流的窗口为 0:
- 该流无法发送任何数据
- 空闲带宽分配给其他流
实际分配 = min(可用窗口, 优先级带宽份额)
6.7 业务场景:视频流媒体
场景:HTTP/2 传输视频流
问题:
- 视频流持续传输大量数据
- 客户端播放缓冲区有限(通常 30-60 秒)
- 如果不控制,服务器会无限推送
解决方案:
1. 客户端维护播放缓冲区大小
2. 缓冲区满时停止发送 WINDOW_UPDATE
3. 缓冲区降到阈值以下时恢复 WINDOW_UPDATE
4. 实现平滑播放,避免卡顿
import asyncio
class VideoStreamReceiver:
def __init__(self, stream_id: int, buffer_limit: int = 5 * 1024 * 1024):
self.stream_id = stream_id
self.buffer_limit = buffer_limit # 5MB 缓冲限制
self.buffer = bytearray()
self.window_size = 65535
self.paused = False
async def on_data(self, data: bytes):
"""接收视频数据"""
self.buffer.extend(data)
# 缓冲区接近满,暂停接收
if len(self.buffer) > self.buffer_limit * 0.9:
self.paused = True
print(f"缓冲区 {len(self.buffer)}/{self.buffer_limit},暂停接收")
def consume_for_playback(self, size: int) -> bytes:
"""播放器消费数据"""
chunk = bytes(self.buffer[:size])
self.buffer = self.buffer[size:]
# 缓冲区降低,恢复接收
if self.paused and len(self.buffer) < self.buffer_limit * 0.5:
self.paused = False
increment = self.buffer_limit - len(self.buffer)
print(f"缓冲区恢复,请求 {increment} 字节窗口更新")
return chunk, increment
return chunk, 0
def should_send_window_update(self) -> tuple[bool, int]:
"""判断是否需要发送 WINDOW_UPDATE"""
if self.paused:
return False, 0
available = self.buffer_limit - len(self.buffer)
if available > self.window_size:
return True, available - self.window_size
return False, 0
6.8 注意事项
⚠️ 窗口耗尽处理:
- 发送方在窗口为 0 时不得发送 DATA 帧
- 必须等待 WINDOW_UPDATE 帧才能继续发送
- 如果等待时间过长,应发送 PING 检测连接活性
⚠️ 窗口溢出:
- WINDOW_UPDATE 帧可能导致窗口大小超过 2^31 - 1
- 这是 FLOW_CONTROL_ERROR,必须终止连接
- 实现时应检查每次更新后的窗口大小
⚠️ 代理服务器:
- 代理需要独立管理上下游的流控窗口
- 上游窗口耗尽时应停止向下游推送
- 下游窗口耗尽时应停止请求上游数据
💡 调优建议:
- 根据网络带宽延迟积(BDP)设置窗口大小
- BDP = 带宽 × RTT,窗口应不小于 BDP
- 高延迟网络(如跨洋)需要更大的窗口
6.9 扩展阅读
- 📖 RFC 7540 Section 5.2 - Flow Control
- 📖 RFC 7540 Section 6.9 - WINDOW_UPDATE
- 📖 TCP Flow Control vs HTTP/2 Flow Control
- 📖 Bandwidth-Delay Product