Memcached 传输协议精讲 / 第07章 二进制协议命令
第07章 二进制协议命令
本章详细讲解每个 Memcached 操作在二进制协议中的具体帧格式。
7.1 GET / GETQ / GETK / GETKQ
四种获取命令的区别
| 命令 | Opcode | 说明 |
|---|---|---|
| GET | 0x00 | 标准获取 |
| GETQ | 0x09 | 静默获取(key 不存在时不返回响应) |
| GETK | 0x0C | 获取 + 返回 key |
| GETKQ | 0x0D | 静默获取 + 返回 key |
GET 请求帧
Header (24 bytes):
Magic: 0x80
Opcode: 0x00
Key Length: len(key)
Extras: 0
Body Length: len(key)
Body:
Key: <key bytes>
GET 响应帧
Header (24 bytes):
Magic: 0x81
Opcode: 0x00
Key Length: 0 (GET) 或 len(key) (GETK)
Extras: 4 (flags)
Status: 0x0000 (成功) 或 0x0001 (未找到)
Body Length: 4 + len(key) + len(value)
CAS: <cas_unique>
Body:
Extras: <flags: uint32>
Key: <key bytes> (GETK/GETKQ)
Value: <value bytes>
Python 实现
import struct
import socket
def binary_get(sock, key: str, getk: bool = False):
    """Fetch one key with the binary GET (0x00) or GETK (0x0C) command.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        key: Key to look up; sent as ``key.encode()``.
        getk: When true, send GETK so the response body also carries the key.

    Returns:
        ``None`` when the response status is 0x0001 (key not found);
        otherwise a dict with ``value`` (bytes), ``flags`` (int), ``cas``
        (int) and ``opaque`` (int) taken from the response.

    Raises:
        RuntimeError: The response status is neither 0x0000 nor 0x0001.
        ConnectionError: The peer closed the connection mid-response.
    """
    key_bytes = key.encode()
    request_opcode = 0x0C if getk else 0x00
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80,            # magic: request
        request_opcode,  # opcode
        len(key_bytes),  # key length
        0,               # extras length
        0x00,            # data type
        0,               # reserved
        len(key_bytes),  # total body length
        1,               # opaque
        0,               # cas
    )
    sock.sendall(header + key_bytes)

    resp_header = _recv_exact(sock, 24)
    (_, _, key_len, extras_len, _, status, body_len, opaque, cas) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # Read the body before looking at the status: error responses carry a
    # message body that must be consumed to keep the stream aligned.
    body = _recv_exact(sock, body_len)

    if status == 0x0001:  # Key Not Found
        return None
    if status != 0x0000:
        # Error bodies have no extras, so parsing flags from them would fail
        # with an opaque struct.error; report the server status instead.
        raise RuntimeError(
            f"GET failed with status 0x{status:04x}: {body!r}"
        )

    flags = struct.unpack(">I", body[:extras_len])[0]
    # For GETK the key sits between the extras and the value; skip over it.
    value = body[extras_len + key_len:]
    return {
        'value': value,
        'flags': flags,
        'cas': cas,
        'opaque': opaque
    }
