强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

03 - Python 实践 / Python Implementation

Python 实践 / Python Implementation

本章介绍如何在 Python 中使用 msgpack 库进行序列化和反序列化,包括自定义类型、流式处理和性能优化。

This chapter covers using the msgpack library in Python for serialization and deserialization, including custom types, streaming, and performance optimization.


📖 库概览 / Library Overview

Python 的官方 MessagePack 库为 msgpack(PyPI 包名),底层由 C 扩展实现,性能优异。

属性
包名msgpack
PyPIhttps://pypi.org/project/msgpack/
GitHubhttps://github.com/msgpack/msgpack-python
最低 Python3.8+
核心特性C 扩展加速、流式解码、自定义类型钩子

安装

# 推荐方式
pip install msgpack

# 指定版本
pip install msgpack>=1.0.0

# 验证安装
python -c "import msgpack; print(msgpack.version)"

💻 基础序列化 / Basic Serialization

packb / unpackb

import msgpack

# ========== 序列化 ==========
data = {
    "id": 1001,
    "name": "Alice",
    "scores": [95, 87, 92],
    "active": True,
    "address": None
}

# packb: 将 Python 对象序列化为 bytes
packed = msgpack.packb(data)
print(type(packed))   # <class 'bytes'>
print(len(packed))    # 43 bytes (vs JSON ~70 bytes)
print(packed.hex())   # 十六进制查看

# ========== 反序列化 ==========
# unpackb: 将 bytes 反序列化为 Python 对象
unpacked = msgpack.unpackb(packed)
print(unpacked)
# {b'id': 1001, b'name': b'Alice', ...}

⚠️ 关键陷阱: 默认情况下,字符串被解码为 bytes,映射键也被解码为 bytes。这是最常见的问题。

正确处理字符串 (raw=False)

import msgpack

data = {"name": "Alice", "city": "北京"}
packed = msgpack.packb(data)

# ❌ 默认行为: 字符串变为 bytes
bad = msgpack.unpackb(packed)
print(bad)  # {b'name': b'Alice', b'city': b'\xe5\x8c\x97\xe4\xba\xac'}

# ✅ 正确做法: 使用 raw=False
good = msgpack.unpackb(packed, raw=False)
print(good)  # {'name': 'Alice', 'city': '北京'}

📝 建议: 在所有 unpackb 调用中始终加上 raw=False

pack / unpack(流式)

import msgpack
import io

data = {"hello": "world"}

# pack: 写入文件对象
buf = io.BytesIO()
msgpack.pack(data, buf)

# unpack: 从文件对象读取
buf.seek(0)
result = msgpack.unpack(buf, raw=False)
print(result)  # {'hello': 'world'}

💻 类型映射 / Type Mapping

Python → MessagePack

Python 类型MessagePack 类型说明
Nonenil空值
boolbooleantrue/false
intint/uint自动选择最小编码
floatfloat64双精度浮点
strstrUTF-8 字符串
bytesbin原始字节
bytearraybin原始字节
listarray数组
tuplearray元组也序列化为数组
dictmap映射

MessagePack → Python (raw=False)

MessagePack 类型Python 类型
nilNone
booleanbool
int/uintint
float32/64float
strstr
binbytes
arraylist
mapdict
extmsgpack.ExtType

处理 bytes vs str

import msgpack

# 当数据中同时包含文本和二进制时
data = {
    "name": "Alice",          # 应为 str
    "avatar": b"\x89PNG...",  # 应为 bytes
}

packed = msgpack.packb(data)

# raw=False 时,str 解码为 str,bin 解码为 bytes
result = msgpack.unpackb(packed, raw=False)
assert isinstance(result["name"], str)
assert isinstance(result["avatar"], bytes)

💻 自定义类型序列化 / Custom Type Serialization

使用 default 钩子

当 MessagePack 不认识某个类型时,会调用 default 函数:

import msgpack
from datetime import datetime, date
from decimal import Decimal
from uuid import UUID

