强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

10 - 集群协议

集群协议

10.1 集群概述

Redis Cluster 是 Redis 的分布式方案,通过数据分片(Sharding)实现水平扩展。

核心概念

概念说明
哈希槽(Hash Slot)16384 个槽位,每个 key 属于一个槽
节点(Node)每个节点负责一部分槽位
槽位分配如 0-5460 → Node A, 5461-10922 → Node B, 10923-16383 → Node C
重定向客户端访问错误节点时,收到 MOVED/ASK 重定向

槽位计算

HASH_SLOT = CRC16(key) mod 16384
def crc16(s: bytes) -> int:
    """Redis 使用的 CRC16 算法"""
    crc = 0
    for byte in s:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = (crc << 1) ^ 0x1021
            else:
                crc <<= 1
            crc &= 0xFFFF
    return crc

def hash_slot(key: str) -> int:
    """计算 key 的哈希槽"""
    # 处理 hash tag {}
    start = key.find('{')
    if start != -1:
        end = key.find('}', start + 1)
        if end != -1 and end != start + 1:
            key = key[start + 1:end]

    return crc16(key.encode()) % 16384

10.2 MOVED 重定向

概述

当客户端向错误的节点发送命令时,节点返回 MOVED 错误,告诉客户端正确的节点地址。

协议格式

-MOVED <slot> <host>:<port>

交互流程

Client                         Node A (负责 0-5460)
  │                                │
  │──── GET key:7777 ────────────→│  key:7777 在槽 7777
  │                                │  槽 7777 不属于 Node A
  │←── MOVED 7777 127.0.0.1:6380 ─│
  │                                │
  │──── GET key:7777 ────────────→│  Node B (负责 5461-10922)
  │←── $5\r\nhello ──────────────│

客户端处理逻辑

import socket
import re

class ClusterClient:
    def __init__(self, startup_nodes):
        """startup_nodes: 初始节点列表 [(host, port), ...]"""
        self.nodes = {addr: self._connect(*addr) for addr in startup_nodes}
        self.slot_map = {}  # slot → (host, port)
        self._refresh_slots()

    def _connect(self, host, port):
        return socket.create_connection((host, port))

    def _refresh_slots(self):
        """刷新槽位映射"""
        for addr, sock in self.nodes.items():
            try:
                sock.sendall(b"*2\r\n$7\r\nCLUSTER\r\n$9\r\nSLOTS\r\n\r\n")
                resp = self._read_response(sock)
                # 解析 CLUSTER SLOTS 响应
                # [[0, 5460, ["127.0.0.1", 6379]], ...]
                for slot_range in resp:
                    start_slot = slot_range[0]
                    end_slot = slot_range[1]
                    node_info = slot_range[2]
                    node_addr = (node_info[0], node_info[1])
                    for slot in range(start_slot, end_slot + 1):
                        self.slot_map[slot] = node_addr
                return
            except:
                continue

    def execute(self, *args):
        """执行命令,处理 MOVED 重定向"""
        for attempt in range(3):
            # 计算槽位
            key = args[1] if len(args) > 1 else None
            slot = hash_slot(key) if key else 0

            # 获取目标节点
            target = self.slot_map.get(slot)
            if target is None:
                self._refresh_slots()
                target = self.slot_map.get(slot)

            sock = self.nodes.get(target)
            # 发送命令
            cmd = encode_command(*args)
            sock.sendall(cmd)

            # 读取响应
            resp = self._read_response(sock)

            # 检查 MOVED
            if isinstance(resp, Exception) and str(resp).startswith("MOVED"):
                parts = str(resp).split()
                new_host, new_port = parts[2].split(":")
                new_addr = (new_host, int(new_port))

                # 更新槽位映射
                self.slot_map[slot] = new_addr

                # 连接到新节点
                if new_addr not in self.nodes:
                    self.nodes[new_addr] = self._connect(*new_addr)

                continue  # 重试

            return resp

        raise RuntimeError("Max retries exceeded")

CLUSTER SLOTS 命令

