强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

RTMP 协议精讲 / 03 - 块流机制

块流机制(Chunk Stream)

3.1 为什么需要块流

RTMP 在握手完成后,所有的消息(Message)都通过 块流(Chunk Stream) 传输。块流层是 RTMP 协议中最核心、最精巧的设计。

设计动机

在实时通信中存在一个关键矛盾:

  • 大消息:音视频帧通常较大(视频关键帧可达数十 KB)
  • 低延迟:实时场景需要快速响应控制消息
  • 多路复用:多个流的音视频和控制数据需要在同一个 TCP 连接上传输

解决方案:将大消息拆分为小的 块(Chunk),实现交错发送:

不用块流(理想但不可行):
┌──────────────────────────────────────────┐
│         大视频帧 (50KB)                   │  ← 必须等发送完才能发其他消息
└──────────────────────────────────────────┘

使用块流(RTMP 实际方案):
┌──────┐┌──────┐┌────────┐┌──────┐┌────────┐┌──────┐
│Video ││Audio ││ Ctrl   ││Video ││ Audio  ││Video │  ← 交错发送
│Chunk ││Chunk ││ Msg    ││Chunk ││ Chunk  ││Chunk │
│ #1   ││ #1   ││        ││ #2   ││ #2     ││ #3   │
└──────┘└──────┘└────────┘└──────┘└────────┘└──────┘

3.2 块的基本结构

每个块(Chunk)由两部分组成:

┌────────────────┐┌────────────────────────────────┐
│   Chunk Header ││          Chunk Body             │
│   (变长)        ││     (最大 128~1MB)              │
└────────────────┘└────────────────────────────────┘

块头结构

块头由三部分组成:

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│   fmt (2 bits)  │        cs id (6 bits)        │   基本头(1-4字节)
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│                                                               │
│              消息头 (0/3/7/11 bytes,取决于 fmt)               │
│                                                               │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│         扩展时间戳 (0/4 bytes,仅当 timestamp=0xFFFFFF)        │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

字段说明:
┌─────────────────┬──────────────────────────────────────────────────────┐
│  字段           │  说明                                                │
├─────────────────┼──────────────────────────────────────────────────────┤
│ fmt (2 bits)    │ 块消息头格式(0-3),决定消息头的长度和内容           │
│ cs id (6-14 bits) │ 块流 ID,标识逻辑通道                              │
│ timestamp       │ 时间戳(3 bytes 或 4 bytes 扩展)                    │
│ message length  │ 消息长度(3 bytes)                                  │
│ message type    │ 消息类型 ID(1 byte)                                │
│ message stream  │ 消息流 ID(4 bytes)                                 │
└─────────────────┴──────────────────────────────────────────────────────┘

3.3 块流 ID(Chunk Stream ID)

块流 ID 用于标识不同的逻辑通道,预定义了以下特殊值:

范围用途说明
2协议控制消息Set Chunk Size、Abort 等
3消息控制消息Window Ack Size、Set Peer Bandwidth
4-6保留
7命令消息connect、createStream 等 AMF 命令
8音频消息音频数据
9视频消息视频数据
10-31保留
32-65599用户自定义扩展使用

注意:这些 ID 是约定俗成的推荐值,不同实现可能有所不同。SRS 中默认音频用 cs_id=8,视频用 cs_id=6。


3.4 块消息头格式(fmt)

fmt 字段(2 bits)决定块消息头的类型,共有 4 种格式,从类型 0 到类型 3 信息依次减少:

类型 0(fmt=0)— 11 字节

完整消息头,用于消息流的第一个块或时间戳回绕时。

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│                 timestamp (3 bytes, big-endian)                │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│               message length (3 bytes, big-endian)             │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│    message type id (1 byte)   │                                 │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│              message stream id (4 bytes, little-endian)         │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

