RTMP 协议精讲 / 03 - 块流机制
块流机制(Chunk Stream)
3.1 为什么需要块流
RTMP 在握手完成后,所有的消息(Message)都通过 块流(Chunk Stream) 传输。块流层是 RTMP 协议中最核心、最精巧的设计。
设计动机
在实时通信中存在一个关键矛盾:
- 大消息:音视频帧通常较大(视频关键帧可达数十 KB)
- 低延迟:实时场景需要快速响应控制消息
- 多路复用:多个流的音视频和控制数据需要在同一个 TCP 连接上传输
解决方案:将大消息拆分为小的 块(Chunk),实现交错发送:
不用块流(理想但不可行):
┌──────────────────────────────────────────┐
│ 大视频帧 (50KB) │ ← 必须等发送完才能发其他消息
└──────────────────────────────────────────┘
使用块流(RTMP 实际方案):
┌──────┐┌──────┐┌────────┐┌──────┐┌────────┐┌──────┐
│Video ││Audio ││ Ctrl ││Video ││ Audio ││Video │ ← 交错发送
│Chunk ││Chunk ││ Msg ││Chunk ││ Chunk ││Chunk │
│ #1 ││ #1 ││ ││ #2 ││ #2 ││ #3 │
└──────┘└──────┘└────────┘└──────┘└────────┘└──────┘
3.2 块的基本结构
每个块(Chunk)由两部分组成:
┌────────────────┐┌────────────────────────────────┐
│ Chunk Header ││ Chunk Body │
│ (变长) ││ (最大 128~1MB) │
└────────────────┘└────────────────────────────────┘
块头结构
块头由三部分组成:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│ fmt (2 bits) │ cs id (6 bits) │ 基本头(1-4字节)
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│ │
│ 消息头 (0/3/7/11 bytes,取决于 fmt) │
│ │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│ 扩展时间戳 (0/4 bytes,仅当 timestamp=0xFFFFFF) │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
字段说明:
┌─────────────────┬──────────────────────────────────────────────────────┐
│ 字段 │ 说明 │
├─────────────────┼──────────────────────────────────────────────────────┤
│ fmt (2 bits) │ 块消息头格式(0-3),决定消息头的长度和内容 │
│ cs id (6-14 bits) │ 块流 ID,标识逻辑通道 │
│ timestamp │ 时间戳(3 bytes 或 4 bytes 扩展) │
│ message length │ 消息长度(3 bytes) │
│ message type │ 消息类型 ID(1 byte) │
│ message stream │ 消息流 ID(4 bytes) │
└─────────────────┴──────────────────────────────────────────────────────┘
3.3 块流 ID(Chunk Stream ID)
块流 ID 用于标识不同的逻辑通道,预定义了以下特殊值:
| 范围 | 用途 | 说明 |
|---|---|---|
| 2 | 协议控制消息 | Set Chunk Size、Abort 等 |
| 3 | 消息控制消息 | Window Ack Size、Set Peer Bandwidth |
| 4-6 | 保留 | — |
| 7 | 命令消息 | connect、createStream 等 AMF 命令 |
| 8 | 音频消息 | 音频数据 |
| 9 | 视频消息 | 视频数据 |
| 10-31 | 保留 | — |
| 32-65599 | 用户自定义 | 扩展使用 |
注意:这些 ID 是约定俗成的推荐值,不同实现可能有所不同。SRS 中默认音频用 cs_id=8,视频用 cs_id=6。
3.4 块消息头格式(fmt)
fmt 字段(2 bits)决定块消息头的类型,共有 4 种格式,从类型 0 到类型 3 信息依次减少:
类型 0(fmt=0)— 11 字节
完整消息头,用于消息流的第一个块或时间戳回绕时。
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│ timestamp (3 bytes, big-endian) │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│ message length (3 bytes, big-endian) │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│ message type id (1 byte) │ │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│ message stream id (4 bytes, little-endian) │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
字段说明:
┌──────────────┬──────────────────────────────────────────────────┐
│ timestamp │ 绝对时间戳,3 bytes。若为 0xFFFFFF 则读取扩展时间戳│
│ msg length │ 消息总长度(非块长度) │
│ msg type id │ 消息类型(1=Set Chunk Size, 8=Audio, 9=Video 等)│
│ msg stream │ 消息流 ID,注意是 **小端序**(Little-Endian) │
└──────────────┴──────────────────────────────────────────────────┘
类型 1(fmt=1)— 7 字节
省略消息流 ID(沿用前一个块),用于 同一消息流 中消息结构变化的情况。
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│ timestamp delta (3 bytes, big-endian) │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│ message length (3 bytes, big-endian) │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│ message type id (1 byte) │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
与类型 0 的区别:
- 时间戳是 **增量值**(相对于前一个块)
- 消息流 ID 沿用前一个块
类型 2(fmt=2)— 3 字节
只包含时间戳增量,用于 同一消息流、同一消息长度和类型 的情况(如连续的音频块)。
0 1 2
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│ timestamp delta (3 bytes) │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
字段沿用:
- 消息长度 = 前一个块
- 消息类型 = 前一个块
- 消息流 ID = 前一个块
类型 3(fmt=3)— 0 字节
无消息头,所有信息沿用前一个块。用于:
- 拆分后的后续块(同一个消息的多个块)
- 连续相同参数的消息
格式对比总表
| fmt | 头大小 | timestamp | msg length | msg type | msg stream |
|---|---|---|---|---|---|
| 0 | 11 bytes | 绝对值 | ✅ 新值 | ✅ 新值 | ✅ 新值 |
| 1 | 7 bytes | 增量 | ✅ 新值 | ✅ 新值 | ❌ 沿用 |
| 2 | 3 bytes | 增量 | ❌ 沿用 | ❌ 沿用 | ❌ 沿用 |
| 3 | 0 bytes | ❌ 沿用 | ❌ 沿用 | ❌ 沿用 | ❌ 沿用 |
3.5 基本头编码
基本头(Basic Header)编码块流 ID,长度为 1-3 字节:
1 字节格式 (cs id: 2-63):
0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+
│fmt│ cs id │ cs id = 该字段值
+-+-+-+-+-+-+-+-+
2 字节格式 (cs id: 64-319):
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│fmt│ 0 (6b) │ cs id - 64 │ cs id = 第二字节 + 64
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
3 字节格式 (cs id: 320-65599):
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│fmt│ 1 (6b) │ cs id - 64 (2 bytes) │ cs id = 后两字节 + 64
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
编码解码实现
def encode_basic_header(fmt: int, cs_id: int) -> bytes:
"""编码基本头"""
fmt = fmt & 0x03
if 2 <= cs_id <= 63:
return bytes([(fmt << 6) | cs_id])
elif 64 <= cs_id <= 319:
return bytes([fmt << 6, cs_id - 64])
elif 320 <= cs_id <= 65599:
return bytes([(fmt << 6) | 1,
(cs_id - 64) & 0xFF,
((cs_id - 64) >> 8) & 0xFF])
else:
raise ValueError(f"Invalid cs_id: {cs_id}")
def decode_basic_header(data: bytes, offset: int = 0):
"""解码基本头,返回 (fmt, cs_id, header_length)"""
byte0 = data[offset]
fmt = (byte0 >> 6) & 0x03
cs_id = byte0 & 0x3F
if cs_id == 0:
# 2 字节格式
cs_id = data[offset + 1] + 64
return fmt, cs_id, 2
elif cs_id == 1:
# 3 字节格式
cs_id = data[offset + 1] + (data[offset + 2] << 8) + 64
return fmt, cs_id, 3
else:
# 1 字节格式
return fmt, cs_id, 1
3.6 消息拆分(Message Splitting)
当消息大于最大块长(Default Chunk Size = 128 bytes)时,必须拆分为多个块。
拆分规则
- 第一个块使用适当的 fmt(0/1/2/3)
- 后续块使用 fmt=3(无消息头,所有信息沿用)
- 最后一个块的 body 可能小于最大块长
拆分示例
假设发送一条 300 字节的消息(消息类型 9=Video,块流 ID 6):
原始消息 (300 bytes):
┌─────────────────────────────────────────────────────────────────┐
│ 300 bytes of data │
└─────────────────────────────────────────────────────────────────┘
拆分为 3 个块(最大块长 128 bytes):
Chunk #1 (fmt=0): ← 首块,完整消息头
┌───────────┬──────────────────────────────┐
│ Header │ Body │
│ 1+11 bytes│ 128 bytes │
│ │ [byte 0 ~ 127] │
└───────────┴──────────────────────────────┘
Chunk #2 (fmt=3): ← 后续块,无消息头
┌───────────┬──────────────────────────────┐
│ Header │ Body │
│ 1 byte │ 128 bytes │
│ │ [byte 128 ~ 255] │
└───────────┴──────────────────────────────┘
Chunk #3 (fmt=3): ← 最后一个块
┌───────────┬──────────────────────────────┐
│ Header │ Body │
│ 1 byte │ 44 bytes │
│ │ [byte 256 ~ 299] │
└───────────┴──────────────────────────────┘
Python 实现消息拆分
DEFAULT_CHUNK_SIZE = 128
def split_message_into_chunks(
message: bytes,
msg_type_id: int,
msg_stream_id: int,
timestamp: int,
chunk_stream_id: int,
chunk_size: int = DEFAULT_CHUNK_SIZE
) -> list:
"""将消息拆分为多个块"""
chunks = []
msg_length = len(message)
offset = 0
first_chunk = True
while offset < msg_length:
body_length = min(chunk_size, msg_length - offset)
body = message[offset:offset + body_length]
if first_chunk:
# 第一个块:使用 fmt=0(完整头)
header = encode_chunk_header(
fmt=0,
cs_id=chunk_stream_id,
timestamp=timestamp,
msg_length=msg_length,
msg_type_id=msg_type_id,
msg_stream_id=msg_stream_id
)
first_chunk = False
else:
# 后续块:使用 fmt=3(无头)
header = encode_chunk_header(
fmt=3,
cs_id=chunk_stream_id
)
chunks.append(header + body)
offset += body_length
return chunks
def encode_chunk_header(fmt, cs_id, timestamp=0, msg_length=0,
msg_type_id=0, msg_stream_id=0) -> bytes:
"""编码块头"""
parts = []
# 基本头
parts.append(encode_basic_header(fmt, cs_id))
# 消息头(根据 fmt)
if fmt == 0:
ts = min(timestamp, 0xFFFFFF)
parts.append(struct.pack('>I', ts)[1:]) # 3 bytes
parts.append(struct.pack('>I', msg_length)[1:]) # 3 bytes
parts.append(struct.pack('B', msg_type_id))
parts.append(struct.pack('<I', msg_stream_id)) # little-endian!
if timestamp >= 0xFFFFFF:
parts.append(struct.pack('>I', timestamp)) # 扩展时间戳
elif fmt == 1:
ts = min(timestamp, 0xFFFFFF)
parts.append(struct.pack('>I', ts)[1:])
parts.append(struct.pack('>I', msg_length)[1:])
parts.append(struct.pack('B', msg_type_id))
if timestamp >= 0xFFFFFF:
parts.append(struct.pack('>I', timestamp))
elif fmt == 2:
ts = min(timestamp, 0xFFFFFF)
parts.append(struct.pack('>I', ts)[1:])
if timestamp >= 0xFFFFFF:
parts.append(struct.pack('>I', timestamp))
# fmt == 3: 无消息头
return b''.join(parts)
3.7 Set Chunk Size 消息
协议控制消息,用于协商最大块长。
消息结构:
┌──────────────────────────────────────────────┐
│ chunk size (4 bytes, big-endian) │
│ 31 bits: 新的块大小 │
│ 1 bit: 0(保留,必须为 0) │
└──────────────────────────────────────────────┘
默认块大小: 128 bytes
常见配置: 4096 ~ 65536 bytes
最大值: 2^31 - 1 = 2,147,483,647 bytes
设置块大小
def create_set_chunk_size(chunk_size: int) -> bytes:
"""创建 Set Chunk Size 协议控制消息"""
# 消息类型 ID = 1
# 块流 ID = 2(协议控制消息)
msg = struct.pack('>I', chunk_size & 0x7FFFFFFF)
# 块头 (fmt=0, cs_id=2, timestamp=0)
header = encode_basic_header(0, 2)
header += struct.pack('>I', 0)[1:] # timestamp = 0
header += struct.pack('>I', 4)[1:] # msg length = 4
header += struct.pack('B', 1) # msg type = 1 (Set Chunk Size)
header += struct.pack('<I', 0) # msg stream = 0
return header + msg
def parse_set_chunk_size(data: bytes) -> int:
"""解析 Set Chunk Size"""
return struct.unpack('>I', data[:4])[0] & 0x7FFFFFFF
块大小选择策略
| 场景 | 推荐块大小 | 理由 |
|---|---|---|
| 低延迟直播 | 128-4096 | 小块交错更及时 |
| 高码率推流 | 16384-65536 | 减少头部开销 |
| 局域网传输 | 65536 | 带宽充足,减少分块次数 |
| 弱网环境 | 4096-8192 | 平衡开销和重传代价 |
3.8 Abort 消息
用于丢弃一个块流中的部分数据(例如客户端取消接收时)。
消息结构:
┌──────────────────────────────────────┐
│ chunk stream id (4 bytes, big-endian) │
└──────────────────────────────────────┘
消息类型 ID = 2
块流 ID = 2(协议控制消息)
def create_abort(cs_id: int) -> bytes:
"""创建 Abort 消息"""
msg = struct.pack('>I', cs_id)
header = encode_basic_header(0, 2)
header += struct.pack('>I', 0)[1:] # timestamp
header += struct.pack('>I', 4)[1:] # msg length
header += struct.pack('B', 2) # msg type = 2 (Abort)
header += struct.pack('<I', 0) # msg stream
return header + msg
3.9 Acknowledgement 消息
用于确认接收到的数据量,实现流量控制。
消息结构:
┌──────────────────────────────────────┐
│ sequence number (4 bytes, big-endian) │
└──────────────────────────────────────┘
消息类型 ID = 3
块流 ID = 2
sequence number: 到目前为止接收到的总字节数(取模 2^32)
确认机制
发送方 接收方
│ │
│──── 发送数据 (累计 4096 bytes) ────────→│
│ │
│←─── Acknowledgement (seq=4096) ────────│
│ │
│──── 发送数据 (累计 8192 bytes) ────────→│
│ │
│←─── Acknowledgement (seq=8192) ────────│
│ │
3.10 块流重组完整实现
#!/usr/bin/env python3
"""
RTMP Chunk Stream Reassembler
重组块流中的消息
"""
import struct
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class ChunkState:
"""单个块流的状态"""
cs_id: int
# fmt=0 的完整信息
timestamp: int = 0
timestamp_delta: int = 0
msg_length: int = 0
msg_type_id: int = 0
msg_stream_id: int = 0
# 当前消息缓冲
msg_buffer: bytes = b''
has_extended_ts: bool = False
prev_fmt: int = 3
class ChunkStreamReassembler:
"""RTMP 块流重组器"""
DEFAULT_CHUNK_SIZE = 128
def __init__(self):
self.chunk_size = self.DEFAULT_CHUNK_SIZE
self.streams: dict[int, ChunkState] = {}
def set_chunk_size(self, size: int):
"""设置接收块大小"""
self.chunk_size = size
def get_state(self, cs_id: int) -> ChunkState:
"""获取或创建块流状态"""
if cs_id not in self.streams:
self.streams[cs_id] = ChunkState(cs_id=cs_id)
return self.streams[cs_id]
def parse_chunk(self, data: bytes, offset: int = 0) -> tuple:
"""
解析一个块,返回 (message, new_offset)
如果消息不完整返回 (None, offset)
"""
# 解析基本头
fmt, cs_id, basic_hdr_len = decode_basic_header(data, offset)
pos = offset + basic_hdr_len
state = self.get_state(cs_id)
# 解析消息头
if fmt == 0:
ts = struct.unpack('>I', b'\x00' + data[pos:pos+3])[0]
pos += 3
msg_len = struct.unpack('>I', b'\x00' + data[pos:pos+3])[0]
pos += 3
msg_type = data[pos]; pos += 1
msg_stream = struct.unpack('<I', data[pos:pos+4])[0]
pos += 4
if ts == 0xFFFFFF:
ts = struct.unpack('>I', data[pos:pos+4])[0]
pos += 4
state.has_extended_ts = True
state.timestamp = ts
state.msg_length = msg_len
state.msg_type_id = msg_type
state.msg_stream_id = msg_stream
state.msg_buffer = b''
state.prev_fmt = 0
elif fmt == 1:
ts_delta = struct.unpack('>I', b'\x00' + data[pos:pos+3])[0]
pos += 3
msg_len = struct.unpack('>I', b'\x00' + data[pos:pos+3])[0]
pos += 3
msg_type = data[pos]; pos += 1
if ts_delta == 0xFFFFFF:
ts_delta = struct.unpack('>I', data[pos:pos+4])[0]
pos += 4
state.timestamp += ts_delta
state.timestamp_delta = ts_delta
state.msg_length = msg_len
state.msg_type_id = msg_type
state.msg_buffer = b''
state.prev_fmt = 1
elif fmt == 2:
ts_delta = struct.unpack('>I', b'\x00' + data[pos:pos+3])[0]
pos += 3
if ts_delta == 0xFFFFFF:
ts_delta = struct.unpack('>I', data[pos:pos+4])[0]
pos += 4
state.timestamp += ts_delta
state.timestamp_delta = ts_delta
state.msg_buffer = b''
state.prev_fmt = 2
elif fmt == 3:
if state.has_extended_ts:
pos += 4
if state.prev_fmt in (0, 1):
pass # 已在之前更新
elif state.prev_fmt == 2:
state.timestamp += state.timestamp_delta
# else: fmt=3 after fmt=3, no change
# 读取块体
remaining = min(self.chunk_size, state.msg_length - len(state.msg_buffer))
if pos + remaining > len(data):
return None, offset # 数据不足
state.msg_buffer += data[pos:pos + remaining]
pos += remaining
# 检查消息是否完整
if len(state.msg_buffer) >= state.msg_length:
message = {
'timestamp': state.timestamp,
'msg_type_id': state.msg_type_id,
'msg_stream_id': state.msg_stream_id,
'data': state.msg_buffer[:state.msg_length]
}
state.msg_buffer = b''
return message, pos
return None, pos # 消息不完整
注意事项
- 默认块大小:握手完成后默认为 128 bytes,双方应在连接建立后立即发送 Set Chunk Size
- 时间戳回绕:3 字节时间戳最大 0xFFFFFF(约 4.6 小时),超过后使用扩展时间戳
- 消息流 ID 字节序:基本头和大部分字段是大端序,但消息流 ID 是 小端序
- 块交错:同一 TCP 连接上多个块流的块可以任意交错,实现时必须维护每个 cs_id 的独立状态
- Set Chunk Size 时机:应在 connect 命令之后、音视频数据之前发送