
Stream Operations

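6.1 Conceptual Model of a Stream

In RTMP, a stream is a logical channel that carries audio and video data together with the associated control information. A single RTMP connection (NetConnection) can contain multiple streams.

NetConnection (one TCP connection)
├── Stream 0 (default control stream, used for NetConnection-level commands)
├── Stream 1 (stream "camera_feed")
│   ├── video data (Type 9)
│   ├── audio data (Type 8)
│   └── metadata (Type 18)
├── Stream 2 (stream "screen_share")
│   ├── video data (Type 9)
│   └── audio data (Type 8)
└── Stream 3 (stream "slides")
    └── video data (Type 9)

Stream lifecycle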

                    createStream()
    ┌──────────┐ ────────────────→ ┌──────────┐
    │ Uninit   │                   │ Created  │
    │          │                   │          │
    └──────────┘                   └──────────┘
                                      │    │
                              play()  │    │ publish()
                                      ▼    ▼
                               ┌──────────┐  ┌──────────┐
                               │ Playing  │  │Publishing│
                               │          │  │          │
                               └──────────┘  └──────────┘
                                      │    │
                              deleteStream()
                                      ▼
                               ┌──────────┐
                               │ Closed   │
                               │          │
                               └──────────┘
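
The lifecycle can be read as a small state machine. The following is a minimal sketch, assuming only the transitions drawn in the diagram; the names StreamState, TRANSITIONS, and next_state are illustrative and do not come from any RTMP library.

```python
from enum import Enum


class StreamState(Enum):
    UNINIT = "Uninit"
    CREATED = "Created"
    PLAYING = "Playing"
    PUBLISHING = "Publishing"
    CLOSED = "Closed"


# (current state, command) -> next state, one entry per arrow in the diagram.
TRANSITIONS = {
    (StreamState.UNINIT, "createStream"): StreamState.CREATED,
    (StreamState.CREATED, "play"): StreamState.PLAYING,
    (StreamState.CREATED, "publish"): StreamState.PUBLISHING,
    (StreamState.PLAYING, "deleteStream"): StreamState.CLOSED,
    (StreamState.PUBLISHING, "deleteStream"): StreamState.CLOSED,
}


def next_state(state: StreamState, command: str) -> StreamState:
    """Return the state reached by `command`, or raise if the diagram has no such arrow."""
    try:
        return TRANSITIONS[(state, command)]
    except KeyError:
        raise ValueError(f"{command!r} is not valid in state {state.value}") from None
```

A client can keep one StreamState per stream ID and reject, for example, a play issued before createStream has completed.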

6.2 createStream

createStream creates a logical stream and is a prerequisite for every other stream operation.

Request

def create_create_stream_command(transaction_id: float) -> list:
    """
    Build a createStream command.

    Note: createStream carries only a transaction ID and an optional command
    object. It takes no stream name; the name is supplied later in play/publish.
    """
    return [
        "createStream",   # Command Name
        transaction_id,   # Transaction ID (1.0, 2.0, ...)
        None,             # Command Object (null)
    ]

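Response

def create_create_stream_response(
    transaction_id: float,
    stream_id: float = 1.0
) -> list:
    """Build the successful createStream response."""
    return [
        "_result",
        transaction_id,   # echoes the transaction ID of the request
        None,
        stream_id,        # stream ID allocated by the server
    ]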

Complete exchange

Client                                    Server
  │                                         │
  │── createStream (txid=4) ──────────────→│  AMF Command
  │                                         │
  │←── _result (txid=4, stream_id=1) ─────│  AMF Response
  │                                         │
  │   stream_id=1 can now be used for       │
  │   play/publish operations               │
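
Because a response is tied to its request only by the transaction ID, a client needs some bookkeeping to pair them. The sketch below shows one way to do that; TransactionTracker and its methods are hypothetical names, and it assumes responses have already been decoded into Python lists of the shape used in this section.

```python
class TransactionTracker:
    """Pairs _result/_error responses with the commands that are waiting on them."""

    def __init__(self):
        self._next_id = 1.0
        self._pending = {}  # transaction ID -> command name

    def begin(self, command_name: str) -> float:
        """Reserve the next transaction ID for an outgoing command."""
        txid = self._next_id
        self._next_id += 1.0
        self._pending[txid] = command_name
        return txid

    def resolve(self, response: list):
        """Return (command_name, succeeded, payload) for a decoded response."""
        name, txid, *payload = response
        if name not in ("_result", "_error"):
            raise ValueError(f"not a response: {name!r}")
        command = self._pending.pop(txid)  # KeyError if nothing is waiting on txid
        return command, name == "_result", payload
```

For a createStream response, the payload is [None, stream_id], so the allocated stream ID is the last element.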

6.3 publish (Pushing a Stream)

The publish command starts sending audio and video data to the server.

Command format

class PublishType:
    LIVE = "live"       # live broadcast, not recorded
    RECORD = "record"   # publish and record to a new file
    APPEND = "append"   # publish and append to an existing file


def create_publish_command(
    stream_name: str,
    publish_type: str = PublishType.LIVE,
    transaction_id: float = 0
) -> list:
    """
    Build a publish command.

    Args:
        stream_name: name of the stream (e.g. "camera_feed")
        publish_type: "live", "record", or "append"
        transaction_id: normally 0
    """
    return [
        "publish",
        transaction_id,
        None,
        stream_name,
        publish_type,
    ]

Server responses

def create_publish_start(stream_id: int) -> list:
    """onStatus: Publish.Start"""
    return [
        "onStatus",
        0,
        None,
        {
            "level": "status",
            "code": "NetStream.Publish.Start",
            "description": "Publishing stream.",
        }
    ]


def create_publish_bad_name(stream_id: int) -> list:
    """onStatus: Publish.BadName"""
    return [
        "onStatus",
        0,
        None,
        {
            "level": "error",
            "code": "NetStream.Publish.BadName",
            "description": "Stream name already in use.",
        }
    ]
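
On the server side, choosing between the two responses comes down to whether the stream name is already taken. The following is a minimal sketch under that assumption; PublishRegistry is a hypothetical helper, and real servers usually also check authentication and application scope.

```python
class PublishRegistry:
    """Tracks which stream names currently have a publisher."""

    def __init__(self):
        self._active = {}  # stream name -> message stream ID of the publisher

    def try_publish(self, stream_name: str, stream_id: int) -> list:
        """Register the publisher and return the onStatus message to send back."""
        if stream_name in self._active:
            info = {
                "level": "error",
                "code": "NetStream.Publish.BadName",
                "description": "Stream name already in use.",
            }
        else:
            self._active[stream_name] = stream_id
            info = {
                "level": "status",
                "code": "NetStream.Publish.Start",
                "description": "Publishing stream.",
            }
        return ["onStatus", 0, None, info]

    def unpublish(self, stream_name: str) -> None:
        """Free the name once the publisher sends deleteStream or disconnects."""
        self._active.pop(stream_name, None)
```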

Complete publish sequence

Client                                    Server
  │                                         │
  │════ connect succeeded ══════════════════│
  │                                         │
  │── createStream (txid=4) ──────────────→│
  │←── _result (stream_id=1) ─────────────│
  │                                         │
  │── publish("mystream", "live") ────────→│
  │                                         │
  │←── onStatus (Publish.Start) ──────────│
  │                                         │
  │── @setDataFrame (onMetaData) ─────────→│  bitrate, resolution, etc.
  │                                         │
  │── AAC Sequence Header ────────────────→│  audio decoder configuration
  │── AVC Sequence Header ────────────────→│  video decoder configuration
  │                                         │
  │── Video (Keyframe) ───────────────────→│  first keyframe
  │── Audio ──────────────────────────────→│
  │── Video (Inter) ──────────────────────→│
  │── Audio ──────────────────────────────→│
  │── Video (Inter) ──────────────────────→│
  │── Audio ──────────────────────────────→│
  │       ... sending continues ...         │
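
The sequence headers and the media frames in this sequence are told apart by the first bytes of each audio or video message body. The sketch below builds those leading bytes following the FLV tag layout for AVC and AAC; the function names are illustrative, and the fixed 0xAF audio byte assumes AAC.

```python
import struct


def avc_video_header(is_keyframe: bool, is_sequence_header: bool,
                     composition_time_ms: int = 0) -> bytes:
    """Return the 5 bytes that precede the AVC payload in a Type 9 message."""
    frame_type = 1 if is_keyframe else 2           # 1 = keyframe, 2 = inter frame
    codec_id = 7                                   # 7 = AVC (H.264)
    packet_type = 0 if is_sequence_header else 1   # 0 = sequence header, 1 = NAL units
    # Composition time offset: signed 24-bit big-endian (low 3 bytes of a 32-bit int).
    cts = struct.pack(">i", composition_time_ms)[1:]
    return bytes([(frame_type << 4) | codec_id, packet_type]) + cts


def aac_audio_header(is_sequence_header: bool) -> bytes:
    """Return the 2 bytes that precede the AAC payload in a Type 8 message."""
    # 0xAF = sound format 10 (AAC) with the rate/size/channel bits set as AAC requires.
    return bytes([0xAF, 0 if is_sequence_header else 1])
```

An AVC sequence header therefore starts with 17 00 00 00 00, a keyframe with 17 01, and an inter frame with 27 01.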

onMetaData message

Before sending audio and video data, the publisher usually sends an onMetaData data message that describes the stream's media properties:

def create_on_meta_data(
    width: int = 1920,
    height: int = 1080,
    video_bitrate: int = 2000,
    audio_bitrate: int = 128,
    fps: float = 30.0,
    audio_sample_rate: int = 44100,
    audio_channels: int = 2,
    video_codec_id: int = 7,
    audio_codec_id: int = 10,
    duration: float = 0,
) -> list:
    """
    Build an onMetaData data message.

    Note: this is sent as message Type 18 (AMF0 Data), not Type 20 (AMF0 Command).
    Bitrates are in kbit/s; duration is in seconds (0 for a live stream).
    """
    return [
        "@setDataFrame",    # asks the server to keep the data frame that follows
        "onMetaData",       # name of the data event
        {
            "width": width,
            "height": height,
            "videodatarate": video_bitrate,
            "framerate": fps,
            "videocodecid": video_codec_id,  # 7 = AVC
            "audiodatarate": audio_bitrate,
            "audiosamplerate": audio_sample_rate,
            "audiosamplesize": 16,
            "stereo": audio_channels == 2,
            "audiocodecid": audio_codec_id,  # 10 = AAC
            "duration": duration,
            "encoder": "obs-rtmp/1.0",
        }
    ]
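
To go on the wire, the list above has to be serialized as AMF0. The following is a minimal sketch of that step, assuming the property map is written as an ECMA array (marker 0x08), which is a common choice for onMetaData; the helper names are illustrative, and strings longer than 65535 bytes are not handled.

```python
import struct


def amf0_value(v) -> bytes:
    """Encode one Python value as AMF0 (boolean, number, string, null, dict)."""
    if isinstance(v, bool):                      # test bool before int
        return b"\x01" + (b"\x01" if v else b"\x00")
    if isinstance(v, (int, float)):
        return b"\x00" + struct.pack(">d", float(v))
    if isinstance(v, str):
        raw = v.encode("utf-8")
        return b"\x02" + struct.pack(">H", len(raw)) + raw
    if v is None:
        return b"\x05"
    if isinstance(v, dict):
        out = b"\x08" + struct.pack(">I", len(v))        # ECMA array + entry count
        for key, val in v.items():
            raw = key.encode("utf-8")
            out += struct.pack(">H", len(raw)) + raw + amf0_value(val)
        return out + b"\x00\x00\x09"                     # empty key + object-end marker
    raise TypeError(f"unsupported AMF0 type: {type(v).__name__}")


def encode_data_message(values: list) -> bytes:
    """Concatenate the AMF0 encodings of all values in a Type 18 message body."""
    return b"".join(amf0_value(v) for v in values)
```

The result is the body of the Type 18 message; chunk headers are added separately when it is sent.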

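6.4 play (Playback)

The play command asks the server to send the audio and video data of the named stream.

Command format

def create_play_command(
    stream_name: str,
    start: float = -2,
    duration: float = -1,
    reset: bool = True,
    transaction_id: float = 0
) -> list:
    """
    Build a play command.

    Args:
        stream_name: name of the stream (e.g. "mystream")
        start: start position in seconds
            -2: try the live stream first, then a recorded stream of the
                same name; if neither exists, wait for a live stream
            -1: play the live stream only
            >=0: play a recorded stream starting at this offset
        duration: playback duration in seconds
            -1: play until the live stream ends or the recording finishes
            0: play a single frame of a recorded stream
            >0: play for this many seconds
            (any other negative value is treated as -1)
        reset: whether to flush any previous playlist
            True: clear earlier play commands
            False: keep them
    """
    return [
        "play",
        transaction_id,
        None,
        stream_name,
        start,
        duration,
        reset,
    ]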
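
The start argument selects between live and recorded playback. The sketch below classifies a start value following the description of play in the Adobe RTMP specification (-2 tries live, then a recording, then waits; -1 is live only; 0 or more is a recording); the function name and the returned labels are illustrative.

```python
def describe_play_start(start: float) -> str:
    """Map a play `start` value to the source the server is expected to use."""
    if start == -2:
        return "live, else recorded, else wait for live"
    if start == -1:
        return "live only"
    if start >= 0:
        return f"recorded from {start} s"
    raise ValueError(f"unsupported start value: {start}")
```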

Server responses

def create_play_reset() -> list:
    """onStatus: Play.Reset"""
    return [
        "onStatus", 0, None,
        {"level": "status", "code": "NetStream.Play.Reset",
         "description": "Resetting stream."}
    ]


def create_play_start() -> list:
    """onStatus: Play.Start"""
    return [
        "onStatus", 0, None,
        {"level": "status", "code": "NetStream.Play.Start",
         "description": "Started playing."}
    ]


def create_play_stream_not_found() -> list:
    """onStatus: Play.StreamNotFound"""
    return [
        "onStatus", 0, None,
        {"level": "error", "code": "NetStream.Play.StreamNotFound",
         "description": "Stream not found."}
    ]


def create_play_stop() -> list:
    """onStatus: Play.Stop"""
    return [
        "onStatus", 0, None,
        {"level": "status", "code": "NetStream.Play.Stop",
         "description": "Stopped playing."}
    ]

Complete playback sequence

Client                                    Server
  │                                         │
  │── createStream (txid=4) ──────────────→│
  │←── _result (stream_id=1) ─────────────│
  │                                         │
  │── play("mystream") ───────────────────→│  stream_id=1
  │                                         │
  │←── Set Chunk Size ────────────────────│
  │←── User Control (Stream Begin, 1) ────│
  │←── onStatus (Play.Reset) ─────────────│
  │←── onStatus (Play.Start) ─────────────│
  │                                         │
  │←── |RtmpSampleAccess ─────────────────│  Type 18
  │←── onMetaData ────────────────────────│  Type 18
  │                                         │
  │←── AAC Sequence Header ───────────────│  Type 8
  │←── AVC Sequence Header ───────────────│  Type 9
  │                                         │
  │←── Video (Keyframe) ──────────────────│
  │←── Audio ─────────────────────────────│
  │←── Video (Inter) ─────────────────────│
  │←── Audio ─────────────────────────────│
  │       ... receiving continues ...       │
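
The control messages at the start of this sequence are small and fixed in shape. The sketch below builds the Stream Begin body (User Control Message, event type 0, followed by the 4-byte stream ID) and lists the status notifications in order; play_preamble is a hypothetical helper, and it assumes Play.Reset is sent only when the client set the reset flag.

```python
import struct


def stream_begin_payload(stream_id: int) -> bytes:
    """Body of a User Control Message (Type 4): event type 0 + stream ID, big-endian."""
    return struct.pack(">HI", 0, stream_id)


def play_preamble(stream_id: int, reset: bool) -> list:
    """Ordered (kind, value) steps the server sends before any media data."""
    steps = [("UserControl", stream_begin_payload(stream_id))]
    if reset:
        steps.append(("onStatus", "NetStream.Play.Reset"))
    steps.append(("onStatus", "NetStream.Play.Start"))
    return steps
```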

6.5 pause / pauseResponse

Pauses or resumes playback of a stream.

Command format

def create_pause_command(pause: bool, position: float) -> list:
    """
    Build a pause command.

    Args:
        pause: True to pause, False to resume
        position: stream time in milliseconds at which playback is paused or resumed
    """
    return [
        "pause",
        0,      # Transaction ID
        None,   # Command Object
        pause,
        position,
    ]


def create_pause_response() -> list:
    """onStatus: Pause.Notify"""
    return [
        "onStatus", 0, None,
        {"level": "status", "code": "NetStream.Pause.Notify",
         "description": "Stream paused."}
    ]


def create_unpause_response() -> list:
    """onStatus: Unpause.Notify"""
    return [
        "onStatus", 0, None,
        {"level": "status", "code": "NetStream.Unpause.Notify",
         "description": "Stream unpaused."}
    ]

Pause/resume flow

Client                                    Server
  │                                         │
  │   receiving audio and video data        │
  │←── Video ─────────────────────────────│
  │←── Audio ─────────────────────────────│
  │                                         │
  │── pause(true, 5000) ─────────────────→│  pause at 5 s
  │                                         │
  │←── onStatus (Pause.Notify) ───────────│
  │   server stops sending media data       │
  │                                         │
  │   ... user clicks resume ...            │
  │                                         │
  │── pause(false, 5000) ────────────────→│  resume from 5 s
  │                                         │
  │←── onStatus (Unpause.Notify) ─────────│
  │←── Video ─────────────────────────────│
  │←── Audio ─────────────────────────────│
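
On the server, the flow above amounts to a flag that gates media delivery plus the position to resume from. The following is a minimal sketch of that state; PlaybackSession is a hypothetical class, not part of any RTMP server API.

```python
class PlaybackSession:
    """Per-subscriber playback state driven by the pause command."""

    def __init__(self):
        self.paused = False
        self.position_ms = 0.0

    def handle_pause(self, pause: bool, position_ms: float) -> str:
        """Apply a pause command and return the onStatus code to send back."""
        self.paused = pause
        self.position_ms = position_ms
        return "NetStream.Pause.Notify" if pause else "NetStream.Unpause.Notify"

    def should_send_media(self) -> bool:
        """Audio and video are withheld while the session is paused."""
        return not self.paused
```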

6.6 seek

Jumps to a given position in the stream.

Command format

def create_seek_command(position_ms: float) -> list:
    """
    Build a seek command.

    Args:
        position_ms: target position in milliseconds
    """
    return [
        "seek",
        0,
        None,
        position_ms,
    ]


def create_seek_notify(position_ms: float) -> list:
    """onStatus: Seek.Notify"""
    return [
        "onStatus", 0, None,
        {
            "level": "status",
            "code": "NetStream.Seek.Notify",
            "description": f"Seeking to {position_ms}ms.",
        }
    ]

Seek flow

Client                                    Server
  │                                         │
  │   playing (timestamp 5000 ms)           │
  │                                         │
  │── seek(10000) ────────────────────────→│  jump to 10 s
  │                                         │
  │←── onStatus (Seek.Notify) ────────────│
  │                                         │
  │←── Video (Keyframe @10s) ─────────────│  starts at the nearest keyframe
  │←── Audio ─────────────────────────────│
  │←── Video (Inter) ─────────────────────│
  │   ... playback continues from 10 s ...  │
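
Since decoding can only restart at a keyframe, the server has to map the requested position to a keyframe timestamp. The sketch below assumes the server keeps a sorted list of keyframe timestamps and picks the last one at or before the target; both the index and the function name are assumptions made for illustration.

```python
from bisect import bisect_right


def keyframe_for_seek(keyframe_times_ms: list, target_ms: float) -> int:
    """Return the timestamp of the last keyframe at or before target_ms.

    Falls back to the first keyframe when the target precedes all of them.
    keyframe_times_ms must be sorted in ascending order.
    """
    if not keyframe_times_ms:
        raise ValueError("no keyframes indexed")
    i = bisect_right(keyframe_times_ms, target_ms)
    return keyframe_times_ms[max(i - 1, 0)]
```

Starting at or before the target, rather than after it, means the viewer never misses the requested moment, at the cost of replaying a little earlier content.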

6.7 closeStream / deleteStream

Closes a stream and deletes it.

def create_close_stream() -> list:
    """closeStream command (sent by the client)"""
    return [
        "closeStream",
        0,
        None,
    ]


def create_delete_stream(stream_id: float) -> list:
    """deleteStream command (sent by the client)"""
    return [
        "deleteStream",
        0,
        None,
        stream_id,
    ]

Stream shutdown flow

Client                                    Server
  │                                         │
  │── closeStream ────────────────────────→│  tells the server to stop
  │                                         │
  │── deleteStream (stream_id=1) ─────────→│  deletes the stream
  │                                         │
  │   server releases the stream resources  │

6.8 receiveAudio / receiveVideo

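These commands control whether the client receives audio or video data, which allows selective reception:

def create_receive_audio(flag: bool) -> list:
    """
    receiveAudio command
    flag=False: stop receiving audio (saves bandwidth)
    flag=True: resume receiving audio
    """
    return [
        "receiveAudio",
        0,
        None,
        flag,
    ]


def create_receive_video(flag: bool) -> list:
    """receiveVideo command; flag has the same meaning as for receiveAudio"""
    return [
        "receiveVideo",
        0,
        None,
        flag,
    ]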

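Use cases

  • Picture-in-picture mode: the small window receives video only, without audio
  • Audio-only mode: receive audio only and skip video to save bandwidth
  • Low-bandwidth clients: switch audio and video reception on and off dynamically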
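
On the server, these two commands reduce to a per-subscriber filter applied before each message is forwarded. The following is a minimal sketch; ReceiveFlags is a hypothetical helper, and it assumes that data and command messages are always forwarded.

```python
AUDIO_TYPE = 8   # RTMP message type for audio
VIDEO_TYPE = 9   # RTMP message type for video


class ReceiveFlags:
    """Tracks the receiveAudio/receiveVideo state of one subscriber."""

    def __init__(self):
        self.audio = True
        self.video = True

    def handle(self, command: str, flag: bool) -> None:
        """Apply a receiveAudio or receiveVideo command."""
        if command == "receiveAudio":
            self.audio = flag
        elif command == "receiveVideo":
            self.video = flag
        else:
            raise ValueError(f"unexpected command: {command!r}")

    def should_forward(self, msg_type: int) -> bool:
        """Decide whether a message of this type goes to the subscriber."""
        if msg_type == AUDIO_TYPE:
            return self.audio
        if msg_type == VIDEO_TYPE:
            return self.video
        return True  # metadata and commands are never filtered
```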

6.9 A Complete Stream-Operations Client

#!/usr/bin/env python3
"""
RTMP Stream Operations Demo
Walks through the complete flow of RTMP stream operations.
"""

import socket
import struct


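class RTMPStreamClient:
    """RTMP stream-operations client (demo skeleton)."""

    def __init__(self, host: str, port: int = 1935):
        self.host = host
        self.port = port
        self.sock = None
        self.stream_id = 0
        self.transaction_id = 0
        self.chunk_size = 128  # RTMP default chunk size

    def connect(self, app: str, tc_url: str):
        """Open the TCP connection, perform the handshake, and send connect."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((self.host, self.port))
        self._handshake()
        self._send_connect(app, tc_url)
        self._wait_for_result()

    def create_stream(self) -> int:
        """Create a stream and remember the ID the server allocated."""
        self.transaction_id += 1
        msg = self._encode_amf_command(
            "createStream", float(self.transaction_id), None
        )
        self._send_amf_message(20, 3, msg)
        result = self._wait_for_result()
        self.stream_id = int(result)
        return self.stream_id

    def publish(self, stream_name: str, publish_type: str = "live"):
        """Start publishing."""
        msg = self._encode_amf_command(
            "publish", 0.0, None, stream_name, publish_type
        )
        self._send_amf_message(20, self.stream_id + 6, msg, self.stream_id)

    def play(self, stream_name: str, start: float = -2, duration: float = -1):
        """Start playback."""
        msg = self._encode_amf_command(
            "play", 0.0, None, stream_name, float(start), float(duration), True
        )
        self._send_amf_message(20, self.stream_id + 6, msg, self.stream_id)

    def pause(self, pause: bool, position_ms: float):
        """Pause or resume playback."""
        msg = self._encode_amf_command(
            "pause", 0.0, None, pause, float(position_ms)
        )
        self._send_amf_message(20, self.stream_id + 6, msg, self.stream_id)

    def seek(self, position_ms: float):
        """Seek to a position."""
        msg = self._encode_amf_command(
            "seek", 0.0, None, float(position_ms)
        )
        self._send_amf_message(20, self.stream_id + 6, msg, self.stream_id)

    def send_video(self, data: bytes, timestamp: int, is_keyframe: bool = False):
        """Send one AVC video frame (data holds the NAL units)."""
        frame_type = 0x10 if is_keyframe else 0x20  # keyframe / inter frame
        # FrameType | CodecID 7 (AVC), AVCPacketType 1 (NALU), composition time 0
        video_body = bytes([frame_type | 0x07, 0x01, 0x00, 0x00, 0x00]) + data
        self._send_media_message(9, self.stream_id + 5, video_body, timestamp)

    def send_audio(self, data: bytes, timestamp: int):
        """Send one raw AAC frame."""
        audio_body = bytes([0xAF, 0x01]) + data  # AAC, AACPacketType 1 (raw)
        self._send_media_message(8, self.stream_id + 4, audio_body, timestamp)

    def close_stream(self):
        """Close the stream, then delete it."""
        # closeStream is a stream-level command, so it uses the stream's own ID
        msg = self._encode_amf_command("closeStream", 0.0, None)
        self._send_amf_message(20, self.stream_id + 6, msg, self.stream_id)

        # deleteStream is a connection-level command on message stream 0
        msg = self._encode_amf_command(
            "deleteStream", 0.0, None, float(self.stream_id)
        )
        self._send_amf_message(20, 3, msg)

    # === Internal helpers ===

    def _send_connect(self, app: str, tc_url: str):
        """Send connect with a minimal command object (app and tcUrl only)."""
        def prop(key: str, value: str) -> bytes:
            k, v = key.encode('utf-8'), value.encode('utf-8')
            return (struct.pack('>H', len(k)) + k
                    + bytes([0x02]) + struct.pack('>H', len(v)) + v)

        self.transaction_id = 1  # connect always uses transaction ID 1
        msg = (self._encode_amf_command("connect", 1.0)
               + bytes([0x03]) + prop("app", app) + prop("tcUrl", tc_url)
               + bytes([0x00, 0x00, 0x09]))  # AMF0 object, then object-end marker
        self._send_amf_message(20, 3, msg)

    def _handshake(self):
        """Perform the simple RTMP handshake (C0/C1, S0/S1/S2, C2)."""
        import os, time
        c0 = struct.pack('B', 3)  # protocol version 3
        ts = int(time.time()) & 0xFFFFFFFF
        c1 = struct.pack('>II', ts, 0) + os.urandom(1528)
        self.sock.sendall(c0 + c1)

        s0s1s2 = self._recv_exact(3073)  # 1 + 1536 + 1536 bytes
        s1 = s0s1s2[1:1537]

        # C2 echoes S1: S1's timestamp, the local read time, then S1's random bytes
        c2 = s1[:4] + struct.pack('>I', int(time.time()) & 0xFFFFFFFF) + s1[8:]
        self.sock.sendall(c2)

    def _recv_exact(self, size: int) -> bytes:
        """Receive exactly `size` bytes."""
        data = b''
        while len(data) < size:
            chunk = self.sock.recv(size - len(data))
            if not chunk:
                raise ConnectionError("Connection closed")
            data += chunk
        return data

    def _send_amf_message(self, msg_type: int, cs_id: int, data: bytes,
                          msg_stream_id: int = 0):
        """Send an AMF message, split into chunks of at most chunk_size bytes."""
        header = self._encode_chunk_header(0, cs_id, 0, len(data), msg_type, msg_stream_id)
        out = header + data[:self.chunk_size]
        for offset in range(self.chunk_size, len(data), self.chunk_size):
            # Continuation chunks carry only a Type 3 basic header
            out += self._encode_chunk_header(3, cs_id, 0, 0, 0, 0)
            out += data[offset:offset + self.chunk_size]
        self.sock.sendall(out)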

    def _send_media_message(self, msg_type: int, cs_id: int, data: bytes, timestamp: int):
        """Send an audio/video message, split into chunks of at most chunk_size bytes."""
        header = self._encode_chunk_header(0, cs_id, timestamp, len(data), msg_type, self.stream_id)
        # The first chunk carries the full Type 0 header
        chunks = header + data[:self.chunk_size]
        offset = self.chunk_size
        while offset < len(data):
            # Continuation chunks carry only a Type 3 basic header
            chunk_hdr = self._encode_chunk_header(3, cs_id, 0, 0, 0, 0)
            chunk_body = data[offset:offset + self.chunk_size]
            chunks += chunk_hdr + chunk_body
            offset += self.chunk_size
        self.sock.sendall(chunks)

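    def _encode_chunk_header(self, fmt, cs_id, ts, msg_len, msg_type, msg_stream):
        """Encode a chunk header (1-byte basic header only; Type 0 and Type 3)."""
        if not 2 <= cs_id <= 63:
            # IDs of 64 and above need the 2- or 3-byte basic header, omitted in this demo
            raise ValueError(f"unsupported chunk stream ID: {cs_id}")
        hdr = bytes([(fmt << 6) | cs_id])
        if fmt == 0:
            # Timestamps of 0xFFFFFF and above need an extended timestamp, also omitted
            hdr += struct.pack('>I', ts)[1:]       # timestamp, 3 bytes big-endian
            hdr += struct.pack('>I', msg_len)[1:]  # message length, 3 bytes big-endian
            hdr += struct.pack('B', msg_type)      # message type ID
            hdr += struct.pack('<I', msg_stream)   # message stream ID, little-endian
        return hdr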

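    def _encode_amf_command(self, *values):
        """Minimal AMF0 encoder (demo only): null, boolean, number, string."""
        result = b''
        for v in values:
            if v is None:
                result += bytes([0x05])
            elif isinstance(v, bool):
                # Test bool before int: bool is a subclass of int in Python
                result += bytes([0x01, 0x01 if v else 0x00])
            elif isinstance(v, (int, float)):
                # AMF0 has a single number type, an 8-byte big-endian double
                result += bytes([0x00]) + struct.pack('>d', float(v))
            elif isinstance(v, str):
                encoded = v.encode('utf-8')
                result += bytes([0x02]) + struct.pack('>H', len(encoded)) + encoded
            else:
                raise TypeError(f"unsupported AMF0 type: {type(v).__name__}")
        return result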

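    def _wait_for_result(self):
        """Wait for the _result response (simplified)."""
        # A real implementation must reassemble the chunks and decode the AMF0 body
        data = self.sock.recv(4096)
        return 1.0  # placeholder: assumes the server allocated stream ID 1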
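
The placeholder in _wait_for_result skips the decoding step. As a rough illustration of what that step involves, the sketch below decodes the four AMF0 types that a createStream response uses; it assumes the message body has already been reassembled from its chunks, and decode_amf0 is an illustrative name.

```python
import struct


def decode_amf0(body: bytes) -> list:
    """Decode a sequence of AMF0 values (number, boolean, string, null)."""
    values, pos = [], 0
    while pos < len(body):
        marker = body[pos]
        pos += 1
        if marker == 0x00:                       # number: 8-byte big-endian double
            values.append(struct.unpack_from(">d", body, pos)[0])
            pos += 8
        elif marker == 0x01:                     # boolean: 1 byte
            values.append(body[pos] != 0)
            pos += 1
        elif marker == 0x02:                     # string: 2-byte length + UTF-8
            (length,) = struct.unpack_from(">H", body, pos)
            pos += 2
            values.append(body[pos:pos + length].decode("utf-8"))
            pos += length
        elif marker == 0x05:                     # null
            values.append(None)
        else:
            raise ValueError(f"unsupported AMF0 marker 0x{marker:02x}")
    return values
```

For a createStream response the decoded list has the form ["_result", transaction_id, None, stream_id], so the stream ID is the last element. A connect response also contains objects, which this sketch does not handle.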

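Notes

  1. Stream ID allocation: the stream_id returned by createStream is the message stream ID used for the subsequent play/publish messages
  2. Chunk stream ID mapping: the message stream ID and the chunk stream ID are different concepts. The chunk stream ID is chosen by the sender; the demo above uses stream_id + 4 for audio and stream_id + 5 for video (cs_id=5 and cs_id=6 when stream_id=1)
  3. Sequence header order: when publishing, send the AAC/AVC sequence headers before any audio or video frames
  4. Live vs. on-demand: a play start value of -1 plays the live stream only, a value of 0 or more plays a recorded stream from that offset, and -2 tries the live stream first and then falls back to a recording
  5. deleteStream timing: the client should send deleteStream once it stops playing or publishing; otherwise the server may not release the stream's resources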
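
To make the chunk stream ID in note 2 concrete, the sketch below encodes the chunk basic header in all three of its forms, as laid out in the RTMP chunk format: one byte for IDs 2 to 63, two bytes for 64 to 319, and three bytes for 320 to 65599. The function name is illustrative.

```python
def encode_basic_header(fmt: int, cs_id: int) -> bytes:
    """Encode the chunk basic header: 2-bit fmt plus the chunk stream ID."""
    if not 0 <= fmt <= 3:
        raise ValueError(f"fmt must be 0..3, got {fmt}")
    if 2 <= cs_id <= 63:
        return bytes([(fmt << 6) | cs_id])            # 1-byte form
    if 64 <= cs_id <= 319:
        return bytes([fmt << 6, cs_id - 64])          # 2-byte form, low 6 bits = 0
    if 320 <= cs_id <= 65599:
        rest = cs_id - 64
        # 3-byte form, low 6 bits = 1; the ID offset is stored low byte first
        return bytes([(fmt << 6) | 1, rest & 0xFF, rest >> 8])
    raise ValueError(f"chunk stream ID out of range: {cs_id}")
```

The message stream ID never appears here; it lives in the Type 0 message header that follows the basic header.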