→ CLUSTER SLOTS
← *3
← *4
← :0              ← 起始槽
← :5460           ← 结束槽
← *2              ← 主节点
← $9
← 127.0.0.1
← :6379
← *2              ← 从节点
← $9
← 127.0.0.1
← :6380
← *4
← :5461
← :10922
← *2
← $9
← 127.0.0.1
← :6381
← *2
← $9
← 127.0.0.1
← :6382
← *4
← :10923
← :16383
← *2
← $9
← 127.0.0.1
← :6383
← *2
← $9
← 127.0.0.1
← :6384

CLUSTER NODES 命令

返回集群拓扑的文本描述:

→ CLUSTER NODES
← $<多行文本>

# 格式:<node-id> <ip:port@cport> <flags> <master-id> <ping-sent> <pong-recv> <config-epoch> <link-state> <slot> <slot> ...

示例输出:

a]1b2c3d4e5f... 127.0.0.1:6379@16379 myself,master - 0 0 1 connected 0-5460
b2c3d4e5f6... 127.0.0.1:6381@16381 master - 0 0 2 connected 5461-10922
c3d4e5f6a7... 127.0.0.1:6383@16383 master - 0 0 3 connected 10923-16383

10.3 ASK 重定向

MOVED vs ASK

特性MOVEDASK
含义槽位永久归属新节点槽位正在迁移中
客户端行为更新本地槽位映射仅本次重定向,不更新映射
后续请求直接发往新节点仍发往旧节点(直到迁移完成)

槽位迁移流程

1. Node A: CLUSTER SETSLOT 5461 MIGRATING <node-b-id>
2. Node B: CLUSTER SETSLOT 5461 IMPORTING <node-a-id>
3. 逐个迁移槽位中的 key
4. 所有节点: CLUSTER SETSLOT 5461 NODE <node-b-id>

ASK 交互流程

Client                         Node A (正在迁出槽 5461)
  │                                │
  │──── GET key:5461 ────────────→│  key 已迁移或不存在
  │                                │
  │←── ASK 5461 127.0.0.1:6380 ───│
  │                                │
  │──── ASKING ──────────────────→│  Node B (正在迁入槽 5461)
  │←── +OK ───────────────────────│  ASKING 标记允许访问导入中的槽
  │                                │
  │──── GET key:5461 ────────────→│
  │←── $5\r\nhello ──────────────│

ASKING 命令

ASKING 命令告诉节点"我知道这个槽正在导入,请让我访问":

*1\r\n$6\r\nASKING\r\n

响应:+OK\r\n

客户端 ASK 处理

def handle_ask(self, slot, new_addr, original_args):
    """处理 ASK 重定向"""
    # 1. 连接到新节点
    sock = self._get_or_connect(new_addr)

    # 2. 发送 ASKING
    sock.sendall(b"*1\r\n$6\r\nASKING\r\n")
    self._read_response(sock)  # +OK

    # 3. 重新发送原始命令
    cmd = encode_command(*original_args)
    sock.sendall(cmd)
    resp = self._read_response(sock)

    # 注意:不更新 slot_map,因为槽位还在迁移中
    return resp

10.4 Gossip 协议

概述

Redis Cluster 使用 Gossip 协议在节点间交换集群状态信息。每个节点定期与其他节点交换消息。

Gossip 消息类型

消息说明
PING节点 A 向节点 B 发送心跳
PONG对 PING 的响应,也用于广播自身状态
MEET邀请新节点加入集群
FAIL广播节点故障

消息格式

Gossip 消息通过 Redis 集群总线(Cluster Bus)传输,使用专用端口(数据端口 + 10000):

# 集群总线端口
数据端口: 6379
总线端口: 16379

CLUSTER INFO 命令

→ CLUSTER INFO
← cluster_state:ok
← cluster_slots_assigned:16384
← cluster_slots_ok:16384
← cluster_slots_pfail:0
← cluster_slots_fail:0
← cluster_known_nodes:6
← cluster_size:3
← cluster_current_epoch:6
← cluster_my_epoch:1
← cluster_stats_messages_sent:14839
← cluster_stats_messages_received:14839
字段说明
cluster_stateokfail
cluster_slots_assigned已分配的槽位数(应为 16384)
cluster_slots_pfail处于 PFAIL 状态的槽位数
cluster_slots_fail处于 FAIL 状态的槽位数
cluster_known_nodes已知节点数
cluster_size主节点数
cluster_current_epoch当前纪元号

