强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

11 - 代理实现

代理实现

11.1 为什么需要代理

Redis 代理位于客户端和 Redis 实例之间,提供透明的数据路由和管理功能:

需求代理方案原生方案
数据分片代理自动路由Cluster + 客户端支持
连接复用代理管理连接池每个客户端独立连接
读写分离代理路由读写请求客户端感知主从
协议转换代理适配不同协议无法实现
监控审计代理记录请求日志需要额外工具

代理架构

┌───────────┐     ┌─────────────┐     ┌──────────┐
│  Client   │────→│    Proxy    │────→│  Redis   │
│  (App)    │←────│   (代理)    │←────│  Master  │
└───────────┘     │             │     └──────────┘
                  │             │     ┌──────────┐
┌───────────┐     │             │────→│  Redis   │
│  Client   │────→│             │←────│  Slave   │
│  (App)    │←────│             │     └──────────┘
└───────────┘     └─────────────┘

11.2 主流代理方案对比

特性CodisTwemproxy (nutcracker)Redis ClusterEnvoy
开发方豌豆荚TwitterRedis LabsGoogle
语言GoCCC++
分片方式Pre-sharding一致性哈希/范围哈希槽可配置
在线扩缩容
Pipeline
事务✅(同槽)
多 key 操作Hash TagHash TagHash TagHash Tag
管理界面✅ (Codis Dashboard)✅ (Web UI)
维护状态活跃维护维护模式官方支持活跃维护

11.3 代理协议层

代理需要完整实现 RESP 协议的解析和转发:

请求转发

import socket
import selectors

class SimpleRedisProxy:
    def __init__(self, backends):
        """
        backends: [(host, port), ...] Redis 后端列表
        """
        self.backends = backends
        self.sel = selectors.DefaultSelector()

    def start(self, listen_host, listen_port):
        """启动代理服务"""
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((listen_host, listen_port))
        server.listen(100)
        server.setblocking(False)
        self.sel.register(server, selectors.EVENT_READ, self._accept)

        while True:
            events = self.sel.select()
            for key, mask in events:
                callback = key.data
                callback(key.fileobj)

    def _accept(self, sock):
        """接受新连接"""
        client, addr = sock.accept()
        client.setblocking(False)
        backend = self._select_backend(client)
        self.sel.register(client, selectors.EVENT_READ,
                         lambda s: self._relay(s, backend))

    def _select_backend(self, client_sock):
        """选择后端(简单轮询)"""
        import random
        return self._connect_backend(random.choice(self.backends))

    def _connect_backend(self, addr):
        """连接后端"""
        sock = socket.create_connection(addr)
        sock.setblocking(False)
        return sock

    def _relay(self, client_sock, backend_sock):
        """转发数据"""
        try:
            data = client_sock.recv(65536)
            if data:
                backend_sock.sendall(data)
            else:
                self.sel.unregister(client_sock)
                client_sock.close()
                backend_sock.close()
        except ConnectionError:
            self.sel.unregister(client_sock)
            client_sock.close()
            backend_sock.close()

11.4 Twemproxy 实现原理

架构

                    Twemproxy (nutcracker)
                    ┌─────────────────────┐
  Client ──────────→│  接收请求            │
                    │  解析 RESP           │
                    │  提取 key            │
                    │  计算哈希            │
                    │  路由到后端          │
                    │  收集响应            │
                    │  返回客户端          │
                    └───────┬─────────────┘
                            │
            ┌───────────────┼───────────────┐
            ↓               ↓               ↓
      ┌──────────┐   ┌──────────┐   ┌──────────┐
      │ Redis 1  │   │ Redis 2  │   │ Redis 3  │
      │ slot 0-N │   │ slot N-M │   │ slot M-X │
      └──────────┘   └──────────┘   └──────────┘

哈希算法

Twemproxy 支持多种哈希算法:

算法说明
fnv1a_64FNV-1a 64 位(默认)
fnv1_64FNV-1 64 位
fnv1a_32FNV-1a 32 位
crc16CRC-16
crc32CRC-32
murmurMurmurHash
md5MD5
hsiehPaul Hsieh’s SuperFastHash

分片策略

# ketama(一致性哈希)
distribution: ketama
hash: fnv1a_64

# modula(取模)
distribution: modula
hash: fnv1a_64

# random(随机)
distribution: random

配置文件示例

# nutcracker.yml
redis-cluster:
  listen: 127.0.0.1:22121
  hash: fnv1a_64
  distribution: ketama
  timeout: 300
  preconnect: true
  server_retry_timeout: 30000
  server_failure_limit: 3
  servers:
    - 127.0.0.1:6379:1 server1
    - 127.0.0.1:6380:1 server2
    - 127.0.0.1:6381:1 server3

