08 - 流式处理 / Streaming
流式处理 / Streaming
本章讲解如何使用 MessagePack 进行流式消息处理,包括无边界消息拆分、多消息编解码、网络协议设计和 RPC 框架集成。
This chapter covers streaming message processing with MessagePack, including boundary-free message splitting, multi-message encoding/decoding, network protocol design, and RPC framework integration.
📖 为什么需要流式处理? / Why Streaming?
MessagePack 本身不包含**消息边界(Message Boundary)**信息。当多条消息通过同一连接传输时,接收端无法自动区分消息的起止位置。
MessagePack itself does not contain message boundary information. When multiple messages are transmitted over the same connection, the receiver cannot automatically determine where each message starts and ends.
┌────────────────────────────────────────────────────┐
│ TCP 流 / TCP Stream │
│ │
│ [msg1 bytes][msg2 bytes][msg3 bytes]... │
│ ↑ ↑ ↑ │
│ 如何区分? 如何区分? 如何区分? │
└────────────────────────────────────────────────────┘
常见解决方案
| 方案 | 原理 | 适用场景 |
|---|---|---|
| 长度前缀 | 每条消息前加 4 字节长度 | 最常用,TCP/文件 |
| 分隔符 | 特殊字节序列分割消息 | 文本协议(不推荐二进制) |
| 自定界格式 | 消息本身可判断结束 | 小消息场景 |
| HTTP chunked | 利用 HTTP 分块传输 | Web API |
| WebSocket frame | WebSocket 帧自带边界 | 实时通信 |
📖 长度前缀协议 / Length-Prefixed Protocol
协议设计
最常用的方案是 Length-Prefixed Framing:
┌──────────────────┬──────────────────────┐
│ 长度头 (4 字节) │ 消息体 (N 字节) │
│ Big-Endian uint32 │ MessagePack 数据 │
└──────────────────┴──────────────────────┘
多条消息:
[4B len][msg1 bytes][4B len][msg2 bytes][4B len][msg3 bytes]
Python 实现
import msgpack
import struct
import socket
import io
class LengthPrefixedProtocol:
    """Length-prefixed framing codec: 4-byte big-endian size header + MessagePack body."""

    HEADER_SIZE = 4  # size of the uint32 length header

    @staticmethod
    def pack(data: dict) -> bytes:
        """Frame one message as ``<4-byte length><msgpack body>``."""
        payload = msgpack.packb(data, use_bin_type=True)
        return struct.pack(">I", len(payload)) + payload

    @staticmethod
    def pack_multi(messages: list) -> bytes:
        """Frame several messages back to back into a single byte string."""
        frames = []
        for item in messages:
            payload = msgpack.packb(item, use_bin_type=True)
            frames.append(struct.pack(">I", len(payload)))
            frames.append(payload)
        return b"".join(frames)

    @staticmethod
    def unpack_stream(stream) -> list:
        """Decode every complete frame from *stream*.

        A truncated trailing frame (partial header or partial body) is
        silently ignored, matching best-effort file/stream semantics.
        """
        decoded = []
        while True:
            head = stream.read(LengthPrefixedProtocol.HEADER_SIZE)
            if len(head) < LengthPrefixedProtocol.HEADER_SIZE:
                return decoded  # EOF or partial header
            (size,) = struct.unpack(">I", head)
            payload = stream.read(size)
            if len(payload) < size:
                return decoded  # truncated body
            decoded.append(msgpack.unpackb(payload, raw=False))
# Usage example
protocol = LengthPrefixedProtocol()
# Pack one message: 4-byte length header + msgpack body
data = protocol.pack({"type": "greeting", "msg": "hello"})
print(f"打包大小: {len(data)} bytes")  # 4 + N
# Stream-decode every complete frame from a file-like object
stream = io.BytesIO(data)
messages = protocol.unpack_stream(stream)
print(f"解包消息: {messages}")
Python 异步流式处理
import asyncio
import msgpack
import struct
class AsyncMsgPackStream:
    """Async MessagePack stream: length-prefixed frames over asyncio streams."""

    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        self.reader = reader
        self.writer = writer

    async def send(self, data: dict):
        """Frame and send one message, then flush the transport buffer."""
        payload = msgpack.packb(data, use_bin_type=True)
        frame = struct.pack(">I", len(payload)) + payload
        self.writer.write(frame)
        await self.writer.drain()

    async def receive(self) -> dict:
        """Read exactly one framed message; raises IncompleteReadError at EOF."""
        (size,) = struct.unpack(">I", await self.reader.readexactly(4))
        return msgpack.unpackb(await self.reader.readexactly(size), raw=False)

    async def __aiter__(self):
        """Yield decoded messages until the peer closes the connection."""
        try:
            while True:
                yield await self.receive()
        except asyncio.IncompleteReadError:
            return  # clean end of stream
