强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

第08章 Meta 协议

第08章 Meta 协议

Meta 协议是 Memcached 1.6 引入的新一代命令集,用紧凑的文本格式传递丰富的元数据,兼顾可读性与高性能。


8.1 Meta 协议概述

设计动机

Meta 协议解决了文本协议和二进制协议各自的不足:

协议优点缺点
文本协议可读性好,易于调试功能有限,需要多次往返
二进制协议高效,功能完整不可读,难以调试
Meta 协议可读 + 功能丰富 + 高效需要 1.6+ 版本

核心特性

  1. 单命令完成多操作: 一条 meta 命令可以获取值 + 元数据 + 条件判断
  2. 减少往返次数: 原本需要 get + cas 两条命令的操作,现在一条 meta get 即可完成
  3. Token 标记机制: 用单字母 token 传递选项和返回元数据
  4. 向后兼容: Meta 协议基于 TCP 文本协议扩展,不破坏兼容性

版本要求

# 查看 Memcached 版本
echo "version" | nc 127.0.0.1 11211
# VERSION 1.6.22 — 支持 Meta 协议

# 启动时启用现代特性
memcached -o modern ...

8.2 Token 标记语法

请求 Token

Meta 命令使用单字母后缀标记传递选项:

<command> <key> <token1> <token2> ...
Token说明用于
b返回值的字节数get
c返回 CAS uniqueget
f返回 flagsget
h返回 item 是否已命中(hit)get
k返回 keyget
l返回 item 最后访问时间get
N设置新的 TTL(如果已有则跳过)get/set
O设置 opaque 标识get/set
q静默模式(noreply)get/set/delete
s返回 item 大小get
T设置 TTLset
M设置模式(set/add/replace)set
I惰性失效(不立即删除,标记为无效)delete
CCAS unique(条件操作)set/delete
k返回 keyset
v返回值(默认就返回)get
t返回 TTLget
n仅当 key 不存在时创建(类似 add)set
x返回上次访问时间get
X返回创建时间get
R自动刷新 TTL(touch)get

响应 Token

响应中的 token 以空格分隔的 token + value 形式返回:

VALUE <token1><value1> <token2><value2> ...
<data>
END

8.3 META GET

语法

mg <key> [tokens...]\r\n

常用 Token 组合

# 获取值 + flags + CAS + TTL
mg user:1001 f c t
VALUE f0 c123456 t3600
{"name":"Bob"}
END

# 获取值 + 大小 + opaque
mg user:1001 s O12345
VALUE s13 O12345
{"name":"Bob"}
END

# 仅获取元数据,不返回值
mg user:1001 s f c t b
VALUE s13 f0 c123456 t3600 b13
END

# 条件获取(仅当 CAS 匹配时返回值)
mg user:1001 c C123455
END  # CAS 不匹配,不返回值

# 静默模式
mg user:1001 q
# 不存在时无响应

完整参数表

Token参数响应说明
vv返回值(默认)
bb<n>返回字节数
cc<n>返回 CAS unique
ff<n>返回 flags
hh<0|1>返回是否命中
kk<key>返回 key
ll<n>返回最后访问时间(秒前)
ss<n>返回 item 大小
tt<n>返回剩余 TTL
xx<n>返回最后访问时间戳
XX<n>返回创建时间戳
O<opaque>O<opaque>返回 opaque 标识
N<ttl>仅当未命中时设置新 TTL
R自动刷新 TTL(touch)
T<ttl>更新 TTL
C<cas>条件获取(CAS 匹配)
q静默模式

Python 实现

import socket

class MetaGetClient:
    def __init__(self, host='127.0.0.1', port=11211):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))

    def meta_get(self, key: str, tokens: str = "") -> dict | None:
        """
        mg <key> [tokens...]\r\n

        返回解析后的结果字典,或 None(未命中)
        """
        cmd = f"mg {key} {tokens}".strip() + "\r\n"
        self.sock.sendall(cmd.encode())

        buffer = b""
        while True:
            chunk = self.sock.recv(65536)
            buffer += chunk
            if b"END\r\n" in buffer:
                break

        lines = buffer.split(b"\r\n")
        first_line = lines[0].decode()

        if first_line == "END":
            return None  # 未命中

        # 解析 VALUE 行
        if not first_line.startswith("VALUE"):
            return None

        parts = first_line.split(" ")
        result = {}

        # 解析 token
        for token in parts[1:]:
            if len(token) >= 2:
                token_char = token[0]
                token_value = token[1:]
                result[token_char] = token_value
            elif len(token) == 1:
                result[token] = True

        # 如果有数据体
        if len(lines) >= 3 and lines[1]:
            result['_value'] = lines[1]

        return result

    def close(self):
        self.sock.sendall(b"quit\r\n")
        self.sock.close()

# 使用示例
client = MetaGetClient()

# 获取值 + flags + CAS + TTL
result = client.meta_get("user:1001", "f c t b")
if result:
    print(f"Flags: {result.get('f')}")
    print(f"CAS: {result.get('c')}")
    print(f"TTL: {result.get('t')}")
    print(f"Size: {result.get('b')}")
    print(f"Value: {result.get('_value')}")