def _recv_exact(sock, n: int) -> bytes:
data = b""
while len(data) < n:
chunk = sock.recv(n - len(data))
if not chunk:
raise ConnectionError("Connection closed")
data += chunk
return data
# GETQ — 静默获取
# 当 key 不存在时,不返回任何响应
# 适用于管道化请求:发送多个 GETQ,只有存在的 key 才返回响应
def binary_get_multi(sock, keys: list[str]) -> dict:
    """Fetch several keys by pipelining GETKQ requests closed by a NOOP.

    One GETKQ (0x0D) frame is sent per key, using the key's list index as
    the opaque, followed by a NOOP (0x0A). Responses are read until the
    NOOP reply shows up.

    Returns:
        Dict mapping each key found in a status-0x0000 response to a dict
        with ``value``, ``flags`` and ``cas``.
    """
    frame = struct.Struct("!BBHBBHIIQ")

    for index, name in enumerate(keys):
        raw_key = name.encode()
        request = frame.pack(
            0x80, 0x0D, len(raw_key), 0, 0x00, 0, len(raw_key), index, 0
        )
        sock.sendall(request + raw_key)

    # The NOOP reply marks the end of the quiet responses.
    sock.sendall(frame.pack(0x80, 0x0A, 0, 0, 0x00, 0, 0, len(keys), 0))

    found = {}
    while True:
        fields = frame.unpack(_recv_exact(sock, 24))
        opcode, key_len, extras_len = fields[1], fields[2], fields[3]
        status, body_len, cas = fields[5], fields[6], fields[8]
        if opcode == 0x0A:  # NOOP reply: nothing more to read
            return found

        payload = _recv_exact(sock, body_len)
        if status != 0x0000:
            continue

        key_end = extras_len + key_len
        (flags,) = struct.unpack(">I", payload[:extras_len])
        name = payload[extras_len:key_end].decode()
        found[name] = {'value': payload[key_end:], 'flags': flags, 'cas': cas}
7.2 SET / ADD / REPLACE
Extras 字段
存储命令的 Extras 固定为 8 字节:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Flags (uint32) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Expiration (uint32) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
SET 请求帧
Header:
Magic: 0x80
Opcode: 0x01
Key Length: len(key)
Extras: 8
Body Length: 8 + len(key) + len(value)
CAS: 0 (新写入) 或 cas_unique (CAS更新)
Body:
Extras: <flags: uint32> <exptime: uint32>
Key: <key bytes>
Value: <value bytes>
SET 响应帧
Header:
Magic: 0x81
Opcode: 0x01
Status: 0x0000 (成功)
CAS: <new cas_unique>
Body: (空)
完整实现
def binary_set(sock, key: str, value: bytes, flags: int = 0,
               exptime: int = 0, cas: int = 0,
               opcode: int = 0x01) -> dict:
    """Store a value with the binary SET, ADD or REPLACE command.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        key: Key to store under; sent as ``key.encode()``.
        value: Raw value bytes.
        flags: 32-bit flags packed into the extras.
        exptime: 32-bit expiration packed into the extras.
        cas: CAS value placed in the request header.
        opcode: 0x01=SET, 0x02=ADD, 0x03=REPLACE.

    Returns:
        Dict with ``success`` (True when the status is 0x0000), ``status``
        and ``cas``, all taken from the response header.
    """
    key_bytes = key.encode()
    extras = struct.pack(">II", flags, exptime)
    body_length = len(extras) + len(key_bytes) + len(value)
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80,             # magic: request
        opcode,           # opcode
        len(key_bytes),   # key length
        len(extras),      # extras length
        0x00,             # data type
        0,                # reserved
        body_length,      # total body length
        0,                # opaque
        cas,              # cas
    )
    sock.sendall(header + extras + key_bytes + value)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, resp_body_len, _, new_cas) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # Error responses carry a message body; consume it so the next command
    # on this socket does not parse leftover bytes as a header.
    if resp_body_len > 0:
        _recv_exact(sock, resp_body_len)

    return {
        'success': status == 0x0000,
        'status': status,
        'cas': new_cas
    }
操作码对比
| 操作 | Opcode | Key 不存在 | Key 已存在 |
|---|---|---|---|
| SET | 0x01 | 创建 | 覆盖 |
| ADD | 0x02 | 创建 | 0x0002 Key Exists |
| REPLACE | 0x03 | 0x0001 Not Found | 覆盖 |
7.3 DELETE
DELETE 请求帧
Header:
Magic: 0x80
Opcode: 0x04
Key Length: len(key)
Extras: 0
Body Length: len(key)
Body:
Key: <key bytes>
DELETE 响应帧
Header:
Magic: 0x81
Opcode: 0x04
Status: 0x0000 (成功) 或 0x0001 (未找到)
Body: (空)
实现
def binary_delete(sock, key: str) -> bool:
    """Delete a key with the binary DELETE (0x04) command.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        key: Key to delete; sent as ``key.encode()``.

    Returns:
        True when the response status is 0x0000, False otherwise.
    """
    key_bytes = key.encode()
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x04, len(key_bytes), 0, 0x00, 0, len(key_bytes), 0, 0
    )
    sock.sendall(header + key_bytes)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, body_len, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # A failed delete answers with a message body; drain it so the stream
    # stays aligned for the next request.
    if body_len > 0:
        _recv_exact(sock, body_len)
    return status == 0x0000