def custom_encoder(obj):
    """自定义类型编码器"""
    if isinstance(obj, datetime):
        return {"__type__": "datetime", "value": obj.isoformat()}
    elif isinstance(obj, date):
        return {"__type__": "date", "value": obj.isoformat()}
    elif isinstance(obj, Decimal):
        return {"__type__": "decimal", "value": str(obj)}
    elif isinstance(obj, UUID):
        return {"__type__": "uuid", "value": str(obj)}
    elif isinstance(obj, set):
        return {"__type__": "set", "value": list(obj)}
    elif isinstance(obj, complex):
        return {"__type__": "complex", "real": obj.real, "imag": obj.imag}
    raise TypeError(f"Unknown type: {type(obj)}")

# 使用
data = {
    "created": datetime(2024, 1, 15, 10, 30, 0),
    "price": Decimal("99.99"),
    "tags": {"python", "msgpack"},
}

packed = msgpack.packb(data, default=custom_encoder, use_bin_type=True)
result = msgpack.unpackb(packed, raw=False)
print(result)
# {'created': {'__type__': 'datetime', 'value': '2024-01-15T10:30:00'},
#  'price': {'__type__': 'decimal', 'value': '99.99'},
#  'tags': {'__type__': 'set', 'value': ['python', 'msgpack']}}

使用 object_hook 钩子

import msgpack
from datetime import datetime
from decimal import Decimal
from uuid import UUID

def custom_decoder(obj):
    """自定义类型解码器"""
    if isinstance(obj, dict) and "__type__" in obj:
        type_name = obj["__type__"]
        if type_name == "datetime":
            return datetime.fromisoformat(obj["value"])
        elif type_name == "date":
            return datetime.fromisoformat(obj["value"]).date()
        elif type_name == "decimal":
            return Decimal(obj["value"])
        elif type_name == "uuid":
            return UUID(obj["value"])
        elif type_name == "set":
            return set(obj["value"])
        elif type_name == "complex":
            return complex(obj["real"], obj["imag"])
    return obj

# 使用
result = msgpack.unpackb(packed, raw=False, object_hook=custom_decoder)
print(type(result["created"]))  # <class 'datetime.datetime'>
print(type(result["price"]))    # <class 'decimal.Decimal'>
print(type(result["tags"]))     # <class 'set'>

使用 ExtType(推荐用于性能敏感场景)

ExtType 是 MessagePack 原生的扩展类型,比 JSON 风格的 __type__ 更紧凑:

import msgpack
from datetime import datetime
import struct

# ========== 编码 ==========
def encode_datetime(obj):
    if isinstance(obj, datetime):
        # 使用 MessagePack timestamp 扩展 (type=-1)
        ts = int(obj.timestamp())
        nsec = obj.microsecond * 1000
        if nsec == 0:
            data = struct.pack(">I", ts & 0x3FFFFFFFF)
        else:
            data = struct.pack(">QI", ts, nsec)
        return msgpack.ExtType(-1, data)
    raise TypeError(f"Unknown type: {type(obj)}")