字段说明:
┌──────────────┬──────────────────────────────────────────────────┐
│ timestamp    │ 绝对时间戳,3 bytes。若为 0xFFFFFF 则读取扩展时间戳│
│ msg length   │ 消息总长度(非块长度)                            │
│ msg type id  │ 消息类型(1=Set Chunk Size, 8=Audio, 9=Video 等)│
│ msg stream   │ 消息流 ID,注意是 **小端序**(Little-Endian)     │
└──────────────┴──────────────────────────────────────────────────┘

类型 1(fmt=1)— 7 字节

省略消息流 ID(沿用前一个块),用于 同一消息流 中消息结构变化的情况。

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│              timestamp delta (3 bytes, big-endian)             │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│               message length (3 bytes, big-endian)             │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│    message type id (1 byte)   │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

与类型 0 的区别:
- 时间戳是 **增量值**(相对于前一个块)
- 消息流 ID 沿用前一个块

类型 2(fmt=2)— 3 字节

只包含时间戳增量,用于 同一消息流、同一消息长度和类型 的情况(如连续的音频块)。

 0                   1                   2
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│              timestamp delta (3 bytes)         │
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

字段沿用:
- 消息长度 = 前一个块
- 消息类型 = 前一个块
- 消息流 ID = 前一个块

类型 3(fmt=3)— 0 字节

无消息头,所有信息沿用前一个块。用于:

  1. 拆分后的后续块(同一个消息的多个块)
  2. 连续相同参数的消息

格式对比总表

fmt头大小timestampmsg lengthmsg typemsg stream
011 bytes绝对值✅ 新值✅ 新值✅ 新值
17 bytes增量✅ 新值✅ 新值❌ 沿用
23 bytes增量❌ 沿用❌ 沿用❌ 沿用
30 bytes❌ 沿用❌ 沿用❌ 沿用❌ 沿用

3.5 基本头编码

基本头(Basic Header)编码块流 ID,长度为 1-3 字节:

1 字节格式 (cs id: 2-63):
 0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+
│fmt│   cs id   │     cs id = 该字段值
+-+-+-+-+-+-+-+-+

2 字节格式 (cs id: 64-319):
 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│fmt│  0 (6b)   │   cs id - 64  │     cs id = 第二字节 + 64
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

3 字节格式 (cs id: 320-65599):
 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
│fmt│  1 (6b)   │      cs id - 64 (2 bytes)     │     cs id = 后两字节 + 64
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

编码解码实现

def encode_basic_header(fmt: int, cs_id: int) -> bytes:
    """编码基本头"""
    fmt = fmt & 0x03
    if 2 <= cs_id <= 63:
        return bytes([(fmt << 6) | cs_id])
    elif 64 <= cs_id <= 319:
        return bytes([fmt << 6, cs_id - 64])
    elif 320 <= cs_id <= 65599:
        return bytes([(fmt << 6) | 1,
                       (cs_id - 64) & 0xFF,
                       ((cs_id - 64) >> 8) & 0xFF])
    else:
        raise ValueError(f"Invalid cs_id: {cs_id}")


def decode_basic_header(data: bytes, offset: int = 0):
    """解码基本头,返回 (fmt, cs_id, header_length)"""
    byte0 = data[offset]
    fmt = (byte0 >> 6) & 0x03
    cs_id = byte0 & 0x3F

    if cs_id == 0:
        # 2 字节格式
        cs_id = data[offset + 1] + 64
        return fmt, cs_id, 2
    elif cs_id == 1:
        # 3 字节格式
        cs_id = data[offset + 1] + (data[offset + 2] << 8) + 64
        return fmt, cs_id, 3
    else:
        # 1 字节格式
        return fmt, cs_id, 1

3.6 消息拆分(Message Splitting)

当消息大于最大块长(Default Chunk Size = 128 bytes)时,必须拆分为多个块。

拆分规则

  1. 第一个块使用适当的 fmt(0/1/2/3)
  2. 后续块使用 fmt=3(无消息头,所有信息沿用)
  3. 最后一个块的 body 可能小于最大块长

拆分示例

假设发送一条 300 字节的消息(消息类型 9=Video,块流 ID 6):