7.4 INCREMENT / DECREMENT
Extras 字段
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Delta (uint64, 8 bytes) |
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Initial Value (uint64, 8 bytes) |
| |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Expiration (uint32) |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- Delta: 递增/递减的步长
- Initial Value: key 不存在时的初始值
- Expiration: 初始创建时的过期时间
响应
Body:
<result: uint64> — 递增/递减后的值
实现
def binary_incr_decr(sock, key: str, delta: int, initial: int = 0,
                     exptime: int = 0, incr: bool = True) -> int | None:
    """Run the binary INCREMENT (0x05) or DECREMENT (0x06) command.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        key: Counter key; sent as ``key.encode()``.
        delta: 64-bit step packed into the extras.
        initial: 64-bit initial value packed into the extras.
        exptime: 32-bit expiration packed into the extras.
        incr: True sends INCREMENT, False sends DECREMENT.

    Returns:
        The 64-bit counter value from the response body, or ``None`` when
        the response status is 0x0001 (not found).

    Raises:
        RuntimeError: The response status is neither 0x0000 nor 0x0001.
    """
    key_bytes = key.encode()
    extras = struct.pack(">QQI", delta, initial, exptime)
    opcode = 0x05 if incr else 0x06
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, opcode, len(key_bytes), len(extras), 0x00, 0,
        len(extras) + len(key_bytes), 0, 0
    )
    sock.sendall(header + extras + key_bytes)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, body_len, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # Consume the body before any early return: error responses carry a
    # message that would otherwise be read as the next response header.
    body = _recv_exact(sock, body_len)

    if status == 0x0001:  # Not Found
        return None
    if status != 0x0000:
        # The body is an error message here, not an 8-byte counter.
        raise RuntimeError(
            f"INCR/DECR failed with status 0x{status:04x}: {body!r}"
        )
    return struct.unpack(">Q", body)[0]
7.5 APPEND / PREPEND
APPEND 请求帧
Header:
Magic: 0x80
Opcode: 0x0E (Append) 或 0x0F (Prepend)
Key Length: len(key)
Extras: 0
Body Length: len(key) + len(value)
Body:
Key: <key bytes>
Value: <value bytes> — 要追加/前插的数据
实现
def binary_append(sock, key: str, value: bytes, prepend: bool = False) -> bool:
    """Run the binary APPEND (0x0E) or PREPEND (0x0F) command.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        key: Key of the existing item; sent as ``key.encode()``.
        value: Bytes to add after (append) or before (prepend) the item.
        prepend: True sends PREPEND, False sends APPEND.

    Returns:
        True when the response status is 0x0000, False otherwise.
    """
    key_bytes = key.encode()
    opcode = 0x0F if prepend else 0x0E
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, opcode, len(key_bytes), 0, 0x00, 0,
        len(key_bytes) + len(value), 0, 0
    )
    sock.sendall(header + key_bytes + value)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, body_len, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # Failure responses include a message body; drain it to keep the
    # connection usable for the next command.
    if body_len > 0:
        _recv_exact(sock, body_len)
    return status == 0x0000