# Server side
async def handle_client(reader, writer):
    """Per-connection handler: acknowledge every inbound message with its id."""
    session = AsyncMsgPackStream(reader, writer)
    async for msg in session:
        print(f"收到: {msg}")
        await session.send({"type": "ack", "id": msg.get("id")})
async def server():
    """Start the msgpack echo server on 127.0.0.1:8080 and serve forever."""
    # Fix: the original bound the local to `server`, shadowing this
    # coroutine's own name inside its body.
    srv = await asyncio.start_server(handle_client, "127.0.0.1", 8080)
    async with srv:
        await srv.serve_forever()
# Client side
async def client():
    """Send a single ping, print the server's reply, and disconnect cleanly."""
    reader, writer = await asyncio.open_connection("127.0.0.1", 8080)
    session = AsyncMsgPackStream(reader, writer)
    await session.send({"type": "ping", "id": 1})
    response = await session.receive()
    print(f"响应: {response}")
    writer.close()
    await writer.wait_closed()
Go 实现
package main
import (
"bufio"
"encoding/binary"
"fmt"
"io"
"net"
"github.com/vmihailenco/msgpack/v5"
)
// LengthPrefixedCodec frames MessagePack messages with a 4-byte
// big-endian length header over a single net.Conn.
type LengthPrefixedCodec struct {
	conn   net.Conn
	reader *bufio.Reader
	writer *bufio.Writer
}

// NewLengthPrefixedCodec wraps conn with buffered framing I/O.
func NewLengthPrefixedCodec(conn net.Conn) *LengthPrefixedCodec {
	codec := &LengthPrefixedCodec{conn: conn}
	codec.reader = bufio.NewReader(conn)
	codec.writer = bufio.NewWriter(conn)
	return codec
}
// Send marshals v, writes the 4-byte big-endian length header followed by
// the body, and flushes the buffered writer so the frame hits the wire.
func (c *LengthPrefixedCodec) Send(v interface{}) error {
	body, err := msgpack.Marshal(v)
	if err != nil {
		return err
	}
	var header [4]byte
	binary.BigEndian.PutUint32(header[:], uint32(len(body)))
	if _, err = c.writer.Write(header[:]); err != nil {
		return err
	}
	if _, err = c.writer.Write(body); err != nil {
		return err
	}
	return c.writer.Flush()
}
// Receive reads one length-prefixed message into v.
//
// Robustness fix: the original allocated make([]byte, length) for any
// length a peer sent, letting a malicious client trigger an arbitrarily
// large allocation. Frames above maxMessageSize are now rejected.
func (c *LengthPrefixedCodec) Receive(v interface{}) error {
	const maxMessageSize = 16 << 20 // 16MB cap on a single frame

	// Read the 4-byte length header.
	header := make([]byte, 4)
	if _, err := io.ReadFull(c.reader, header); err != nil {
		return err
	}
	length := binary.BigEndian.Uint32(header)
	if length > maxMessageSize {
		return fmt.Errorf("message too large: %d bytes", length)
	}
	// Read the message body.
	body := make([]byte, length)
	if _, err := io.ReadFull(c.reader, body); err != nil {
		return err
	}
	return msgpack.Unmarshal(body, v)
}
// Message is the wire-level envelope exchanged by client and server.
type Message struct {
	Type    string      `msgpack:"type"`              // message kind, e.g. "ping" / "ack"
	ID      int         `msgpack:"id"`                // correlates a request with its ack
	Payload interface{} `msgpack:"payload,omitempty"` // optional application body
}
// startServer listens on addr and serves each accepted connection on its
// own goroutine; it only returns if Listen itself fails.
func startServer(addr string) error {
	ln, err := net.Listen("tcp", addr)
	if err != nil {
		return err
	}
	defer ln.Close()
	for {
		conn, err := ln.Accept()
		if err != nil {
			// NOTE(review): a permanent Accept error (e.g. closed listener)
			// turns this into a busy loop — consider logging and backing off.
			continue
		}
		go handleConnection(conn)
	}
}
// handleConnection serves one client: read a request, reply with an ack
// carrying the same ID, and repeat until EOF or an I/O error.
func handleConnection(conn net.Conn) {
	defer conn.Close()
	codec := NewLengthPrefixedCodec(conn)
	for {
		var msg Message
		err := codec.Receive(&msg)
		if err == io.EOF {
			return // peer closed the connection cleanly
		}
		if err != nil {
			fmt.Printf("接收错误: %v\n", err)
			return
		}
		fmt.Printf("收到: %+v\n", msg)
		// Acknowledge with the same message ID.
		if err := codec.Send(Message{Type: "ack", ID: msg.ID}); err != nil {
			fmt.Printf("发送错误: %v\n", err)
			return
		}
	}
}
JavaScript (Node.js) 实现
import { encode, decode } from "@msgpack/msgpack";
import net from "net";
class LengthPrefixedCodec {
  constructor(socket) {
    this.socket = socket;
    this.buffer = Buffer.alloc(0); // bytes received but not yet framed
  }