原始消息 (300 bytes):
┌─────────────────────────────────────────────────────────────────┐
│                        300 bytes of data                         │
└─────────────────────────────────────────────────────────────────┘

拆分为 3 个块(最大块长 128 bytes):

Chunk #1 (fmt=0):    ← 首块,完整消息头
┌───────────┬──────────────────────────────┐
│ Header    │         Body                  │
│ 1+11 bytes│      128 bytes                │
│           │ [byte 0 ~ 127]               │
└───────────┴──────────────────────────────┘

Chunk #2 (fmt=3):    ← 后续块,无消息头
┌───────────┬──────────────────────────────┐
│ Header    │         Body                  │
│ 1 byte    │      128 bytes                │
│           │ [byte 128 ~ 255]             │
└───────────┴──────────────────────────────┘

Chunk #3 (fmt=3):    ← 最后一个块
┌───────────┬──────────────────────────────┐
│ Header    │         Body                  │
│ 1 byte    │      44 bytes                 │
│           │ [byte 256 ~ 299]             │
└───────────┴──────────────────────────────┘

Python 实现消息拆分

DEFAULT_CHUNK_SIZE = 128


def split_message_into_chunks(
    message: bytes,
    msg_type_id: int,
    msg_stream_id: int,
    timestamp: int,
    chunk_stream_id: int,
    chunk_size: int = DEFAULT_CHUNK_SIZE
) -> list:
    """将消息拆分为多个块"""
    chunks = []
    msg_length = len(message)
    offset = 0
    first_chunk = True

    while offset < msg_length:
        body_length = min(chunk_size, msg_length - offset)
        body = message[offset:offset + body_length]

        if first_chunk:
            # 第一个块:使用 fmt=0(完整头)
            header = encode_chunk_header(
                fmt=0,
                cs_id=chunk_stream_id,
                timestamp=timestamp,
                msg_length=msg_length,
                msg_type_id=msg_type_id,
                msg_stream_id=msg_stream_id
            )
            first_chunk = False
        else:
            # 后续块:使用 fmt=3(无头)
            header = encode_chunk_header(
                fmt=3,
                cs_id=chunk_stream_id
            )

        chunks.append(header + body)
        offset += body_length

    return chunks


def encode_chunk_header(fmt, cs_id, timestamp=0, msg_length=0,
                        msg_type_id=0, msg_stream_id=0) -> bytes:
    """编码块头"""
    parts = []

    # 基本头
    parts.append(encode_basic_header(fmt, cs_id))

    # 消息头(根据 fmt)
    if fmt == 0:
        ts = min(timestamp, 0xFFFFFF)
        parts.append(struct.pack('>I', ts)[1:])  # 3 bytes
        parts.append(struct.pack('>I', msg_length)[1:])  # 3 bytes
        parts.append(struct.pack('B', msg_type_id))
        parts.append(struct.pack('<I', msg_stream_id))  # little-endian!
        if timestamp >= 0xFFFFFF:
            parts.append(struct.pack('>I', timestamp))  # 扩展时间戳
    elif fmt == 1:
        ts = min(timestamp, 0xFFFFFF)
        parts.append(struct.pack('>I', ts)[1:])
        parts.append(struct.pack('>I', msg_length)[1:])
        parts.append(struct.pack('B', msg_type_id))
        if timestamp >= 0xFFFFFF:
            parts.append(struct.pack('>I', timestamp))
    elif fmt == 2:
        ts = min(timestamp, 0xFFFFFF)
        parts.append(struct.pack('>I', ts)[1:])
        if timestamp >= 0xFFFFFF:
            parts.append(struct.pack('>I', timestamp))
    # fmt == 3: 无消息头

    return b''.join(parts)

3.7 Set Chunk Size 消息

协议控制消息,用于协商最大块长。

消息结构:
┌──────────────────────────────────────────────┐
│  chunk size (4 bytes, big-endian)             │
│  31 bits: 新的块大小                           │
│  1 bit: 0(保留,必须为 0)                     │
└──────────────────────────────────────────────┘