7.6 VERSION
VERSION 请求帧
Header:
Magic: 0x80
Opcode: 0x0B
Key Length: 0
Extras: 0
Body Length: 0
Body: (空)
VERSION 响应帧
Header:
Magic: 0x81
Opcode: 0x0B
Status: 0x0000
Body Length: len(version_string)
Body:
Value: <version string bytes>
7.7 FLUSH
FLUSH 请求帧
Header:
Magic: 0x80
Opcode: 0x08
Key Length: 0
Extras: 0 或 4
Body Length: 0 或 4
Body:
Extras: <delay: uint32> (可选,延迟秒数)
实现
def binary_flush(sock, delay: int = 0) -> bool:
    """Run the binary FLUSH (0x08) command.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        delay: When greater than zero, sent as a 4-byte extras field;
            otherwise the request carries no extras.

    Returns:
        True when the response status is 0x0000, False otherwise.
    """
    # The extras field is optional: 4 bytes with a delay, absent without.
    extras = struct.pack(">I", delay) if delay > 0 else b""
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x08, 0, len(extras), 0x00, 0, len(extras), 0, 0
    )
    sock.sendall(header + extras)

    resp_header = _recv_exact(sock, 24)
    (_, _, _, _, _, status, body_len, _, _) = \
        struct.unpack("!BBHBBHIIQ", resp_header)

    # Drain any error message so later reads start at a header boundary.
    if body_len > 0:
        _recv_exact(sock, body_len)
    return status == 0x0000
7.8 STAT
STAT 请求帧
Header:
Magic: 0x80
Opcode: 0x10
Key Length: 0 或 len(key)
Extras: 0
Body Length: 0 或 len(key)
Body:
Key: <stat key> (可选,指定特定统计项)
STAT 响应
服务端返回多个响应帧,每帧包含一个统计键值对:
# 每个统计项一个响应帧
Header:
Magic: 0x81
Opcode: 0x10
Key Length: len(stat_name)
Body Length: len(stat_name) + len(stat_value)
Status: 0x0000
Body:
Key: <stat name>
Value: <stat value>
最后一个响应帧的 Key Length = 0,Body Length = 0,表示统计结束。
实现
def binary_stats(sock) -> dict[str, str]:
    """Collect server statistics with the binary STAT (0x10) command.

    Sends a STAT request without a key and reads response frames until one
    arrives with both key length and body length equal to zero.

    Returns:
        Dict mapping each stat name to its value, both decoded to ``str``.

    Raises:
        RuntimeError: A response frame has a non-zero status.
    """
    header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x10, 0, 0, 0x00, 0, 0, 0, 0
    )
    sock.sendall(header)

    result = {}
    while True:
        resp_header = _recv_exact(sock, 24)
        (_, _, key_len, _, _, status, body_len, _, _) = \
            struct.unpack("!BBHBBHIIQ", resp_header)
        if key_len == 0 and body_len == 0:
            break

        body = _recv_exact(sock, body_len)
        if status != 0x0000:
            # An error frame is not followed by the empty terminator frame,
            # so continuing to read would block; fail loudly instead.
            raise RuntimeError(
                f"STAT failed with status 0x{status:04x}: {body!r}"
            )
        result[body[:key_len].decode()] = body[key_len:].decode()
    return result
7.9 NOOP
NOOP 帧
Header:
Magic: 0x80
Opcode: 0x0A
Key Length: 0
Extras: 0
Body Length: 0
Body: (空)
用途
- 管道化边界标记: 在静默命令(Q 系列)之后发送 NOOP,用 NOOP 响应标记所有静默响应的结束
- 心跳检测: 检测连接是否存活
def binary_noop(sock) -> bool:
    """Send a binary NOOP (0x0A) and check the reply header.

    Returns:
        True only when the reply's opcode is 0x0A and its status is 0x0000.
    """
    frame = struct.Struct("!BBHBBHIIQ")
    sock.sendall(frame.pack(0x80, 0x0A, 0, 0, 0x00, 0, 0, 0, 0))

    reply = frame.unpack(_recv_exact(sock, 24))
    reply_opcode, reply_status = reply[1], reply[5]
    if reply_opcode != 0x0A:
        return False
    return reply_status == 0x0000