  // Encode and send one length-prefixed message.
  send(data) {
    const body = encode(data);
    const header = Buffer.alloc(4);
    header.writeUInt32BE(body.length, 0);
    this.socket.write(Buffer.concat([header, body]));
  }

  // Invoke callback once per complete decoded message.
  onData(callback) {
    this.socket.on("data", (chunk) => {
      this.buffer = Buffer.concat([this.buffer, chunk]);
      this._processBuffer(callback);
    });
  }

  // Drain every complete frame currently in the buffer.
  _processBuffer(callback) {
    for (;;) {
      if (this.buffer.length < 4) return; // no full header yet
      const length = this.buffer.readUInt32BE(0);
      if (this.buffer.length < 4 + length) return; // incomplete frame
      const body = this.buffer.subarray(4, 4 + length);
      this.buffer = this.buffer.subarray(4 + length);
      callback(decode(body));
    }
  }
}
// Server: reply to every inbound message with an ack carrying the same id.
function handleSocket(socket) {
  const codec = new LengthPrefixedCodec(socket);
  codec.onData((msg) => {
    console.log("收到:", msg);
    codec.send({ type: "ack", id: msg.id });
  });
}
const server = net.createServer(handleSocket);
server.listen(8080, () => console.log("监听 8080"));
// Client: send one ping, print the reply, then disconnect.
const client = net.createConnection(8080, () => {
  const codec = new LengthPrefixedCodec(client);
  // Register the response handler before sending (delivery is async either way).
  codec.onData((msg) => {
    console.log("响应:", msg);
    client.end();
  });
  codec.send({ type: "ping", id: 1 });
});
📖 RPC 集成 / RPC Integration
简单 RPC 协议设计
请求格式(下文实现中,每个帧前还有 4 字节长度头 / in the implementations below, each frame is additionally preceded by a 4-byte length header):
┌──────────┬──────────┬──────────┬──────────┐
│ type (1B) │ id (4B) │ method │ params │
│ 0x01=req │ 请求ID │ (msgpack)│ (msgpack)│
└──────────┴──────────┴──────────┴──────────┘
响应格式(同样带 4 字节长度头 / likewise preceded by a 4-byte length header):
┌──────────┬──────────┬──────────┬──────────┐
│ type (1B) │ id (4B) │ result │ error │
│ 0x02=res │ 对应reqID│ (msgpack)│ (msgpack)│
└──────────┴──────────┴──────────┴──────────┘
Python RPC 实现
import msgpack
import struct
import asyncio
import uuid
from typing import Any, Callable
class MsgPackRPC:
    """Minimal MessagePack RPC framework over length-prefixed frames.

    Frame layout: ``<uint32 length><uint8 type><uint32 id><msgpack payload>``
    where *length* covers the 5-byte type+id header plus the payload.

    Fixes vs. the original:
    - ``asyncio.get_event_loop()`` (deprecated inside coroutines since 3.10)
      replaced with ``asyncio.get_running_loop()``;
    - pending futures are always removed, even when the write fails or the
      caller is cancelled, so ``self.pending`` can no longer leak entries.
    """

    TYPE_REQUEST = 0x01
    TYPE_RESPONSE = 0x02
    TYPE_NOTIFICATION = 0x03

    def __init__(self):
        self.methods: dict[str, Callable] = {}        # name -> handler
        self.pending: dict[int, asyncio.Future] = {}  # request id -> waiter

    def register(self, name: str, func: Callable):
        """Expose *func* under *name* for remote calls."""
        self.methods[name] = func

    def _pack_message(self, msg_type: int, msg_id: int, payload: dict) -> bytes:
        """Build one wire frame: length prefix + (type, id) header + body."""
        body = msgpack.packb(payload, use_bin_type=True)
        header = struct.pack(">BI", msg_type, msg_id)
        length = struct.pack(">I", len(header) + len(body))
        return length + header + body

    async def send_request(self, writer, method: str, params: list) -> Any:
        """Send one RPC request and await the matching response.

        NOTE(review): the random 4-byte id could in principle collide with
        an in-flight request; a monotonically increasing counter would be
        strictly safer.
        """
        msg_id = int.from_bytes(uuid.uuid4().bytes[:4], 'big')
        future = asyncio.get_running_loop().create_future()
        self.pending[msg_id] = future
        try:
            data = self._pack_message(self.TYPE_REQUEST, msg_id,
                                      {"method": method, "params": params})
            writer.write(data)
            await writer.drain()
            return await future
        finally:
            # Drop the entry even if the write failed or we were cancelled.
            self.pending.pop(msg_id, None)

    def _dispatch(self, payload: dict) -> dict:
        """Invoke a registered method; map any failure to an error payload."""
        method = payload["method"]
        params = payload.get("params", [])
        if method not in self.methods:
            return {"error": f"未知方法: {method}"}
        try:
            return {"result": self.methods[method](*params)}
        except Exception as e:
            return {"error": str(e)}

    async def handle_connection(self, reader, writer):
        """Serve one connection: dispatch requests, resolve pending responses."""
        while True:
            try:
                len_bytes = await reader.readexactly(4)
                length = struct.unpack(">I", len_bytes)[0]
                msg_bytes = await reader.readexactly(length)
            except asyncio.IncompleteReadError:
                break  # peer closed the connection
            msg_type = msg_bytes[0]
            msg_id = struct.unpack(">I", msg_bytes[1:5])[0]
            payload = msgpack.unpackb(msg_bytes[5:], raw=False)

            if msg_type == self.TYPE_REQUEST:
                response = self._dispatch(payload)
                data = self._pack_message(self.TYPE_RESPONSE, msg_id, response)
                writer.write(data)
                await writer.drain()
            elif msg_type == self.TYPE_RESPONSE:
                future = self.pending.pop(msg_id, None)
                if future is not None:
                    if "error" in payload:
                        future.set_exception(Exception(payload["error"]))
                    else:
                        future.set_result(payload.get("result"))