默认块大小: 128 bytes
常见配置: 4096 ~ 65536 bytes
最大值: 2^31 - 1 = 2,147,483,647 bytes

设置块大小

def create_set_chunk_size(chunk_size: int) -> bytes:
    """创建 Set Chunk Size 协议控制消息"""
    # 消息类型 ID = 1
    # 块流 ID = 2(协议控制消息)
    msg = struct.pack('>I', chunk_size & 0x7FFFFFFF)

    # 块头 (fmt=0, cs_id=2, timestamp=0)
    header = encode_basic_header(0, 2)
    header += struct.pack('>I', 0)[1:]  # timestamp = 0
    header += struct.pack('>I', 4)[1:]  # msg length = 4
    header += struct.pack('B', 1)       # msg type = 1 (Set Chunk Size)
    header += struct.pack('<I', 0)      # msg stream = 0

    return header + msg


def parse_set_chunk_size(data: bytes) -> int:
    """解析 Set Chunk Size"""
    return struct.unpack('>I', data[:4])[0] & 0x7FFFFFFF

块大小选择策略

场景推荐块大小理由
低延迟直播128-4096小块交错更及时
高码率推流16384-65536减少头部开销
局域网传输65536带宽充足,减少分块次数
弱网环境4096-8192平衡开销和重传代价

3.8 Abort 消息

用于丢弃一个块流中的部分数据(例如客户端取消接收时)。

消息结构:
┌──────────────────────────────────────┐
│  chunk stream id (4 bytes, big-endian) │
└──────────────────────────────────────┘

消息类型 ID = 2
块流 ID = 2(协议控制消息)
def create_abort(cs_id: int) -> bytes:
    """创建 Abort 消息"""
    msg = struct.pack('>I', cs_id)
    header = encode_basic_header(0, 2)
    header += struct.pack('>I', 0)[1:]  # timestamp
    header += struct.pack('>I', 4)[1:]  # msg length
    header += struct.pack('B', 2)       # msg type = 2 (Abort)
    header += struct.pack('<I', 0)      # msg stream
    return header + msg

3.9 Acknowledgement 消息

用于确认接收到的数据量,实现流量控制。

消息结构:
┌──────────────────────────────────────┐
│  sequence number (4 bytes, big-endian) │
└──────────────────────────────────────┘

消息类型 ID = 3
块流 ID = 2

sequence number: 到目前为止接收到的总字节数(取模 2^32)

确认机制

     发送方                                    接收方
       │                                         │
       │──── 发送数据 (累计 4096 bytes) ────────→│
       │                                         │
       │←─── Acknowledgement (seq=4096) ────────│
       │                                         │
       │──── 发送数据 (累计 8192 bytes) ────────→│
       │                                         │
       │←─── Acknowledgement (seq=8192) ────────│
       │                                         │

3.10 块流重组完整实现

#!/usr/bin/env python3
"""
RTMP Chunk Stream Reassembler
重组块流中的消息
"""

import struct
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ChunkState:
    """单个块流的状态"""
    cs_id: int
    # fmt=0 的完整信息
    timestamp: int = 0
    timestamp_delta: int = 0
    msg_length: int = 0
    msg_type_id: int = 0
    msg_stream_id: int = 0
    # 当前消息缓冲
    msg_buffer: bytes = b''
    has_extended_ts: bool = False
    prev_fmt: int = 3