7.10 Q 系列命令(静默命令)
概述
Q 系列命令(命令名带 Q 后缀)是静默命令: 写入类命令(SETQ、DELETEQ 等)在操作成功时不返回响应,只在失败时返回; GETQ / GETKQ 则相反,命中时正常返回响应,key 不存在时不返回响应。
| 命令 | Opcode | 成功时响应 |
|---|---|---|
| SETQ | 0x11 | 无 |
| ADDQ | 0x12 | 无 |
| REPLACEQ | 0x13 | 无 |
| DELETEQ | 0x14 | 无 |
| INCREMENTQ | 0x15 | 无 |
| DECREMENTQ | 0x16 | 无 |
| GETQ | 0x09 | 有(命中时正常返回; key 不存在时不返回) |
| GETKQ | 0x0D | 有(命中时正常返回; key 不存在时不返回) |
| APPENDQ | 0x19 | 无 |
| PREPENDQ | 0x1A | 无 |
| FLUSHQ | 0x18 | 无 |
| QUITQ | 0x17 | 无 |
管道化模式
def binary_multi_set(sock, items: list[tuple[str, bytes, int]]):
    """Write several items by pipelining SETQ requests closed by a NOOP.

    Args:
        sock: Connected stream socket; must provide ``sendall`` and ``recv``.
        items: ``[(key, value, exptime), ...]``; flags are always sent as 0.

    Returns:
        True when the NOOP reply was the only response received, i.e. no
        SETQ produced a response; False when at least one non-NOOP response
        arrived before it.
    """
    for i, (key, value, exptime) in enumerate(items):
        key_bytes = key.encode()
        extras = struct.pack(">II", 0, exptime)
        body_len = len(extras) + len(key_bytes) + len(value)
        header = struct.pack(
            "!BBHBBHIIQ",
            0x80, 0x11, len(key_bytes), len(extras), 0x00, 0,
            body_len, i, 0
        )
        sock.sendall(header + extras + key_bytes + value)

    # The NOOP reply tells us every preceding SETQ has been processed.
    noop_header = struct.pack(
        "!BBHBBHIIQ",
        0x80, 0x0A, 0, 0, 0x00, 0, 0, len(items), 0
    )
    sock.sendall(noop_header)

    all_stored = True
    while True:
        resp_header = _recv_exact(sock, 24)
        (_, opcode, _, _, _, _, resp_body_len, _, _) = \
            struct.unpack("!BBHBBHIIQ", resp_header)
        if opcode == 0x0A:  # NOOP
            break
        # SETQ is silent on success, so any response seen here is a failure.
        all_stored = False
        if resp_body_len > 0:
            _recv_exact(sock, resp_body_len)
    return all_stored
7.11 SASL 认证
SASL 机制列表请求
Header:
Magic: 0x80
Opcode: 0x20
Body Length: 0
SASL 认证请求
Header:
Magic: 0x80
Opcode: 0x21
Key Length: len(mechanism)
Body Length: len(mechanism) + len(challenge)
Body:
Key: "PLAIN" (机制名称)
Value: <sasl challenge bytes>
PLAIN 认证示例
def binary_sasl_auth(sock, username: str, password: str) -> bool:
    """Authenticate with SASL PLAIN via the binary opcode 0x21.

    The mechanism name is sent as the key and the credentials as the value.

    Returns:
        True when the response status is 0x0000, False otherwise.
    """
    mechanism = b"PLAIN"
    # PLAIN payload layout: \0username\0password
    challenge = b"\0".join((b"", username.encode(), password.encode()))

    frame = struct.Struct("!BBHBBHIIQ")
    request = frame.pack(
        0x80, 0x21, len(mechanism), 0, 0x00, 0,
        len(mechanism) + len(challenge), 0, 0,
    )
    sock.sendall(request + mechanism + challenge)

    reply = frame.unpack(_recv_exact(sock, 24))
    status, body_len = reply[5], reply[6]
    # Discard whatever body the server attached to the reply.
    if body_len > 0:
        _recv_exact(sock, body_len)
    return status == 0x0000