# Usage example
async def main():
    rpc = MsgPackRPC()
    # Register the methods callable by remote peers
    rpc.register("add", lambda a, b: a + b)
    rpc.register("multiply", lambda a, b: a * b)
    rpc.register("greet", lambda name: f"你好, {name}!")
    # Start the server and serve until the process is stopped
    server = await asyncio.start_server(rpc.handle_connection, "127.0.0.1", 8080)
    print("RPC 服务启动在 8080")
    async with server:
        await server.serve_forever()
asyncio.run(main())
Go RPC 实现
package main
import (
"context"
"encoding/binary"
"fmt"
"io"
"net"
"sync"
"github.com/vmihailenco/msgpack/v5"
)
// MsgType identifies the kind of frame on the wire.
type MsgType uint8

const (
	MsgRequest      MsgType = 0x01 // client -> server call
	MsgResponse     MsgType = 0x02 // server -> client reply
	MsgNotification MsgType = 0x03 // one-way message, no reply expected
)

// RPCRequest is the msgpack payload of a request frame.
type RPCRequest struct {
	Method string      `msgpack:"method"`
	Params interface{} `msgpack:"params"`
}

// RPCResponse is the msgpack payload of a response frame; exactly one of
// Result / Error is expected to be populated.
type RPCResponse struct {
	Result interface{} `msgpack:"result,omitempty"`
	Error  string      `msgpack:"error,omitempty"`
}
// RPCServer dispatches incoming RPC requests to registered handlers.
type RPCServer struct {
	methods map[string]func(params interface{}) (interface{}, error)
	mu      sync.RWMutex // guards methods
}

// NewRPCServer returns an empty, ready-to-use server.
func NewRPCServer() *RPCServer {
	return &RPCServer{
		methods: make(map[string]func(params interface{}) (interface{}, error)),
	}
}