else:
    print("未命中")

client.close()

8.4 META SET

语法

ms <key> <length> [tokens...]\r\n
<data block>\r\n

常用 Token

# 基本设置(带 flags 和 TTL)
ms user:1001 13 T3600 F0
{"name":"Bob"}
HD

# 条件设置(仅当 CAS 匹配时)
ms user:1001 15 C123456 T3600 F0
{"name":"Alice"}
HD

# 仅当 key 不存在时设置(类似 add)
ms newkey 5 T60 n
hello
HD

# 静默设置
ms user:1002 10 T300 q
{"id":1002}
# 无响应(成功时)

# 设置并返回 CAS
ms user:1003 10 T300 c
{"id":1003}
HD c789012

Token 详解

Token参数说明
T<ttl>设置 TTL
F<flags>设置 flags
C<cas>条件设置(CAS 匹配)
N<ttl>仅当未命中时设置新 TTL
n类似 add(仅 key 不存在时写入)
k响应中返回 key
c响应中返回新 CAS
q静默模式
O<opaque>返回 opaque
I惰性失效

响应

HD [tokens...]\r\n
  • HD = “Header Done”,表示存储成功
  • 可选返回 c<cas_unique> 等 token

错误响应

NS\r\n    # Not Stored(CAS 不匹配、条件不满足等)
EX\r\n    # Item Exists(CAS 冲突)
NF\r\n    # Not Found(key 不存在)

实现

class MetaSetClient:
    def __init__(self, host='127.0.0.1', port=11211):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((host, port))

    def meta_set(self, key: str, value: bytes, ttl: int = 0,
                 flags: int = 0, cas: int = 0,
                 add_only: bool = False, noreply: bool = False,
                 return_cas: bool = False) -> dict:
        tokens = []
        if ttl > 0:
            tokens.append(f"T{ttl}")
        if flags > 0:
            tokens.append(f"F{flags}")
        if cas > 0:
            tokens.append(f"C{cas}")
        if add_only:
            tokens.append("n")
        if noreply:
            tokens.append("q")
        if return_cas:
            tokens.append("c")

        cmd = f"ms {key} {len(value)} {' '.join(tokens)}".strip() + "\r\n"
        self.sock.sendall(cmd.encode() + value + b"\r\n")

        if noreply:
            return {'status': 'noreply'}

        buffer = b""
        while True:
            chunk = self.sock.recv(4096)
            buffer += chunk
            if b"\r\n" in buffer:
                break

        line = buffer.decode().strip()
        result = {'raw': line}

        if line.startswith("HD"):
            result['status'] = 'stored'
            # 解析响应 token
            parts = line.split(" ")
            for token in parts[1:]:
                if token.startswith("c"):
                    result['cas'] = int(token[1:])
        elif line == "NS":
            result['status'] = 'not_stored'
        elif line == "EX":
            result['status'] = 'exists'
        elif line == "NF":
            result['status'] = 'not_found'

        return result

    def close(self):
        self.sock.sendall(b"quit\r\n")
        self.sock.close()

# 使用
client = MetaSetClient()

# 基本设置
r = client.meta_set("user:1001", b'{"name":"Bob"}', ttl=3600, flags=0)
print(r)  # {'status': 'stored'}

# 设置并获取 CAS
r = client.meta_set("user:1002", b'{"name":"Alice"}', ttl=3600, return_cas=True)
print(r)  # {'status': 'stored', 'cas': 123456}

# 条件设置(CAS 匹配)
r = client.meta_set("user:1002", b'{"name":"Eve"}', cas=123456, return_cas=True)
print(r)  # {'status': 'stored'} 或 {'status': 'exists'}

client.close()

8.5 META DELETE

语法

md <key> [tokens...]\r\n

Token

Token参数说明
C<cas>条件删除(CAS 匹配)
q静默模式
I惰性失效(不立即删除,标记为无效)
T<ttl>设置惰性失效的 TTL
k响应中返回 key
O<opaque>返回 opaque

响应

HD\r\n     # 删除成功
NF\r\n     # Key 不存在
EX\r\n     # CAS 不匹配

实现

def meta_delete(sock, key: str, cas: int = 0,
                lazy: bool = False, ttl: int = 0) -> str:
    tokens = []
    if cas > 0:
        tokens.append(f"C{cas}")
    if lazy:
        tokens.append("I")
        if ttl > 0:
            tokens.append(f"T{ttl}")

    cmd = f"md {key} {' '.join(tokens)}".strip() + "\r\n"
    sock.sendall(cmd.encode())

    buffer = b""
    while True:
        chunk = sock.recv(4096)
        buffer += chunk
        if b"\r\n" in buffer:
            break

    return buffer.decode().strip()

惰性失效

# 标记为无效(不立即从哈希表中删除)
md user:1001 I T60
HD

# 此后 get 操作返回未命中
mg user:1001 v
END

# 60 秒后 item 被自动清理

8.6 META ARITHMETIC

语法

ma <key> [tokens...]\r\n

Token

