05 - AMF 编码与命令
AMF 编码与命令
5.1 AMF 概述
AMF(Action Message Format,动作消息格式)是 RTMP 协议中用于序列化数据的二进制编码格式。它最初是 Adobe Flash 平台的数据交换格式,类似于 JSON 或 Protocol Buffers,但专门为实时通信优化。
AMF 的角色
应用层(开发者视角):
connect("rtmp://live.example.com/app", "stream1")
↓ AMF 编码
协议层(网络传输):
[02 00 07] [63 6F 6E 6E 65 63 74] ← "connect" (AMF String)
[02 00 24] [72 74 6D 70 3A 2F ...] ← "rtmp://..." (AMF String)
↓ 块流层拆分
传输层(TCP):
Chunk #1 | Chunk #2 | Chunk #3 | ...
AMF 版本
| 版本 | 引入时间 | 特点 | 消息类型 |
|---|---|---|---|
| AMF0 | Flash MX (2002) | 基础类型,兼容性好 | Type 18/20 |
| AMF3 | Flash 9 (2006) | 支持更多类型,更紧凑 | Type 15/17 |
5.2 AMF0 数据类型
AMF0 使用类型标记(Type Marker)+ 值的格式:
| 标记 | 值 | 类型 | 说明 |
|---|---|---|---|
| 0x00 | 0 | Number | 8 字节 IEEE 754 双精度浮点数 |
| 0x01 | 1 | Boolean | 0x00=false, 0x01=true |
| 0x02 | 2 | String | 2 字节长度 + UTF-8 字符串 |
| 0x03 | 3 | Object | 键值对集合(ECMA Array) |
| 0x04 | 4 | MovieClip | 保留(Flash 特有) |
| 0x05 | 5 | Null | 空值(无附加数据) |
| 0x06 | 6 | Undefined | 未定义(无附加数据) |
| 0x07 | 7 | Reference | 对象引用(2 字节索引) |
| 0x08 | 8 | ECMA Array | 关联数组(4 字节计数 + 键值对) |
| 0x0A | 10 | Strict Array | 索引数组(4 字节计数 + 值) |
| 0x0B | 11 | Date | 日期(8 字节毫秒 + 2 字节时区) |
| 0x0C | 12 | Long String | 长字符串(4 字节长度) |
| 0x0D | 13 | Unsupported | 不支持的类型 |
| 0x0E | 14 | Recordset | 保留 |
| 0x0F | 15 | XML Document | XML 文档 |
| 0x10 | 16 | Typed Object | 自定义类型对象 |
| 0x11 | 17 | AMF3 Switch | 切换到 AMF3 编码 |
5.3 AMF0 编码实现
#!/usr/bin/env python3
"""
AMF0 Encoder/Decoder
完整的 AMF0 编解码实现
"""
import struct
from datetime import datetime, timezone
class AMF0:
"""AMF0 类型标记常量"""
NUMBER = 0x00
BOOLEAN = 0x01
STRING = 0x02
OBJECT = 0x03
NULL = 0x05
UNDEFINED = 0x06
ECMA_ARRAY = 0x08
OBJECT_END = 0x09 # Object 结束标记 (0x00 0x00 0x09)
STRICT_ARRAY = 0x0A
DATE = 0x0B
LONG_STRING = 0x0C
XML_DOCUMENT = 0x0F
TYPED_OBJECT = 0x10
AMF3_SWITCH = 0x11
class AMF0Encoder:
"""AMF0 编码器"""
def encode(self, *values) -> bytes:
"""编码多个 AMF0 值"""
result = b''
for value in values:
result += self._encode_value(value)
return result
def _encode_value(self, value) -> bytes:
"""编码单个值"""
if value is None:
return bytes([AMF0.NULL])
elif isinstance(value, bool):
return self._encode_boolean(value)
elif isinstance(value, (int, float)):
return self._encode_number(float(value))
elif isinstance(value, str):
return self._encode_string(value)
elif isinstance(value, dict):
return self._encode_object(value)
elif isinstance(value, list):
return self._encode_strict_array(value)
elif isinstance(value, datetime):
return self._encode_date(value)
else:
return bytes([AMF0.UNDEFINED])
def _encode_number(self, value: float) -> bytes:
return bytes([AMF0.NUMBER]) + struct.pack('>d', value)
def _encode_boolean(self, value: bool) -> bytes:
return bytes([AMF0.BOOLEAN, 0x01 if value else 0x00])
def _encode_string(self, value: str) -> bytes:
encoded = value.encode('utf-8')
if len(encoded) <= 0xFFFF:
return bytes([AMF0.STRING]) + struct.pack('>H', len(encoded)) + encoded
else:
return bytes([AMF0.LONG_STRING]) + struct.pack('>I', len(encoded)) + encoded
def _encode_object(self, value: dict) -> bytes:
result = bytes([AMF0.OBJECT])
for key, val in value.items():
key_bytes = key.encode('utf-8')
result += struct.pack('>H', len(key_bytes)) + key_bytes
result += self._encode_value(val)
result += b'\x00\x00' + bytes([AMF0.OBJECT_END])
return result
def _encode_strict_array(self, value: list) -> bytes:
result = bytes([AMF0.STRICT_ARRAY]) + struct.pack('>I', len(value))
for item in value:
result += self._encode_value(item)
return result
def _encode_date(self, value: datetime) -> bytes:
timestamp_ms = int(value.timestamp() * 1000)
timezone_offset = 0 # UTC
return (bytes([AMF0.DATE]) +
struct.pack('>d', float(timestamp_ms)) +
struct.pack('>h', timezone_offset))
class AMF0Decoder:
"""AMF0 解码器"""
def __init__(self, data: bytes, offset: int = 0):
self.data = data
self.offset = offset
def decode(self) -> list:
"""解码所有值,返回值列表"""
results = []
while self.offset < len(self.data):
try:
value = self._decode_value()
results.append(value)
except (struct.error, IndexError):
break
return results
def _decode_value(self):
"""解码单个值"""
if self.offset >= len(self.data):
return None
marker = self.data[self.offset]
self.offset += 1
if marker == AMF0.NUMBER:
return self._decode_number()
elif marker == AMF0.BOOLEAN:
return self._decode_boolean()
elif marker == AMF0.STRING:
return self._decode_string()
elif marker == AMF0.OBJECT:
return self._decode_object()
elif marker == AMF0.NULL:
return None
elif marker == AMF0.UNDEFINED:
return None
elif marker == AMF0.ECMA_ARRAY:
return self._decode_ecma_array()
elif marker == AMF0.STRICT_ARRAY:
return self._decode_strict_array()
elif marker == AMF0.DATE:
return self._decode_date()
elif marker == AMF0.LONG_STRING:
return self._decode_long_string()
elif marker == AMF0.AMF3_SWITCH:
return self._decode_value() # 递归解码后续值
else:
return f'[Unknown marker: 0x{marker:02x}]'
def _decode_number(self) -> float:
value = struct.unpack('>d', self.data[self.offset:self.offset+8])[0]
self.offset += 8
return value
def _decode_boolean(self) -> bool:
value = self.data[self.offset] != 0
self.offset += 1
return value
def _decode_string(self) -> str:
length = struct.unpack('>H', self.data[self.offset:self.offset+2])[0]
self.offset += 2
value = self.data[self.offset:self.offset+length].decode('utf-8')
self.offset += length
return value
def _decode_long_string(self) -> str:
length = struct.unpack('>I', self.data[self.offset:self.offset+4])[0]
self.offset += 4
value = self.data[self.offset:self.offset+length].decode('utf-8')
self.offset += length
return value
def _decode_object(self) -> dict:
result = {}
while self.offset < len(self.data):
# 检查结束标记
if (self.data[self.offset] == 0 and
self.data[self.offset+1] == 0 and
self.data[self.offset+2] == AMF0.OBJECT_END):
self.offset += 3
break
key = self._decode_string_key()
value = self._decode_value()
result[key] = value
return result
def _decode_string_key(self) -> str:
length = struct.unpack('>H', self.data[self.offset:self.offset+2])[0]
self.offset += 2
value = self.data[self.offset:self.offset+length].decode('utf-8')
self.offset += length
return value
def _decode_ecma_array(self) -> dict:
count = struct.unpack('>I', self.data[self.offset:self.offset+4])[0]
self.offset += 4
# ECMA Array 使用与 Object 相同的键值对格式
return self._decode_object()
def _decode_strict_array(self) -> list:
count = struct.unpack('>I', self.data[self.offset:self.offset+4])[0]
self.offset += 4
result = []
for _ in range(count):
result.append(self._decode_value())
return result
def _decode_date(self) -> datetime:
timestamp_ms = struct.unpack('>d', self.data[self.offset:self.offset+8])[0]
self.offset += 8
timezone_offset = struct.unpack('>h', self.data[self.offset:self.offset+2])[0]
self.offset += 2
return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
5.4 AMF3 数据类型
AMF3 引入了 引用机制(引用池),可以更高效地编码重复的对象和字符串:
| 标记 | 值 | 类型 | 说明 |
|---|---|---|---|
| 0x00 | 0 | Undefined | — |
| 0x01 | 1 | Null | — |
| 0x02 | 2 | Boolean False | — |
| 0x03 | 3 | Boolean True | — |
| 0x04 | 4 | Integer | 可变长度 29 位有符号整数 |
| 0x05 | 5 | Double | 8 字节浮点数 |
| 0x06 | 6 | String | 引用或 UTF-8 字符串 |
| 0x07 | 7 | XML Document | — |
| 0x08 | 8 | Date | — |
| 0x09 | 9 | Array | — |
| 0x0A | 10 | Object | — |
| 0x0B | 11 | XML | — |
| 0x0C | 12 | ByteArray | — |
AMF3 Integer 编码
AMF3 使用 可变长度编码(Variable-Length Encoding)表示整数:
规则:
- 有效位为 29 位(0 ~ 2^28-1)
- 每字节的最高位为"继续位"
- 大端序
示例:
值 0x03:
0x03 → 0x03
值 0x80 (128):
0x81 0x00 → 1 0000001 0000000
= 0b10000000 = 128
值 0x20000 (131072):
0x84 0x80 0x00 → 0b100 0000000 0000000
= 131072
def amf3_read_integer(data: bytes, offset: int) -> tuple:
"""读取 AMF3 可变长度整数,返回 (value, new_offset)"""
result = 0
b = data[offset]
offset += 1
if b < 0x80:
return b, offset
result = (b & 0x7F) << 7
b = data[offset]; offset += 1
if b < 0x80:
return result | b, offset
result = (result | (b & 0x7F)) << 7
b = data[offset]; offset += 1
if b < 0x80:
return result | b, offset
result = (result | (b & 0x7F)) << 8
b = data[offset]; offset += 1
return result | b, offset
def amf3_write_integer(value: int) -> bytes:
"""写入 AMF3 可变长度整数"""
if value < 0x80:
return bytes([value])
elif value < 0x4000:
return bytes([0x80 | (value >> 7), value & 0x7F])
elif value < 0x200000:
return bytes([0x80 | (value >> 14), 0x80 | ((value >> 7) & 0x7F), value & 0x7F])
else:
return bytes([0x80 | (value >> 22), 0x80 | ((value >> 15) & 0x7F),
0x80 | ((value >> 8) & 0x7F), value & 0xFF])
5.5 RTMP 命令消息
命令消息使用 AMF 编码,通过消息类型 20(AMF0 Command)或 17(AMF3 Command)传输。
命令消息结构
AMF0 Command (Type 20) Body 结构:
┌──────────────────────────────────────────────────┐
│ Command Name (AMF0 String) ← "connect" │
│ Transaction ID (AMF0 Number) ← 1.0 │
│ Command Object (AMF0 Object) ← {app, tcUrl} │
│ Optional Args (AMF0 Value...) ← ... │
└──────────────────────────────────────────────────┘
5.5.1 connect 命令
客户端发送 connect 命令建立与应用程序的连接:
def create_connect_command(
app: str,
tc_url: str,
flash_ver: str = "FMLE/3.0",
fpad: bool = False,
capabilities: int = 239,
audio_codecs: int = 3575,
video_codecs: int = 252,
video_function: int = 1,
object_encoding: int = 0,
transaction_id: float = 1.0
) -> list:
"""构造 connect 命令的 AMF 值列表"""
return [
"connect", # Command Name
transaction_id, # Transaction ID
{ # Command Object
"app": app,
"flashVer": flash_ver,
"fpad": fpad,
"capabilities": capabilities,
"audioCodecs": audio_codecs,
"videoCodecs": video_codecs,
"videoFunction": video_function,
"objectEncoding": object_encoding,
"tcUrl": tc_url,
}
]
connect 命令的 Command Object 字段:
| 字段 | 类型 | 说明 |
|---|---|---|
| app | String | 应用名称,如 “live” |
| flashVer | String | Flash Player 版本 |
| fpad | Boolean | 是否支持代理 |
| capabilities | Number | 客户端能力标志 |
| audioCodecs | Number | 支持的音频编码 |
| videoCodecs | Number | 支持的视频编码 |
| videoFunction | Number | 视频功能标志 |
| objectEncoding | Number | AMF 版本(0=AMF0, 3=AMF3) |
| tcUrl | String | RTMP 完整 URL |
5.5.2 _result / _error 响应
服务端响应 connect 命令:
def create_connect_result(transaction_id: float) -> list:
"""构造 connect 成功响应"""
return [
"_result", # Command Name
transaction_id, # Transaction ID (对应 connect 的 ID)
{ # Properties
"fmsVer": "FMS/3,0,1,123",
"capabilities": 31,
},
{ # Information
"level": "status",
"code": "NetConnection.Connect.Success",
"description": "Connection succeeded.",
"objectEncoding": 0,
}
]
def create_connect_error(transaction_id: float, code: str, desc: str) -> list:
"""构造 connect 错误响应"""
return [
"_error",
transaction_id,
None,
{
"level": "error",
"code": code,
"description": desc,
}
]
5.5.3 createStream 命令
创建逻辑流:
def create_create_stream(transaction_id: float) -> list:
"""构造 createStream 命令"""
return [
"createStream",
transaction_id,
None, # 无 Command Object
]
def create_create_stream_result(transaction_id: float, stream_id: float = 1.0) -> list:
"""构造 createStream 响应"""
return [
"_result",
transaction_id,
None,
stream_id, # 新建的流 ID
]
5.5.4 play 命令
请求播放流:
def create_play(
stream_name: str,
stream_id: int = 1,
start: float = -2,
duration: float = -1,
reset: bool = True
) -> list:
"""构造 play 命令"""
return [
"play",
0, # Transaction ID (通常为 0)
None,
stream_name, # 流名称
start, # 开始时间 (-2=直播, -1=从头)
duration, # 持续时间 (-1=到结束)
reset, # 是否重置
]
5.5.5 publish 命令
开始推流:
def create_publish(
stream_name: str,
publish_type: str = "live"
) -> list:
"""构造 publish 命令"""
return [
"publish",
0,
None,
stream_name, # 流名称
publish_type, # "live", "record", "append"
]
5.6 远程过程调用(RPC)
RTMP 支持两种 RPC 机制:
5.6.1 NetConnection.call
客户端可以直接调用服务端的方法:
Client → Server (AMF Command, Type 20):
┌────────────────────────────────────────────────┐
│ "call" ← 命令名 │
│ transaction_id ← 事务 ID │
│ null ← 无对象 │
│ "getServerTime" ← 方法名 │
│ arg1, arg2, ... ← 参数 │
└────────────────────────────────────────────────┘
5.6.2 RPC 调用
def create_rpc_call(method: str, transaction_id: float, *args) -> list:
"""构造 RPC 调用命令"""
return ["call", transaction_id, None, method] + list(args)
# 示例:调用服务端的 getServerTime
encoder = AMF0Encoder()
rpc_msg = create_rpc_call("getServerTime", 3.0)
encoded = encoder.encode(*rpc_msg)
5.7 onStatus 事件通知
服务端向客户端发送状态事件:
def create_on_status(
level: str,
code: str,
description: str,
info: dict = None
) -> list:
"""构造 onStatus 事件"""
return [
"onStatus",
0,
None,
{
"level": level,
"code": code,
"description": description,
**(info or {}),
}
]
# 常见事件代码:
# NetStream.Play.Start - 开始播放
# NetStream.Play.Stop - 停止播放
# NetStream.Play.Reset - 播放重置
# NetStream.Publish.Start - 开始推流
# NetStream.Unpublish.Success - 停止推流
# NetStream.Play.StreamNotFound - 流未找到
5.8 完整命令流程
推流(Publish)完整流程
Client Server
│ │
│════ 握手完成 ════════════════════════════│
│ │
│─── Set Chunk Size (4096) ───────────────→│
│─── connect("live") ────────────────────→│ Type 20
│ │
│←── Window Ack Size ─────────────────────│ Type 5
│←── Set Peer Bandwidth ──────────────────│ Type 6
│←── User Control (Stream Begin, 0) ──────│ Type 4
│←── _result (NetConnection.Connect.Success)│ Type 20
│ │
│─── createStream() ─────────────────────→│ Type 20
│←── _result (stream_id=1) ───────────────│ Type 20
│ │
│─── @setDataFrame (onMetaData) ─────────→│ Type 18
│─── publish("mystream", "live") ────────→│ Type 20
│ │
│←── onStatus (Publish.Start) ────────────│ Type 20
│ │
│─── AAC Sequence Header ────────────────→│ Type 8
│─── AVC Sequence Header ────────────────→│ Type 9
│ │
│─── Audio Data ─────────────────────────→│ Type 8
│─── Video Data ─────────────────────────→│ Type 9
│─── Audio Data ─────────────────────────→│ Type 8
│─── Video Data ─────────────────────────→│ Type 9
│ ... 持续发送 ... │
播放(Play)完整流程
Client Server
│ │
│─── connect("live") ────────────────────→│
│←── _result ─────────────────────────────│
│ │
│─── createStream() ─────────────────────→│
│←── _result (stream_id=1) ───────────────│
│ │
│─── play("mystream") ───────────────────→│
│ │
│←── User Control (Stream Begin, 1) ──────│
│←── onStatus (Play.Reset) ───────────────│
│←── onStatus (Play.Start) ───────────────│
│←── RtmpSampleAccess (false, false) ────│ Type 18
│←── onMetaData ─────────────────────────│ Type 18
│ │
│←── AAC Sequence Header ─────────────────│ Type 8
│←── AVC Sequence Header ─────────────────│ Type 9
│ │
│←── Audio Data ──────────────────────────│ Type 8
│←── Video Data ──────────────────────────│ Type 9
│ ... 持续接收 ... │
5.9 AMF 编解码测试
def test_amf0_codec():
"""AMF0 编解码测试"""
encoder = AMF0Encoder()
decoder = AMF0Decoder
# 测试 connect 命令
values = create_connect_command(
app="live",
tc_url="rtmp://localhost:1935/live"
)
encoded = encoder.encode(*values)
print(f"Encoded {len(values)} values: {len(encoded)} bytes")
print(f"Hex: {encoded.hex()}")
# 解码验证
d = decoder(encoded)
decoded = d.decode()
print(f"Decoded: {decoded}")
# 验证
assert decoded[0] == "connect"
assert decoded[1] == 1.0
assert decoded[2]["app"] == "live"
print("✅ AMF0 codec test passed!")
if __name__ == '__main__':
test_amf0_codec()
注意事项
- AMF0 vs AMF3 选择:大多数 RTMP 实现默认使用 AMF0,AMF3 在 Flash Remoting 中更常见
- Transaction ID 递增:每个命令的 Transaction ID 应递增,用于匹配请求和响应
- Object Encoding:connect 命令中的 objectEncoding 字段告诉服务端客户端使用的 AMF 版本
- 字符串编码:AMF0 String 使用 UTF-8,注意不要超过 65535 字节(Long String 例外)
- 数字精度:AMF0 Number 是 64 位浮点数,整数不要超过 2^53 以避免精度丢失
- onMetaData:此消息中的 metadata 对象不固定,不同编码器/版本可能包含不同字段