// Register makes handler callable by remote peers under name.
func (s *RPCServer) Register(name string, handler func(params interface{}) (interface{}, error)) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.methods[name] = handler
}
// Handle serves one connection until EOF or a protocol error.
//
// Frame layout: <uint32 length><uint8 type><uint32 id><msgpack payload>,
// where length counts the 5-byte type+id header plus the payload.
//
// Fix vs. the original: a frame whose declared length was < 5 made
// body[0] / body[1:5] panic; the length is now validated (and capped)
// before allocation and indexing.
func (s *RPCServer) Handle(conn net.Conn) {
	defer conn.Close()
	const maxFrameSize = 16 << 20 // reject absurd lengths before allocating
	for {
		// Read the 4-byte frame length.
		var length uint32
		if err := binary.Read(conn, binary.BigEndian, &length); err != nil {
			if err == io.EOF {
				return
			}
			fmt.Printf("读取长度错误: %v\n", err)
			return
		}
		// A valid frame always carries the 1-byte type + 4-byte id header.
		if length < 5 || length > maxFrameSize {
			fmt.Printf("读取消息错误: %v\n", fmt.Errorf("invalid frame length %d", length))
			return
		}
		// Read the frame body.
		body := make([]byte, length)
		if _, err := io.ReadFull(conn, body); err != nil {
			fmt.Printf("读取消息错误: %v\n", err)
			return
		}
		msgType := MsgType(body[0])
		msgID := binary.BigEndian.Uint32(body[1:5])
		switch msgType {
		case MsgRequest:
			var req RPCRequest
			if err := msgpack.Unmarshal(body[5:], &req); err != nil {
				s.sendError(conn, msgID, err.Error())
				continue
			}
			s.mu.RLock()
			handler, ok := s.methods[req.Method]
			s.mu.RUnlock()
			if !ok {
				s.sendError(conn, msgID, fmt.Sprintf("未知方法: %s", req.Method))
				continue
			}
			result, err := handler(req.Params)
			if err != nil {
				s.sendError(conn, msgID, err.Error())
				continue
			}
			s.sendResult(conn, msgID, result)
		}
	}
}
// sendResult replies with a successful result for msgID.
func (s *RPCServer) sendResult(conn net.Conn, msgID uint32, result interface{}) {
	s.sendResponse(conn, msgID, RPCResponse{Result: result})
}

// sendError replies with an error message for msgID.
func (s *RPCServer) sendError(conn net.Conn, msgID uint32, errMsg string) {
	s.sendResponse(conn, msgID, RPCResponse{Error: errMsg})
}

// sendResponse marshals resp and writes the whole frame with a single
// conn.Write, so header and body cannot be interleaved with other writers.
// Fix vs. the original: Marshal and Write errors were silently discarded;
// they are now reported instead of producing a corrupt/partial frame.
func (s *RPCServer) sendResponse(conn net.Conn, msgID uint32, resp RPCResponse) {
	body, err := msgpack.Marshal(resp)
	if err != nil {
		fmt.Printf("发送错误: %v\n", err)
		return
	}
	// Frame: <uint32 length><uint8 type><uint32 id><body>
	frame := make([]byte, 9+len(body))
	binary.BigEndian.PutUint32(frame[0:4], uint32(5+len(body)))
	frame[4] = byte(MsgResponse)
	binary.BigEndian.PutUint32(frame[5:9], msgID)
	copy(frame[9:], body)
	if _, err := conn.Write(frame); err != nil {
		fmt.Printf("发送错误: %v\n", err)
	}
}
// RPCClient issues requests over a single TCP connection and matches
// responses to callers by message id.
//
// Fixes vs. the original:
//   - Call wrote the frame in three separate conn.Write calls; concurrent
//     Calls could interleave and corrupt the stream. Frames are now
//     assembled in one buffer and written under a mutex.
//   - the pending channel leaked when ctx expired before the response;
//   - readLoop panicked on frames shorter than the 5-byte header and
//     ignored Unmarshal errors.
type RPCClient struct {
	conn    net.Conn
	pending map[uint32]chan RPCResponse // msgID -> waiting caller
	mu      sync.RWMutex                // guards pending
	wmu     sync.Mutex                  // serializes frame writes on conn
	nextID  uint32
}

// NewRPCClient dials addr and starts the background response reader.
func NewRPCClient(addr string) (*RPCClient, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	client := &RPCClient{
		conn:    conn,
		pending: make(map[uint32]chan RPCResponse),
	}
	go client.readLoop()
	return client, nil
}

// removePending forgets the waiter registered for msgID, if any.
func (c *RPCClient) removePending(msgID uint32) {
	c.mu.Lock()
	delete(c.pending, msgID)
	c.mu.Unlock()
}

