11 - 代理实现
代理实现
11.1 为什么需要代理
Redis 代理位于客户端和 Redis 实例之间,提供透明的数据路由和管理功能:
| 需求 | 代理方案 | 原生方案 |
|---|---|---|
| 数据分片 | 代理自动路由 | Cluster + 客户端支持 |
| 连接复用 | 代理管理连接池 | 每个客户端独立连接 |
| 读写分离 | 代理路由读写请求 | 客户端感知主从 |
| 协议转换 | 代理适配不同协议 | 无法实现 |
| 监控审计 | 代理记录请求日志 | 需要额外工具 |
代理架构
┌───────────┐ ┌─────────────┐ ┌──────────┐
│ Client │────→│ Proxy │────→│ Redis │
│ (App) │←────│ (代理) │←────│ Master │
└───────────┘ │ │ └──────────┘
│ │ ┌──────────┐
┌───────────┐ │ │────→│ Redis │
│ Client │────→│ │←────│ Slave │
│ (App) │←────│ │ └──────────┘
└───────────┘ └─────────────┘
11.2 主流代理方案对比
| 特性 | Codis | Twemproxy (nutcracker) | Redis Cluster | Envoy |
|---|---|---|---|---|
| 开发方 | 豌豆荚 | Redis Labs | ||
| 语言 | Go | C | C | C++ |
| 分片方式 | Pre-sharding | 一致性哈希/范围 | 哈希槽 | 可配置 |
| 在线扩缩容 | ✅ | ❌ | ✅ | ✅ |
| Pipeline | ✅ | ✅ | ✅ | ✅ |
| 事务 | ❌ | ❌ | ✅(同槽) | ❌ |
| 多 key 操作 | Hash Tag | Hash Tag | Hash Tag | Hash Tag |
| 管理界面 | ✅ (Codis Dashboard) | ❌ | ❌ | ✅ (Web UI) |
| 维护状态 | 活跃维护 | 维护模式 | 官方支持 | 活跃维护 |
11.3 代理协议层
代理需要完整实现 RESP 协议的解析和转发:
请求转发
import socket
import selectors
class SimpleRedisProxy:
def __init__(self, backends):
"""
backends: [(host, port), ...] Redis 后端列表
"""
self.backends = backends
self.sel = selectors.DefaultSelector()
def start(self, listen_host, listen_port):
"""启动代理服务"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((listen_host, listen_port))
server.listen(100)
server.setblocking(False)
self.sel.register(server, selectors.EVENT_READ, self._accept)
while True:
events = self.sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj)
def _accept(self, sock):
"""接受新连接"""
client, addr = sock.accept()
client.setblocking(False)
backend = self._select_backend(client)
self.sel.register(client, selectors.EVENT_READ,
lambda s: self._relay(s, backend))
def _select_backend(self, client_sock):
"""选择后端(简单轮询)"""
import random
return self._connect_backend(random.choice(self.backends))
def _connect_backend(self, addr):
"""连接后端"""
sock = socket.create_connection(addr)
sock.setblocking(False)
return sock
def _relay(self, client_sock, backend_sock):
"""转发数据"""
try:
data = client_sock.recv(65536)
if data:
backend_sock.sendall(data)
else:
self.sel.unregister(client_sock)
client_sock.close()
backend_sock.close()
except ConnectionError:
self.sel.unregister(client_sock)
client_sock.close()
backend_sock.close()
11.4 Twemproxy 实现原理
架构
Twemproxy (nutcracker)
┌─────────────────────┐
Client ──────────→│ 接收请求 │
│ 解析 RESP │
│ 提取 key │
│ 计算哈希 │
│ 路由到后端 │
│ 收集响应 │
│ 返回客户端 │
└───────┬─────────────┘
│
┌───────────────┼───────────────┐
↓ ↓ ↓
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Redis 1 │ │ Redis 2 │ │ Redis 3 │
│ slot 0-N │ │ slot N-M │ │ slot M-X │
└──────────┘ └──────────┘ └──────────┘
哈希算法
Twemproxy 支持多种哈希算法:
| 算法 | 说明 |
|---|---|
fnv1a_64 | FNV-1a 64 位(默认) |
fnv1_64 | FNV-1 64 位 |
fnv1a_32 | FNV-1a 32 位 |
crc16 | CRC-16 |
crc32 | CRC-32 |
murmur | MurmurHash |
md5 | MD5 |
hsieh | Paul Hsieh’s SuperFastHash |
分片策略
# ketama(一致性哈希)
distribution: ketama
hash: fnv1a_64
# modula(取模)
distribution: modula
hash: fnv1a_64
# random(随机)
distribution: random
配置文件示例
# nutcracker.yml
redis-cluster:
listen: 127.0.0.1:22121
hash: fnv1a_64
distribution: ketama
timeout: 300
preconnect: true
server_retry_timeout: 30000
server_failure_limit: 3
servers:
- 127.0.0.1:6379:1 server1
- 127.0.0.1:6380:1 server2
- 127.0.0.1:6381:1 server3
11.5 Codis 实现原理
架构
┌─────────────────────────────────────────────────┐
│ Codis Cluster │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Codis │ │ Codis │ │ Codis │ │
│ │ Proxy │ │ Proxy │ │ Proxy │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ ┌────┴──────────────┴──────────────┴────┐ │
│ │ Codis Dashboard │ │
│ │ (元数据管理 + 槽位分配) │ │
│ └────────────────────┬──────────────────┘ │
│ │ │
│ ┌────────┬───────────┼───────────┬────────┐ │
│ │ │ │ │ │ │
│ ↓ ↓ ↓ ↓ ↓ │
│ Group1 Group2 Group3 Group4 Group5 │
│ Master Master Master Master Master │
│ Slave Slave Slave Slave Slave │
│ │
│ ┌──────────────────────────────────────────┐ │
│ │ ZooKeeper / etcd │ │
│ │ (元数据存储 + 分布式锁) │ │
│ └──────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
Codis 的分片
Codis 将 1024 个槽位分配到多个 Redis Group:
槽位 0-255 → Group 1 (Redis Master + Slave)
槽位 256-511 → Group 2 (Redis Master + Slave)
槽位 512-767 → Group 3 (Redis Master + Slave)
槽位 768-1023 → Group 4 (Redis Master + Slave)
在线扩缩容
Codis 支持在线迁移槽位:
1. Dashboard 发起迁移任务
2. 源节点 → 目标节点:逐个迁移 key
3. 迁移期间 Proxy 处理 ASK 重定向
4. 迁移完成,更新槽位映射
11.6 连接池管理
代理中的连接池
import threading
import queue
class ConnectionPool:
"""连接池管理"""
def __init__(self, max_size=10, max_idle=5):
self.max_size = max_size
self.max_idle = max_idle
self.pool = queue.Queue(maxsize=max_idle)
self.active_count = 0
self.lock = threading.Lock()
self.backend_addr = None
def connect(self, host, port):
self.backend_addr = (host, port)
# 预创建连接
for _ in range(self.max_idle):
conn = self._create_connection()
self.pool.put(conn)
def _create_connection(self):
sock = socket.create_connection(self.backend_addr)
with self.lock:
self.active_count += 1
return sock
def get(self, timeout=5):
"""获取连接"""
try:
conn = self.pool.get_nowait()
if self._is_alive(conn):
return conn
# 连接已失效,创建新的
with self.lock:
self.active_count -= 1
except queue.Empty:
pass
with self.lock:
if self.active_count < self.max_size:
return self._create_connection()
# 等待可用连接
return self.pool.get(timeout=timeout)
def put(self, conn):
"""归还连接"""
try:
self.pool.put_nowait(conn)
except queue.Full:
conn.close()
with self.lock:
self.active_count -= 1
def _is_alive(self, conn):
"""检查连接是否存活"""
try:
conn.sendall(b"*1\r\n$4\r\nPING\r\n")
resp = conn.recv(1024)
return resp == b"+PONG\r\n"
except:
return False
连接池配置
# redis-py 连接池配置
import redis
pool = redis.ConnectionPool(
host="127.0.0.1",
port=6379,
max_connections=50, # 最大连接数
socket_timeout=2.0, # Socket 超时
socket_connect_timeout=1.0, # 连接超时
retry_on_timeout=True, # 超时重试
health_check_interval=30, # 健康检查间隔
decode_responses=True # 自动解码
)
r = redis.Redis(connection_pool=pool)
11.7 路由策略
基于 key 的路由
def route_by_key(key, num_shards):
"""基于 key 的一致性哈希路由"""
hash_val = crc16(key.encode())
return hash_val % num_shards
基于命令的路由
READ_COMMANDS = {"GET", "MGET", "HGET", "HGETALL", "LRANGE", "SMEMBERS", "KEYS"}
WRITE_COMMANDS = {"SET", "MSET", "DEL", "HSET", "LPUSH", "SADD"}
def route_command(cmd, args, master_conn, slave_conn):
"""读写分离路由"""
cmd_upper = cmd.upper()
if cmd_upper in READ_COMMANDS:
# 读命令发往从节点
return slave_conn
elif cmd_upper in WRITE_COMMANDS:
# 写命令发往主节点
return master_conn
else:
# 默认发往主节点
return master_conn
11.8 实现代理的关键挑战
挑战一:请求-响应配对
Pipeline 中多个请求可能在同一个 TCP 段中到达,代理需要正确解析并配对:
class RESPProxy:
def __init__(self):
self.pending_requests = 0
self.response_buffer = b""
def forward_pipeline(self, client_sock, backend_sock, num_commands):
"""转发 Pipeline 请求"""
# 读取所有命令
commands = []
for _ in range(num_commands):
cmd = self._read_complete_command(client_sock)
commands.append(cmd)
# 一次性发送到后端
backend_sock.sendall(b"".join(commands))
# 收集响应
responses = []
for _ in range(num_commands):
resp = self._read_response(backend_sock)
responses.append(resp)
# 返回客户端
client_sock.sendall(b"".join(responses))
挑战二:Pub/Sub 消息转发
Pub/Sub 消息是服务器主动推送的,代理需要特殊处理:
class PubSubProxy:
def handle_subscribe(self, client_sock, backend_sock, channels):
"""处理订阅命令"""
# 转发 SUBSCRIBE 命令
cmd = encode_subscribe_command(channels)
backend_sock.sendall(cmd)
# 读取订阅确认
for _ in range(len(channels)):
resp = self._read_response(backend_sock)
client_sock.sendall(self._encode_response(resp))
# 进入消息转发模式
self._forward_messages(client_sock, backend_sock)
def _forward_messages(self, client_sock, backend_sock):
"""持续转发 Pub/Sub 消息"""
while True:
msg = self._read_response(backend_sock)
client_sock.sendall(self._encode_response(msg))
挑战三:错误处理
def handle_error(self, client_sock, error_msg):
"""将错误转发给客户端"""
error_resp = f"-{error_msg}\r\n".encode()
client_sock.sendall(error_resp)
11.9 性能优化
多线程模型
import concurrent.futures
class ThreadedProxy:
def __init__(self, max_workers=100):
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
def handle_client(self, client_sock):
"""在独立线程中处理客户端"""
backend_sock = self.connect_backend()
while True:
data = client_sock.recv(65536)
if not data:
break
# 解析并路由
commands = self.parse_commands(data)
for cmd, args in commands:
target = self.route(args)
target.sendall(self.encode_command(cmd, args))
resp = self.read_response(target)
client_sock.sendall(resp)
client_sock.close()
backend_sock.close()
异步模型(推荐)
import asyncio
class AsyncProxy:
def __init__(self):
self.connections = {}
async def handle_client(self, reader, writer):
"""异步处理客户端连接"""
backend_reader, backend_writer = await asyncio.open_connection(
*self.select_backend()
)
try:
while True:
data = await reader.read(65536)
if not data:
break
# 转发到后端
backend_writer.write(data)
await backend_writer.drain()
# 读取响应
resp = await backend_reader.read(65536)
writer.write(resp)
await writer.drain()
finally:
writer.close()
backend_writer.close()
async def start(self, host, port):
server = await asyncio.start_server(
self.handle_client, host, port
)
async with server:
await server.serve_forever()
11.10 监控与运维
代理监控指标
| 指标 | 说明 |
|---|---|
| 连接数 | 客户端连接数、后端连接数 |
| 请求速率 | QPS(每秒查询数) |
| 延迟 | P50/P99/P999 延迟 |
| 错误率 | 错误响应比例 |
| 后端健康 | 后端节点状态 |
Prometheus 监控
from prometheus_client import Counter, Histogram, Gauge
# 定义指标
REQUEST_COUNT = Counter('redis_proxy_requests_total', 'Total requests', ['command'])
REQUEST_LATENCY = Histogram('redis_proxy_request_duration_seconds', 'Request latency')
ACTIVE_CONNECTIONS = Gauge('redis_proxy_active_connections', 'Active connections')
def monitor_request(command, duration):
REQUEST_COUNT.labels(command=command).inc()
REQUEST_LATENCY.observe(duration)
11.11 注意事项
⚠️ 代理是单点故障 代理本身需要高可用方案(如多实例 + 负载均衡器)。
⚠️ 增加延迟 代理会增加约 0.1-1ms 的延迟。在延迟敏感场景中需要评估影响。
⚠️ Pipeline 语义 代理可能改变 Pipeline 的原子性保证(如跨后端的 Pipeline)。
⚠️ 连接限制 代理需要管理大量连接,注意文件描述符限制和内存消耗。
11.12 扩展阅读
| 资源 | 说明 |
|---|---|
| Twemproxy GitHub | Twitter 的 Redis 代理 |
| Codis GitHub | 豌豆荚的 Redis 集群方案 |
| Envoy Redis Proxy | Envoy 的 Redis 支持 |
| Redis Cluster Proxy | 官方集群代理 |