Token参数说明
D<delta>递增/递减步长
N<initial>不存在时的初始值
T<ttl>初始创建时的 TTL
J<value>设置新值(绝对值)
M<delta>乘以
O<delta>
A<delta>
X<delta>异或
q静默模式
c返回 CAS
t返回 TTL
C<cas>条件操作
k返回 key

响应

VA <size> [tokens...]\r\n
<value>\r\n

NF\r\n     # Not Found(且未指定 N)

实现

def meta_arithmetic(sock, key: str, delta: int = 1,
                    initial: int = 0, ttl: int = 0) -> int | None:
    tokens = [f"D{delta}"]
    if initial is not None:
        tokens.append(f"N{initial}")
    if ttl > 0:
        tokens.append(f"T{ttl}")

    cmd = f"ma {key} {' '.join(tokens)}".strip() + "\r\n"
    sock.sendall(cmd.encode())

    buffer = b""
    while True:
        chunk = sock.recv(4096)
        buffer += chunk
        if b"\r\n" in buffer:
            break

    lines = buffer.decode().strip().split("\r\n")
    if lines[0].startswith("VA"):
        return int(lines[1])
    return None

使用示例

# 递增 1(不存在时初始化为 100)
ma counter D1 N100 T3600
VA 3
100

# 递增 5
ma counter D5
VA 3
105

# 递减 10
ma counter D-10
VA 2
95

8.7 Meta 协议 vs 传统协议对比

操作文本协议Meta 协议改进
获取 + Flagsget + 解析mg key f一条命令
获取 + CASgetsmg key c一条命令
条件获取getscas (2次)mg key c C<cas>1 次往返
设置 + 返回 CASsetgets (2次)ms key len c1 次往返
CAS 删除不支持md key C<cas>新功能
惰性删除不支持md key I新功能
条件递增不支持ma key C<cas>新功能

8.8 业务场景

场景一:原子读取并刷新 TTL

# 读取用户数据并刷新 TTL 为 3600 秒
mg user:1001 f T3600
VALUE f0
{"name":"Bob"}
END

传统方式需要两条命令:

get user:1001
VALUE user:1001 0 13
{"name":"Bob"}
END

# 然后 touch(需要额外往返)
touch user:1001 3600
TOUCHED

场景二:带条件检查的缓存更新

def safe_update(sock, key: str, new_value: bytes, max_retries: int = 5):
    """使用 Meta 协议的原子 CAS 更新"""
    for attempt in range(max_retries):
        # 1. 获取当前值和 CAS
        sock.sendall(f"mg {key} c f\r\n".encode())
        resp = _read_response(sock)

        if resp is None:
            # key 不存在,直接写入
            sock.sendall(f"ms {key} {len(new_value)} n\r\n".encode()
                        + new_value + b"\r\n")
            result = _read_response(sock)
            if result == "HD":
                return True
            continue

        cas = resp.get('c')
        flags = resp.get('f', '0')

        # 2. CAS 写入
        sock.sendall(
            f"ms {key} {len(new_value)} C{cas} F{flags}\r\n".encode()
            + new_value + b"\r\n"
        )
        result = _read_response(sock)
        if result == "HD":
            return True  # 成功
        # EX = CAS 冲突,重试

    return False

场景三:分布式锁(使用 CAS 删除)

import uuid
import time

class MetaDistributedLock:
    def __init__(self, sock, lock_key: str, ttl: int = 30):
        self.sock = sock
        self.lock_key = f"lock:{lock_key}"
        self.token = str(uuid.uuid4()).encode()
        self.ttl = ttl
        self.cas = 0

    def acquire(self, timeout: int = 10) -> bool:
        deadline = time.time() + timeout
        while time.time() < deadline:
            # 尝试 add
            cmd = f"ms {self.lock_key} {len(self.token)} n T{self.ttl} c\r\n"
            self.sock.sendall(cmd.encode() + self.token + b"\r\n")
            resp = self._read_resp()

            if resp and resp.startswith("HD"):
                # 解析返回的 CAS
                parts = resp.split(" ")
                for part in parts:
                    if part.startswith("c"):
                        self.cas = int(part[1:])
                return True
            time.sleep(0.1)
        return False

    def release(self) -> bool:
        if self.cas == 0:
            return False
        # 使用 CAS 删除(只有持有正确 CAS 的客户端才能删除)
        cmd = f"md {self.lock_key} C{self.cas}\r\n"
        self.sock.sendall(cmd.encode())
        resp = self._read_resp()
        return resp == "HD"

    def _read_resp(self) -> str:
        buffer = b""
        while True:
            chunk = self.sock.recv(4096)
            buffer += chunk
            if b"\r\n" in buffer:
                break
        return buffer.decode().strip()

8.9 注意事项

编号注意事项说明
1版本要求需要 Memcached 1.6+
2Token 区分大小写Tt 含义不同
3静默模式不返回成功q token 使成功时无响应
4惰性删除不释放内存需要等待 LRU 淘汰或 TTL 过期
5mg 默认返回值不加任何 token 也返回值
6ms 必须指定长度数据长度必须精确

8.10 扩展阅读


上一章: 第07章 二进制协议命令 下一章: 第09章 代理与分布式 — 实现 Memcached 代理、连接池和一致性哈希。