// Call invokes method with params and blocks until a response arrives or
// ctx is done.
func (c *RPCClient) Call(ctx context.Context, method string, params interface{}) (interface{}, error) {
	// Register a waiter under a fresh message id.
	c.mu.Lock()
	c.nextID++
	msgID := c.nextID
	ch := make(chan RPCResponse, 1)
	c.pending[msgID] = ch
	c.mu.Unlock()

	body, err := msgpack.Marshal(RPCRequest{Method: method, Params: params})
	if err != nil {
		c.removePending(msgID)
		return nil, err
	}
	// Assemble the whole frame and write it atomically w.r.t. other Calls.
	frame := make([]byte, 9+len(body))
	binary.BigEndian.PutUint32(frame[0:4], uint32(5+len(body)))
	frame[4] = byte(MsgRequest)
	binary.BigEndian.PutUint32(frame[5:9], msgID)
	copy(frame[9:], body)
	c.wmu.Lock()
	_, err = c.conn.Write(frame)
	c.wmu.Unlock()
	if err != nil {
		c.removePending(msgID)
		return nil, err
	}

	// Wait for the matching response or cancellation.
	select {
	case resp := <-ch:
		if resp.Error != "" {
			return nil, fmt.Errorf("RPC error: %s", resp.Error)
		}
		return resp.Result, nil
	case <-ctx.Done():
		c.removePending(msgID) // don't leak the waiter on timeout/cancel
		return nil, ctx.Err()
	}
}

// readLoop reads response frames and hands each to its waiting caller.
func (c *RPCClient) readLoop() {
	for {
		var length uint32
		if err := binary.Read(c.conn, binary.BigEndian, &length); err != nil {
			return
		}
		if length < 5 { // malformed frame: the type+id header alone is 5 bytes
			return
		}
		body := make([]byte, length)
		if _, err := io.ReadFull(c.conn, body); err != nil {
			return
		}
		msgID := binary.BigEndian.Uint32(body[1:5])
		var resp RPCResponse
		if err := msgpack.Unmarshal(body[5:], &resp); err != nil {
			continue // skip undecodable payloads instead of delivering junk
		}
		c.mu.RLock()
		ch, ok := c.pending[msgID]
		c.mu.RUnlock()
		if ok {
			c.removePending(msgID)
			ch <- resp // buffered: never blocks
		}
	}
}

// Close shuts down the underlying connection, terminating readLoop.
func (c *RPCClient) Close() error {
	return c.conn.Close()
}
📖 与消息队列集成 / Message Queue Integration
Redis Pub/Sub + MessagePack
import redis
import msgpack
import json
class MsgPackRedisPubSub:
    """Redis Pub/Sub wrapper that carries MessagePack-encoded payloads."""

    def __init__(self, redis_url="redis://localhost"):
        self.redis = redis.from_url(redis_url)
        self.pubsub = self.redis.pubsub()

    def publish(self, channel: str, data: dict):
        """Publish *data* on *channel*, msgpack-encoded."""
        self.redis.publish(channel, msgpack.packb(data, use_bin_type=True))

    def subscribe(self, *channels):
        """Subscribe to one or more channels."""
        self.pubsub.subscribe(*channels)

    def listen(self):
        """Yield (channel, decoded_payload) for every data message; blocks."""
        for item in self.pubsub.listen():
            if item["type"] != "message":
                continue  # skip subscribe/unsubscribe notifications
            payload = msgpack.unpackb(item["data"], raw=False)
            yield item["channel"].decode(), payload
# Publisher
pub = MsgPackRedisPubSub()
pub.publish("events", {"type": "user_login", "user_id": 123, "ts": 1700000000})
# Subscriber — listen() blocks and yields (channel, decoded event) pairs
sub = MsgPackRedisPubSub()
sub.subscribe("events")
for channel, event in sub.listen():
    print(f"[{channel}] {event}")
Kafka + MessagePack
from kafka import KafkaProducer, KafkaConsumer
import msgpack
# Producer
class MsgPackKafkaProducer:
    """KafkaProducer wrapper that msgpack-encodes record values."""

    def __init__(self, bootstrap_servers):
        # value_serializer / key_serializer run automatically on every send()
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: msgpack.packb(v, use_bin_type=True),
            key_serializer=lambda k: k.encode("utf-8") if isinstance(k, str) else k,
        )

    def send(self, topic: str, value: dict, key: str = None):
        """Send one record and flush synchronously (simple, but slow for bulk)."""
        self.producer.send(topic, value=value, key=key)
        self.producer.flush()
# Consumer
class MsgPackKafkaConsumer:
    """KafkaConsumer wrapper that msgpack-decodes record values."""

    def __init__(self, bootstrap_servers, topic, group_id):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda v: msgpack.unpackb(v, raw=False),
            key_deserializer=lambda k: k.decode("utf-8") if k else None,
            auto_offset_reset="earliest",  # new groups start from the oldest record
        )

    def consume(self):
        """Yield one dict per record including topic/partition/offset metadata."""
        for message in self.consumer:
            yield {
                "topic": message.topic,
                "partition": message.partition,
                "offset": message.offset,
                "key": message.key,
                "value": message.value,
            }