7.12 完整二进制客户端
#!/usr/bin/env python3
"""full_binary_client.py — 完整的 Memcached 二进制协议客户端"""
import struct
import socket
from typing import Optional
class FullBinaryMemcachedClient:
    """Blocking Memcached client that speaks the binary protocol over TCP.

    Each public method performs one request/response round trip on the
    socket opened in ``__init__``.
    """

    _HEADER = struct.Struct("!BBHBBHIIQ")
    _STATUS_OK = 0x0000
    _STATUS_NOT_FOUND = 0x0001

    def __init__(self, host='127.0.0.1', port=11211):
        """Open a TCP connection to ``host``:``port``."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            self.sock.connect((host, port))
        except OSError:
            # Do not leak the descriptor when the connection attempt fails.
            self.sock.close()
            raise
        self._opaque = 0

    def _next_opaque(self) -> int:
        """Return the next request opaque, wrapping at 32 bits."""
        # The header packs the opaque as uint32; wrap instead of overflowing.
        self._opaque = (self._opaque + 1) & 0xFFFFFFFF
        return self._opaque

    def _recv_exact(self, n: int) -> bytes:
        """Read exactly ``n`` bytes from the socket.

        Raises:
            ConnectionError: The peer closed the connection first.
        """
        chunks = []
        remaining = n
        while remaining > 0:
            chunk = self.sock.recv(remaining)
            if not chunk:
                raise ConnectionError("Connection closed")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def _send_recv(self, opcode: int, key: bytes = b"",
                   value: bytes = b"", extras: bytes = b"",
                   cas: int = 0) -> dict:
        """Send one request frame and return the parsed response.

        Returns:
            Dict with ``opcode``, ``status``, ``key``, ``value``,
            ``extras``, ``cas`` and ``opaque`` from the response. The body
            is always read in full, including on error statuses.
        """
        body_len = len(extras) + len(key) + len(value)
        header = self._HEADER.pack(
            0x80, opcode, len(key), len(extras), 0x00, 0,
            body_len, self._next_opaque(), cas
        )
        self.sock.sendall(header + extras + key + value)

        resp = self._recv_exact(24)
        (_, resp_opcode, key_len, extras_len, _, status,
         resp_body_len, opaque, resp_cas) = self._HEADER.unpack(resp)
        body = self._recv_exact(resp_body_len) if resp_body_len > 0 else b""
        return {
            'opcode': resp_opcode, 'status': status,
            'key': body[extras_len:extras_len + key_len],
            'value': body[extras_len + key_len:],
            'extras': body[:extras_len], 'cas': resp_cas, 'opaque': opaque
        }

    @staticmethod
    def _error(r: dict) -> RuntimeError:
        """Build the exception raised for an unexpected response status."""
        return RuntimeError(
            f"opcode 0x{r['opcode']:02x} failed with status "
            f"0x{r['status']:04x}: {r['value']!r}"
        )

    def version(self) -> str:
        """Return the response body of VERSION (0x0B) decoded to ``str``."""
        r = self._send_recv(0x0B)
        return r['value'].decode()

    def set(self, key: str, value: bytes, flags: int = 0,
            exptime: int = 0, cas: int = 0) -> dict:
        """Send SET (0x01) and return the parsed response dict."""
        extras = struct.pack(">II", flags, exptime)
        return self._send_recv(0x01, key.encode(), value, extras, cas)

    def add(self, key: str, value: bytes, flags: int = 0,
            exptime: int = 0) -> dict:
        """Send ADD (0x02) and return the parsed response dict."""
        extras = struct.pack(">II", flags, exptime)
        return self._send_recv(0x02, key.encode(), value, extras)

    def replace(self, key: str, value: bytes, flags: int = 0,
                exptime: int = 0) -> dict:
        """Send REPLACE (0x03) and return the parsed response dict."""
        extras = struct.pack(">II", flags, exptime)
        return self._send_recv(0x03, key.encode(), value, extras)

    def get(self, key: str) -> Optional[tuple[bytes, int, int]]:
        """Send GET (0x00).

        Returns:
            ``(value, flags, cas)``, or ``None`` when the status is 0x0001.

        Raises:
            RuntimeError: The status is neither 0x0000 nor 0x0001.
        """
        r = self._send_recv(0x00, key.encode())
        if r['status'] == self._STATUS_NOT_FOUND:
            return None
        if r['status'] != self._STATUS_OK:
            # Otherwise the error message would be returned as the value.
            raise self._error(r)
        flags = struct.unpack(">I", r['extras'])[0] if r['extras'] else 0
        return r['value'], flags, r['cas']

    def delete(self, key: str) -> bool:
        """Send DELETE (0x04); True when the status is 0x0000."""
        r = self._send_recv(0x04, key.encode())
        return r['status'] == self._STATUS_OK

    def _arith(self, opcode: int, key: str, delta: int, initial: int,
               exptime: int) -> Optional[int]:
        """Shared implementation of ``incr`` and ``decr``."""
        extras = struct.pack(">QQI", delta, initial, exptime)
        r = self._send_recv(opcode, key.encode(), extras=extras)
        if r['status'] == self._STATUS_NOT_FOUND:
            return None
        if r['status'] != self._STATUS_OK:
            # The body is an error message here, not an 8-byte counter.
            raise self._error(r)
        return struct.unpack(">Q", r['value'])[0]

    def incr(self, key: str, delta: int, initial: int = 0,
             exptime: int = 0) -> Optional[int]:
        """Send INCREMENT (0x05).

        Returns:
            The new counter value, or ``None`` when the status is 0x0001.

        Raises:
            RuntimeError: The status is neither 0x0000 nor 0x0001.
        """
        return self._arith(0x05, key, delta, initial, exptime)

    def decr(self, key: str, delta: int, initial: int = 0,
             exptime: int = 0) -> Optional[int]:
        """Send DECREMENT (0x06).

        Returns:
            The new counter value, or ``None`` when the status is 0x0001.

        Raises:
            RuntimeError: The status is neither 0x0000 nor 0x0001.
        """
        return self._arith(0x06, key, delta, initial, exptime)

    def flush(self, delay: int = 0) -> bool:
        """Send FLUSH (0x08), with a 4-byte delay extras when ``delay > 0``."""
        extras = struct.pack(">I", delay) if delay > 0 else b""
        r = self._send_recv(0x08, extras=extras)
        return r['status'] == self._STATUS_OK

    def noop(self) -> bool:
        """Send NOOP (0x0A); True when the status is 0x0000."""
        r = self._send_recv(0x0A)
        return r['status'] == self._STATUS_OK

    def close(self):
        """Send QUIT (0x07), then close the socket even if QUIT fails."""
        try:
            self._send_recv(0x07)  # QUIT
        finally:
            self.sock.close()
# Smoke test against a local server
client = FullBinaryMemcachedClient()
try:
    print(f"Version: {client.version()}")
    client.set("bin:test", b"hello binary", flags=0, exptime=300)
    result = client.get("bin:test")
    if result:
        print(f"Value: {result[0].decode()}, Flags: {result[1]}, CAS: {result[2]}")
    print(f"Incr: {client.incr('bin:counter', 1, initial=100)}")
    print(f"Incr: {client.incr('bin:counter', 1)}")
    client.delete("bin:test")
    client.delete("bin:counter")
finally:
    # Release the connection even when one of the calls above raises.
    client.close()
7.13 注意事项
| 编号 | 注意事项 | 说明 |
|---|---|---|
| 1 | Q 系列命令静默 | 成功时不返回响应,需要 NOOP 标记结束 |
| 2 | INCR/DECR 初始值 | 二进制协议支持 key 不存在时设置初始值(文本协议不支持) |
| 3 | CAS 为 0 语义 | 二进制协议中 CAS=0 表示"不做 CAS 检查" |
| 4 | Extras 长度固定 | 每个命令的 Extras 长度是固定的(FLUSH 例外: 其 Extras 可选,长度为 0 或 4) |
| 5 | Body 长度验证 | extras_len + key_len + value_len == body_len |
7.14 扩展阅读
上一章: 第06章 二进制协议基础 下一章: 第08章 Meta 协议 — 探索 Memcached 1.6+ 的 Meta 协议高级特性。