11.5 Codis 实现原理

架构

┌─────────────────────────────────────────────────┐
│                  Codis Cluster                   │
│                                                  │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐       │
│  │ Codis    │  │ Codis    │  │ Codis    │       │
│  │ Proxy    │  │ Proxy    │  │ Proxy    │       │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘       │
│       │              │              │             │
│  ┌────┴──────────────┴──────────────┴────┐       │
│  │         Codis Dashboard               │       │
│  │    (元数据管理 + 槽位分配)             │       │
│  └────────────────────┬──────────────────┘       │
│                       │                          │
│  ┌────────┬───────────┼───────────┬────────┐     │
│  │        │           │           │        │     │
│  ↓        ↓           ↓           ↓        ↓     │
│ Group1  Group2     Group3     Group4   Group5     │
│ Master  Master     Master     Master   Master     │
│ Slave   Slave      Slave      Slave    Slave      │
│                                                  │
│  ┌──────────────────────────────────────────┐    │
│  │              ZooKeeper / etcd             │    │
│  │           (元数据存储 + 分布式锁)         │    │
│  └──────────────────────────────────────────┘    │
└─────────────────────────────────────────────────┘

Codis 的分片

Codis 将 1024 个槽位分配到多个 Redis Group:

槽位 0-255     → Group 1 (Redis Master + Slave)
槽位 256-511   → Group 2 (Redis Master + Slave)
槽位 512-767   → Group 3 (Redis Master + Slave)
槽位 768-1023  → Group 4 (Redis Master + Slave)

在线扩缩容

Codis 支持在线迁移槽位:

1. Dashboard 发起迁移任务
2. 源节点 → 目标节点:逐个迁移 key
3. 迁移期间 Proxy 处理 ASK 重定向
4. 迁移完成,更新槽位映射

11.6 连接池管理

代理中的连接池

import threading
import queue

class ConnectionPool:
    """连接池管理"""

    def __init__(self, max_size=10, max_idle=5):
        self.max_size = max_size
        self.max_idle = max_idle
        self.pool = queue.Queue(maxsize=max_idle)
        self.active_count = 0
        self.lock = threading.Lock()
        self.backend_addr = None

    def connect(self, host, port):
        self.backend_addr = (host, port)
        # 预创建连接
        for _ in range(self.max_idle):
            conn = self._create_connection()
            self.pool.put(conn)

    def _create_connection(self):
        sock = socket.create_connection(self.backend_addr)
        with self.lock:
            self.active_count += 1
        return sock

    def get(self, timeout=5):
        """获取连接"""
        try:
            conn = self.pool.get_nowait()
            if self._is_alive(conn):
                return conn
            # 连接已失效,创建新的
            with self.lock:
                self.active_count -= 1
        except queue.Empty:
            pass

        with self.lock:
            if self.active_count < self.max_size:
                return self._create_connection()

        # 等待可用连接
        return self.pool.get(timeout=timeout)

    def put(self, conn):
        """归还连接"""
        try:
            self.pool.put_nowait(conn)
        except queue.Full:
            conn.close()
            with self.lock:
                self.active_count -= 1

    def _is_alive(self, conn):
        """检查连接是否存活"""
        try:
            conn.sendall(b"*1\r\n$4\r\nPING\r\n")
            resp = conn.recv(1024)
            return resp == b"+PONG\r\n"
        except:
            return False

连接池配置

# redis-py 连接池配置
import redis

pool = redis.ConnectionPool(
    host="127.0.0.1",
    port=6379,
    max_connections=50,         # 最大连接数
    socket_timeout=2.0,         # Socket 超时
    socket_connect_timeout=1.0, # 连接超时
    retry_on_timeout=True,      # 超时重试
    health_check_interval=30,   # 健康检查间隔
    decode_responses=True       # 自动解码
)

r = redis.Redis(connection_pool=pool)

11.7 路由策略

基于 key 的路由

def route_by_key(key, num_shards):
    """基于 key 的一致性哈希路由"""
    hash_val = crc16(key.encode())
    return hash_val % num_shards

基于命令的路由

READ_COMMANDS = {"GET", "MGET", "HGET", "HGETALL", "LRANGE", "SMEMBERS", "KEYS"}
WRITE_COMMANDS = {"SET", "MSET", "DEL", "HSET", "LPUSH", "SADD"}

def route_command(cmd, args, master_conn, slave_conn):
    """读写分离路由"""
    cmd_upper = cmd.upper()

    if cmd_upper in READ_COMMANDS:
        # 读命令发往从节点
        return slave_conn
    elif cmd_upper in WRITE_COMMANDS:
        # 写命令发往主节点
        return master_conn
    else:
        # 默认发往主节点
        return master_conn

11.8 实现代理的关键挑战

挑战一:请求-响应配对