# Usage
producer = MsgPackKafkaProducer(["localhost:9092"])
producer.send("user-events", {"action": "click", "page": "/home"}, key="user:123")
# consume() blocks, yielding one dict per record
consumer = MsgPackKafkaConsumer(["localhost:9092"], "user-events", "my-group")
for msg in consumer.consume():
    print(msg)
📖 缓冲区管理 / Buffer Management
环形缓冲区
import struct
import msgpack
class RingBuffer:
"""环形缓冲区,用于流式 MessagePack 解析"""
def __init__(self, capacity: int = 65536):
self.buffer = bytearray(capacity)
self.capacity = capacity
self.head = 0
self.tail = 0
self.size = 0
def write(self, data: bytes):
"""写入数据"""
if len(data) > self.capacity - self.size:
raise BufferError("缓冲区已满")
for byte in data:
self.buffer[self.tail] = byte
self.tail = (self.tail + 1) % self.capacity
self.size += 1
def read_message(self) -> dict | None:
"""尝试读取一条完整消息"""
if self.size < 4:
return None
# 预读长度(不移动 head)
length = self._peek_uint32()
if self.size < 4 + length:
return None
# 读取 header
header = self._read_bytes(4)
# 读取 body
body = self._read_bytes(length)
return msgpack.unpackb(body, raw=False)
def _peek_uint32(self) -> int:
b = bytes([
self.buffer[(self.head + i) % self.capacity] for i in range(4)
])
return struct.unpack(">I", b)[0]
def _read_bytes(self, n: int) -> bytes:
result = bytearray(n)
for i in range(n):
result[i] = self.buffer[self.head]
self.head = (self.head + 1) % self.capacity
self.size -= 1
return bytes(result)
自适应缓冲区
import io
import msgpack
class AdaptiveBuffer:
    """Length-prefixed message buffer whose capacity target grows/shrinks
    with usage.

    Bug fixed: the original __init__ called ``truncate(capacity)`` on the
    fresh BytesIO, padding the stream with NUL bytes; read_messages would
    then mis-parse those zeros as a bogus zero-length frame. BytesIO grows
    on demand, so ``_capacity`` is pure bookkeeping and the stream must
    start empty.
    """

    MIN_SIZE = 1024               # 1KB floor
    MAX_SIZE = 16 * 1024 * 1024   # 16MB ceiling
    GROW_FACTOR = 1.5             # geometric growth factor
    SHRINK_THRESHOLD = 0.3        # shrink when usage drops below 30%

    def __init__(self):
        # No truncate(): the stream must contain only written frames.
        self._buf = io.BytesIO()
        self._capacity = self.MIN_SIZE

    def write_message(self, data: dict):
        """Append one framed message (4-byte big-endian length + body)."""
        body = msgpack.packb(data, use_bin_type=True)
        header = len(body).to_bytes(4, 'big')
        new_size = self._buf.tell() + 4 + len(body)
        if new_size > self._capacity:
            self._grow(new_size)
        self._buf.write(header)
        self._buf.write(body)

    def read_messages(self) -> list:
        """Decode all complete frames; keep any trailing partial frame."""
        data = self._buf.getvalue()
        messages = []
        offset = 0
        while offset + 4 <= len(data):
            length = int.from_bytes(data[offset:offset + 4], 'big')
            if offset + 4 + length > len(data):
                break  # incomplete trailing frame — keep for later
            body = data[offset + 4:offset + 4 + length]
            messages.append(msgpack.unpackb(body, raw=False))
            offset += 4 + length
        # Restart the buffer with only the unparsed remainder.
        self._buf = io.BytesIO()
        self._buf.write(data[offset:])
        self._maybe_shrink()
        return messages

    def _grow(self, needed: int):
        """Raise the capacity target geometrically until it covers *needed*."""
        while self._capacity < needed:
            self._capacity = int(self._capacity * self.GROW_FACTOR)
        self._capacity = min(self._capacity, self.MAX_SIZE)

    def _maybe_shrink(self):
        """Halve the capacity target when the buffer is mostly empty."""
        usage = self._buf.tell() / self._capacity
        if usage < self.SHRINK_THRESHOLD and self._capacity > self.MIN_SIZE:
            self._capacity = max(self.MIN_SIZE, self._capacity // 2)
💻 大数据流处理 / Large Data Streaming
分块传输大数组
import msgpack
import struct
import io
class ChunkedTransfer:
    """Transfer a large list as a sequence of length-prefixed msgpack chunks.

    Bug fixed: the original delegated framing to
    ``LengthPrefixedProtocol._write`` / ``_read``, methods that do not
    exist on that class (it only defines pack/pack_multi/unpack_stream).
    The framing helpers are now implemented locally.
    """

    CHUNK_SIZE = 1000  # items per chunk

    @staticmethod
    def _write(writer, packed: bytes):
        """Write one frame: 4-byte big-endian length + already-packed body."""
        writer.write(struct.pack(">I", len(packed)))
        writer.write(packed)

    @staticmethod
    def _read(reader):
        """Read and decode one frame; returns None on EOF / truncated frame."""
        header = reader.read(4)
        if len(header) < 4:
            return None
        (length,) = struct.unpack(">I", header)
        body = reader.read(length)
        if len(body) < length:
            return None
        return msgpack.unpackb(body, raw=False)

    @staticmethod
    def send_chunked(writer, items: list):
        """Send *items* as a start marker, CHUNK_SIZE-sized chunks, end marker."""
        total_chunks = (len(items) + ChunkedTransfer.CHUNK_SIZE - 1) // ChunkedTransfer.CHUNK_SIZE
        ChunkedTransfer._write(writer, msgpack.packb({
            "type": "chunked_start",
            "total_items": len(items),
            "total_chunks": total_chunks,
        }, use_bin_type=True))
        for i in range(0, len(items), ChunkedTransfer.CHUNK_SIZE):
            chunk = items[i:i + ChunkedTransfer.CHUNK_SIZE]
            ChunkedTransfer._write(writer, msgpack.packb({
                "type": "chunk",
                "index": i // ChunkedTransfer.CHUNK_SIZE,
                "data": chunk,
            }, use_bin_type=True))
        ChunkedTransfer._write(writer, msgpack.packb({"type": "chunked_end"}, use_bin_type=True))

    @staticmethod
    def receive_chunked(reader) -> list:
        """Reassemble the list produced by send_chunked."""
        items = []
        expected_chunks = 0
        chunks_received = 0
        started = False
        while True:
            msg = ChunkedTransfer._read(reader)
            if msg is None:
                break  # EOF before an end marker
            if msg["type"] == "chunked_start":
                expected_chunks = msg["total_chunks"]
                started = True
            elif msg["type"] == "chunk":
                items.extend(msg["data"])
                chunks_received += 1
            elif msg["type"] == "chunked_end":
                break
            # Only stop on count once the start header gave us the total;
            # the original compared against 0 before the header arrived.
            if started and chunks_received >= expected_chunks:
                break
        return items
⚠️ 注意事项 / Pitfalls
1. 消息边界丢失
❌ 错误: 直接使用 msgpack.unpackb 读取 TCP 流
问题: 多条消息粘包时,msgpack.unpackb 会因尾部多余数据抛出 ExtraData 异常,无法逐条解析后续消息
✅ 正确: 使用长度前缀或其他分帧协议
2. 缓冲区溢出
# ⚠️ Enforce a maximum frame length before allocating/reading
MAX_MSG_SIZE = 16 * 1024 * 1024  # 16MB

def safe_read(reader, length):
    """Read *length* bytes, rejecting frames larger than MAX_MSG_SIZE."""
    if length <= MAX_MSG_SIZE:
        return reader.read(length)
    raise ValueError(f"消息过大: {length} bytes")
3. 字节序一致性
⚠️ 确保所有客户端和服务端使用相同的字节序
MessagePack 内部使用大端序,长度头也应使用大端序
4. 错误恢复
# Recovery strategy when one message in the stream is corrupt.
# NOTE(review): illustrative pseudocode — read_message() and
# sync_to_next_message() are not defined in this chapter, and
# msgpack.UnpackException assumes the msgpack-python exception hierarchy.
def resilient_unpacker(stream):
    """Yield messages indefinitely, skipping frames that fail to decode."""
    while True:
        try:
            yield read_message(stream)
        except msgpack.UnpackException:
            # Skip the damaged frame and try to re-synchronize on the next one.
            sync_to_next_message(stream)
🔗 扩展阅读 / Further Reading
| 资源 | 链接 |
|---|---|
| TCP 粘包问题 | https://en.wikipedia.org/wiki/Transmission_Control_Protocol |
| MessagePack-RPC 规范 | https://github.com/msgpack-rpc/msgpack-rpc |
| gRPC 消息分帧 | https://grpc.io/docs/what-is-grpc/core-concepts/ |
| WebSocket 协议 | https://tools.ietf.org/html/rfc6455 |
📝 下一章 / Next: 第 9 章 - Docker 中的使用 / Docker Usage — 在容器化环境中使用 MessagePack。