# ========== 解码 ==========
def decode_datetime(code, data):
    if code == -1:  # timestamp
        if len(data) == 4:
            ts = struct.unpack(">I", data)[0]
        elif len(data) == 8:
            value = struct.unpack(">Q", data)[0]
            ts = value & 0x3FFFFFFFF
            nsec = value >> 34
            return datetime.fromtimestamp(ts).replace(microsecond=nsec // 1000)
        elif len(data) == 12:
            ts, nsec = struct.unpack(">QI", data)
            return datetime.fromtimestamp(ts).replace(microsecond=nsec // 1000)
        return datetime.fromtimestamp(ts)
    return msgpack.ExtType(code, data)

# ========== 使用 ==========
data = {"event": "login", "time": datetime(2024, 6, 15, 14, 30, 0)}
packed = msgpack.packb(data, default=encode_datetime, use_bin_type=True)
result = msgpack.unpackb(packed, raw=False, ext_hook=decode_datetime)

print(result)
# {'event': 'login', 'time': datetime.datetime(2024, 6, 15, 14, 30)}
print(f"ExtType 编码大小: {len(packed)} bytes")
# 比 __type__ 包装方式更紧凑

💻 流式处理 / Streaming

批量打包多个消息

import msgpack
import io

def pack_multiple(messages):
    """将多个消息打包到一个 buffer"""
    buf = io.BytesIO()
    for msg in messages:
        msgpack.pack(msg, buf)
    return buf.getvalue()

def unpack_multiple(data, raw=False):
    """从 buffer 中解包多个消息"""
    buf = io.BytesIO(data)
    unpacker = msgpack.Unpacker(buf, raw=raw)
    return list(unpacker)

# 使用
messages = [
    {"type": "greeting", "data": "hello"},
    {"type": "data", "values": [1, 2, 3]},
    {"type": "farewell", "data": "bye"},
]

packed = pack_multiple(messages)
results = unpack_multiple(packed)
print(f"打包了 {len(results)} 条消息")
for msg in results:
    print(f"  {msg}")

使用 Unpacker 处理流数据

import msgpack
import socket

def handle_connection(conn):
    """处理网络连接的流式解包"""
    # Unpacker 可以从流中自动提取消息
    unpacker = msgpack.Unpacker(conn, raw=False, max_buffer_size=1024*1024)
    
    for msg in unpacker:
        # 每次循环处理一条完整消息
        process_message(msg)

def process_message(msg):
    print(f"收到: {msg}")

# 模拟网络场景
# handle_connection(socket_connection)

自定义流式处理(带长度前缀)

import msgpack
import struct
import io

def pack_with_length(data):
    """添加长度前缀的消息打包"""
    packed = msgpack.packb(data, use_bin_type=True)
    # 4 字节大端长度前缀
    length = struct.pack(">I", len(packed))
    return length + packed

def unpack_stream(stream):
    """从流中解包带长度前缀的消息"""
    while True:
        # 读取 4 字节长度头
        header = stream.read(4)
        if len(header) < 4:
            break
        
        length = struct.unpack(">I", header)[0]
        
        # 读取消息体
        data = stream.read(length)
        if len(data) < length:
            break
        
        yield msgpack.unpackb(data, raw=False)

# 使用
buf = io.BytesIO()
buf.write(pack_with_length({"msg": "hello"}))
buf.write(pack_with_length({"msg": "world"}))
buf.seek(0)

for msg in unpack_stream(buf):
    print(msg)
# {'msg': 'hello'}
# {'msg': 'world'}

💻 性能优化 / Performance Optimization

选择合适的参数

import msgpack

data = {"count": 42, "name": "test"}

# 基准测试
import timeit

# 1. use_bin_type=True (推荐,区分 str 和 bytes)
t1 = timeit.timeit(
    lambda: msgpack.unpackb(msgpack.packb(data, use_bin_type=True), raw=False),
    number=100000
)
print(f"use_bin_type=True:  {t1:.3f}s")

# 2. use_single_float=True (浮点数用 float32,节省空间)
float_data = {"pi": 3.14159, "e": 2.71828}
t2 = timeit.timeit(
    lambda: msgpack.unpackb(msgpack.packb(float_data, use_single_float=True), raw=False),
    number=100000
)
print(f"use_single_float:   {t2:.3f}s")

# 3. 禁用自动类型推断(严格模式)
t3 = timeit.timeit(
    lambda: msgpack.packb(42),
    number=1000000
)
print(f"简单整数序列化:      {t3:.3f}s")

大数据处理优化

import msgpack
import io

def efficient_batch_process(data_list, batch_size=1000):
    """高效批量处理"""
    results = []
    buf = io.BytesIO()
    
    for i, item in enumerate(data_list):
        msgpack.pack(item, buf)
        
        if (i + 1) % batch_size == 0:
            # 每批处理一次
            buf.seek(0)
            unpacker = msgpack.Unpacker(buf, raw=False)
            for msg in unpacker:
                results.append(msg)
            buf = io.BytesIO()
    
    # 处理剩余
    buf.seek(0)
    unpacker = msgpack.Unpacker(buf, raw=False)
    for msg in unpacker:
        results.append(msg)
    
    return results

# 生成测试数据
data = [{"id": i, "value": f"item_{i}"} for i in range(10000)]
results = efficient_batch_process(data)
print(f"处理了 {len(results)} 条记录")

内存优化:使用 Unpacker 避免一次性加载

import msgpack

# ❌ 不好: 一次性加载全部数据
def bad_approach(huge_data):
    return msgpack.unpackb(huge_data, raw=False)

# ✅ 好: 流式处理,逐条读取
def good_approach(huge_data):
    results = []
    unpacker = msgpack.Unpacker(io.BytesIO(huge_data), raw=False)
    for item in unpacker:
        process_item(item)  # 逐条处理,不全部加载到内存
    return results

💻 与常见框架集成 / Framework Integration

与 Redis 集成

import msgpack
import redis

class MsgPackRedis:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def set(self, key, value, ex=None):
        """使用 MessagePack 序列化存储"""
        packed = msgpack.packb(value, use_bin_type=True, default=str)
        self.redis.set(key, packed, ex=ex)
    
    def get(self, key):
        """读取并反序列化"""
        data = self.redis.get(key)
        if data is None:
            return None
        return msgpack.unpackb(data, raw=False)

# 使用
r = redis.Redis()
cache = MsgPackRedis(r)

cache.set("user:1001", {
    "name": "Alice",
    "roles": ["admin", "editor"],
    "login_count": 42
})

user = cache.get("user:1001")
print(user)  # {'name': 'Alice', 'roles': ['admin', 'editor'], 'login_count': 42}

与 FastAPI 集成

from fastapi import FastAPI, Request, Response
import msgpack

app = FastAPI()

class MsgPackResponse(Response):
    """MessagePack 响应类"""
    media_type = "application/x-msgpack"
    
    def render(self, content) -> bytes:
        return msgpack.packb(content, use_bin_type=True)

@app.get("/api/users/{user_id}")
async def get_user(user_id: int):
    user = {"id": user_id, "name": "Alice", "scores": [95, 87, 92]}
    return MsgPackResponse(content=user)

@app.post("/api/users")
async def create_user(request: Request):
    body = await request.body()
    data = msgpack.unpackb(body, raw=False)
    # 处理 data...
    return MsgPackResponse(content={"status": "ok", "user": data})

⚠️ 注意事项 / Pitfalls

1. 始终设置 raw=False

# ❌ 常见错误
data = msgpack.unpackb(packed)          # 字符串变成 bytes!
data = msgpack.unpackb(packed, raw=False)  # ✅ 正确

2. use_bin_type=True

# ❌ 默认行为: str 和 bytes 都编码为同一个类型
msgpack.packb("hello")        # str → fixstr
msgpack.packb(b"hello")       # bytes → fixstr (混淆!)

# ✅ 明确区分
msgpack.packb("hello", use_bin_type=True)   # str → str
msgpack.packb(b"hello", use_bin_type=True)  # bytes → bin

3. max_buffer_size 限制

# 处理大文件时,注意缓冲区限制
unpacker = msgpack.Unpacker(
    file_obj,
    raw=False,
    max_buffer_size=1024 * 1024 * 100  # 100MB
)

4. 元组会变成列表

import msgpack

original = (1, 2, 3)
packed = msgpack.packb(original)
result = msgpack.unpackb(packed)
print(type(result))  # <class 'list'>  — 元组丢失了!

5. dict 键类型限制

import msgpack

# ❌ 非字符串键
data = {1: "one", 2: "two"}
packed = msgpack.packb(data)
result = msgpack.unpackb(packed, raw=False)
print(result)  # {1: 'one', 2: 'two'} — 整数键保留,但不推荐

# ✅ 推荐: 使用字符串键
data = {"1": "one", "2": "two"}

🔗 扩展阅读 / Further Reading

资源链接
msgpack-python 文档https://github.com/msgpack/msgpack-python
API 参考https://msgpack-python.readthedocs.io/
性能对比https://github.com/nicholasgasior/gophers-serialization-benchmarks
msgpack-toolshttps://github.com/ludocode/msgpack-tools

📝 下一章 / Next: 第 4 章 - JavaScript 实践 / JavaScript Implementation — 在浏览器和 Node.js 中使用 MessagePack。