Pipeline 中多个请求可能在同一个 TCP 段中到达,代理需要正确解析并配对:

class RESPProxy:
    def __init__(self):
        self.pending_requests = 0
        self.response_buffer = b""

    def forward_pipeline(self, client_sock, backend_sock, num_commands):
        """转发 Pipeline 请求"""
        # 读取所有命令
        commands = []
        for _ in range(num_commands):
            cmd = self._read_complete_command(client_sock)
            commands.append(cmd)

        # 一次性发送到后端
        backend_sock.sendall(b"".join(commands))

        # 收集响应
        responses = []
        for _ in range(num_commands):
            resp = self._read_response(backend_sock)
            responses.append(resp)

        # 返回客户端
        client_sock.sendall(b"".join(responses))

挑战二:Pub/Sub 消息转发

Pub/Sub 消息是服务器主动推送的,代理需要特殊处理:

class PubSubProxy:
    def handle_subscribe(self, client_sock, backend_sock, channels):
        """处理订阅命令"""
        # 转发 SUBSCRIBE 命令
        cmd = encode_subscribe_command(channels)
        backend_sock.sendall(cmd)

        # 读取订阅确认
        for _ in range(len(channels)):
            resp = self._read_response(backend_sock)
            client_sock.sendall(self._encode_response(resp))

        # 进入消息转发模式
        self._forward_messages(client_sock, backend_sock)

    def _forward_messages(self, client_sock, backend_sock):
        """持续转发 Pub/Sub 消息"""
        while True:
            msg = self._read_response(backend_sock)
            client_sock.sendall(self._encode_response(msg))

挑战三:错误处理

def handle_error(self, client_sock, error_msg):
    """将错误转发给客户端"""
    error_resp = f"-{error_msg}\r\n".encode()
    client_sock.sendall(error_resp)

11.9 性能优化

多线程模型

import concurrent.futures

class ThreadedProxy:
    def __init__(self, max_workers=100):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    def handle_client(self, client_sock):
        """在独立线程中处理客户端"""
        backend_sock = self.connect_backend()

        while True:
            data = client_sock.recv(65536)
            if not data:
                break

            # 解析并路由
            commands = self.parse_commands(data)
            for cmd, args in commands:
                target = self.route(args)
                target.sendall(self.encode_command(cmd, args))
                resp = self.read_response(target)
                client_sock.sendall(resp)

        client_sock.close()
        backend_sock.close()

异步模型(推荐)

import asyncio

class AsyncProxy:
    def __init__(self):
        self.connections = {}

    async def handle_client(self, reader, writer):
        """异步处理客户端连接"""
        backend_reader, backend_writer = await asyncio.open_connection(
            *self.select_backend()
        )

        try:
            while True:
                data = await reader.read(65536)
                if not data:
                    break

                # 转发到后端
                backend_writer.write(data)
                await backend_writer.drain()

                # 读取响应
                resp = await backend_reader.read(65536)
                writer.write(resp)
                await writer.drain()
        finally:
            writer.close()
            backend_writer.close()

    async def start(self, host, port):
        server = await asyncio.start_server(
            self.handle_client, host, port
        )
        async with server:
            await server.serve_forever()

11.10 监控与运维

代理监控指标

指标说明
连接数客户端连接数、后端连接数
请求速率QPS(每秒查询数)
延迟P50/P99/P999 延迟
错误率错误响应比例
后端健康后端节点状态

Prometheus 监控

from prometheus_client import Counter, Histogram, Gauge

# 定义指标
REQUEST_COUNT = Counter('redis_proxy_requests_total', 'Total requests', ['command'])
REQUEST_LATENCY = Histogram('redis_proxy_request_duration_seconds', 'Request latency')
ACTIVE_CONNECTIONS = Gauge('redis_proxy_active_connections', 'Active connections')

def monitor_request(command, duration):
    REQUEST_COUNT.labels(command=command).inc()
    REQUEST_LATENCY.observe(duration)

11.11 注意事项

⚠️ 代理是单点故障 代理本身需要高可用方案(如多实例 + 负载均衡器)。

⚠️ 增加延迟 代理会增加约 0.1-1ms 的延迟。在延迟敏感场景中需要评估影响。

⚠️ Pipeline 语义 代理可能改变 Pipeline 的原子性保证(如跨后端的 Pipeline)。

⚠️ 连接限制 代理需要管理大量连接,注意文件描述符限制和内存消耗。


11.12 扩展阅读

资源说明
Twemproxy GitHubTwitter 的 Redis 代理
Codis GitHub豌豆荚的 Redis 集群方案
Envoy Redis ProxyEnvoy 的 Redis 支持
Redis Cluster Proxy官方集群代理

上一章:集群协议 | 下一章:最佳实践