class ChunkStreamReassembler:
    """RTMP 块流重组器"""

    DEFAULT_CHUNK_SIZE = 128

    def __init__(self):
        self.chunk_size = self.DEFAULT_CHUNK_SIZE
        self.streams: dict[int, ChunkState] = {}

    def set_chunk_size(self, size: int):
        """设置接收块大小"""
        self.chunk_size = size

    def get_state(self, cs_id: int) -> ChunkState:
        """获取或创建块流状态"""
        if cs_id not in self.streams:
            self.streams[cs_id] = ChunkState(cs_id=cs_id)
        return self.streams[cs_id]

    def parse_chunk(self, data: bytes, offset: int = 0) -> tuple:
        """
        解析一个块,返回 (message, new_offset)
        如果消息不完整返回 (None, offset)
        """
        # 解析基本头
        fmt, cs_id, basic_hdr_len = decode_basic_header(data, offset)
        pos = offset + basic_hdr_len
        state = self.get_state(cs_id)

        # 解析消息头
        if fmt == 0:
            ts = struct.unpack('>I', b'\x00' + data[pos:pos+3])[0]
            pos += 3
            msg_len = struct.unpack('>I', b'\x00' + data[pos:pos+3])[0]
            pos += 3
            msg_type = data[pos]; pos += 1
            msg_stream = struct.unpack('<I', data[pos:pos+4])[0]
            pos += 4

            if ts == 0xFFFFFF:
                ts = struct.unpack('>I', data[pos:pos+4])[0]
                pos += 4
                state.has_extended_ts = True

            state.timestamp = ts
            state.msg_length = msg_len
            state.msg_type_id = msg_type
            state.msg_stream_id = msg_stream
            state.msg_buffer = b''
            state.prev_fmt = 0

        elif fmt == 1:
            ts_delta = struct.unpack('>I', b'\x00' + data[pos:pos+3])[0]
            pos += 3
            msg_len = struct.unpack('>I', b'\x00' + data[pos:pos+3])[0]
            pos += 3
            msg_type = data[pos]; pos += 1

            if ts_delta == 0xFFFFFF:
                ts_delta = struct.unpack('>I', data[pos:pos+4])[0]
                pos += 4

            state.timestamp += ts_delta
            state.timestamp_delta = ts_delta
            state.msg_length = msg_len
            state.msg_type_id = msg_type
            state.msg_buffer = b''
            state.prev_fmt = 1

        elif fmt == 2:
            ts_delta = struct.unpack('>I', b'\x00' + data[pos:pos+3])[0]
            pos += 3

            if ts_delta == 0xFFFFFF:
                ts_delta = struct.unpack('>I', data[pos:pos+4])[0]
                pos += 4

            state.timestamp += ts_delta
            state.timestamp_delta = ts_delta
            state.msg_buffer = b''
            state.prev_fmt = 2

        elif fmt == 3:
            if state.has_extended_ts:
                pos += 4
            if state.prev_fmt in (0, 1):
                pass  # 已在之前更新
            elif state.prev_fmt == 2:
                state.timestamp += state.timestamp_delta
            # else: fmt=3 after fmt=3, no change

        # 读取块体
        remaining = min(self.chunk_size, state.msg_length - len(state.msg_buffer))
        if pos + remaining > len(data):
            return None, offset  # 数据不足

        state.msg_buffer += data[pos:pos + remaining]
        pos += remaining

        # 检查消息是否完整
        if len(state.msg_buffer) >= state.msg_length:
            message = {
                'timestamp': state.timestamp,
                'msg_type_id': state.msg_type_id,
                'msg_stream_id': state.msg_stream_id,
                'data': state.msg_buffer[:state.msg_length]
            }
            state.msg_buffer = b''
            return message, pos

        return None, pos  # 消息不完整

注意事项

  1. 默认块大小:握手完成后默认为 128 bytes,双方应在连接建立后立即发送 Set Chunk Size
  2. 时间戳回绕:3 字节时间戳最大 0xFFFFFF(约 4.6 小时),超过后使用扩展时间戳
  3. 消息流 ID 字节序:基本头和大部分字段是大端序,但消息流 ID 是 小端序
  4. 块交错:同一 TCP 连接上多个块流的块可以任意交错,实现时必须维护每个 cs_id 的独立状态
  5. Set Chunk Size 时机:应在 connect 命令之后、音视频数据之前发送

扩展阅读


上一章02 - 握手过程 下一章04 - 消息格式 — 了解 RTMP 消息类型与结构