10.5 集群命令

CLUSTER MYID

获取当前节点 ID:

→ CLUSTER MYID
← $40
← abc123...

CLUSTER MEET

邀请节点加入集群:

→ CLUSTER MEET 127.0.0.1 6385
← +OK

CLUSTER ADDSLOTS / DELSLOTS

分配/移除槽位:

→ CLUSTER ADDSLOTS 0 1 2 3 4 5
← +OK

→ CLUSTER DELSLOTS 0 1 2
← +OK

CLUSTER KEYSLOT

查看 key 属于哪个槽:

→ CLUSTER KEYSLOT mykey
← :14687

10.6 集群中的多 key 操作

限制

所有多 key 操作(如 MGET、MSET、事务、Lua 脚本)要求所有 key 在同一个槽:

# ❌ 错误:key 在不同槽
→ MGET user:1 user:2
← -CROSSSLOT Keys in request don't hash to the same slot

Hash Tag 解决方案

使用 {} 指定 hash tag,确保相关 key 映射到同一个槽:

# 使用 hash tag
{user:1000}.name    → 槽 X
{user:1000}.email   → 槽 X(相同)
{user:1000}.age     → 槽 X(相同)

# MGET 可以执行
→ MGET {user:1000}.name {user:1000}.email {user:1000}.age

Hash Tag 策略

def make_keys(user_id, *fields):
    """生成使用 hash tag 的 key 列表"""
    return [f"{{{user_id}}}:{field}" for field in fields]

keys = make_keys("user:1000", "name", "email", "age")
# ["{user:1000}:name", "{user:1000}:email", "{user:1000}:age"]

# 所有 key 都在同一个槽
assert all(hash_slot(keys[0]) == hash_slot(k) for k in keys)

10.7 集群中的 Pipeline

Pipeline 在集群模式下需要按槽位分组:

def cluster_pipeline(self, commands):
    """集群模式的 Pipeline"""
    # 按槽位分组
    slot_groups = {}
    for cmd_args in commands:
        key = cmd_args[1]
        slot = hash_slot(key)
        if slot not in slot_groups:
            slot_groups[slot] = []
        slot_groups[slot].append(cmd_args)

    # 每个槽位一组 Pipeline
    results = []
    for slot, cmds in slot_groups.items():
        node = self.slot_map[slot]
        sock = self.nodes[node]

        # 发送 Pipeline
        buf = b""
        for cmd_args in cmds:
            buf += encode_command(*cmd_args)
        sock.sendall(buf)

        # 读取响应
        for _ in cmds:
            resp = self._read_response(sock)
            if isinstance(resp, Exception) and str(resp).startswith("MOVED"):
                # 需要重新路由
                pass
            results.append(resp)

    return results

10.8 客户端重试策略

指数退避重试

import time
import random

def execute_with_retry(self, *args, max_retries=5):
    """带指数退避的重试策略"""
    for attempt in range(max_retries):
        try:
            return self._execute(*args)
        except (MOVED, ASK) as e:
            # 重定向,立即重试
            continue
        except ConnectionError:
            # 连接失败,退避重试
            if attempt < max_retries - 1:
                delay = min(0.1 * (2 ** attempt), 2.0)
                delay += random.uniform(0, delay * 0.1)  # 抖动
                time.sleep(delay)
    raise RuntimeError("Max retries exceeded")

10.9 注意事项

⚠️ 最少 3 个主节点 集群至少需要 3 个主节点才能正常工作。奇数个主节点有利于故障时的选举。

⚠️ 槽位迁移期间的可用性 迁移中的 key 可能需要两次重定向(ASK + 操作),这会增加延迟。

⚠️ 集群不支持跨槽事务 事务和 Lua 脚本中的所有 key 必须在同一个槽中。使用 hash tag 确保这一点。

⚠️ 客户端缓存槽位映射 客户端应缓存槽位到节点的映射,避免每次都通过 MOVED 发现。使用 CLUSTER SLOTS 定期刷新。


10.10 扩展阅读

资源说明
Redis Cluster 教程官方教程
Redis Cluster 规范集群协议规范
CLUSTER SLOTS槽位查询命令
Hash TagHash Tag 规范

上一章:哨兵协议 | 下一章:代理实现