Redis 传输协议精讲 / 05 - Pipeline 管道机制
Pipeline 管道机制
5.1 为什么需要 Pipeline
RTT 的代价
标准的 Redis 交互是"请求-响应"模式:客户端发送一个命令,等待响应后才能发送下一个命令。
Client Server
│ │
│──── SET key1 value1 ─────────→│ T0
│ │
│←──── +OK ─────────────────────│ T0 + RTT
│ │
│──── SET key2 value2 ─────────→│ T0 + RTT
│ │
│←──── +OK ─────────────────────│ T0 + 2*RTT
│ │
│──── SET key3 value3 ─────────→│ T0 + 2*RTT
│ │
│←──── +OK ─────────────────────│ T0 + 3*RTT
如果 RTT = 1ms,3 个命令需要 3ms(不计命令执行时间)。10000 个命令需要 10 秒!
Pipeline 的解决方案
Pipeline 允许客户端一次发送多个命令,然后批量读取响应:
Client Server
│ │
│──── SET key1 value1 ─────────→│
│──── SET key2 value2 ─────────→│ 所有命令一次发送
│──── SET key3 value3 ─────────→│
│ │
│←──── +OK ─────────────────────│
│←──── +OK ─────────────────────│ 所有响应一次返回
│←──── +OK ─────────────────────│
3 个命令只需要 1 个 RTT!
5.2 Pipeline 协议层面
从协议角度看,Pipeline 就是在同一个 TCP 连接上连续发送多个 RESP 数组,不等待响应:
*3\r\n$3\r\nSET\r\n$4\r\nkey1\r\n$6\r\nvalue1\r\n
*3\r\n$3\r\nSET\r\n$4\r\nkey2\r\n$6\r\nvalue2\r\n
*3\r\n$3\r\nSET\r\n$4\r\nkey3\r\n$6\r\nvalue3\r\n
这 3 个 RESP 数组在同一个 TCP 段中发送。服务器依次处理每个命令,并将响应依次返回:
+OK\r\n
+OK\r\n
+OK\r\n
关键点
| 特性 | 说明 |
|---|---|
| 不是新协议 | Pipeline 只是发送方式的改变,协议格式不变 |
| 不保证原子性 | Pipeline 中的命令可能被其他客户端的命令穿插 |
| 顺序保证 | Pipeline 中的命令按发送顺序执行,响应也按同样顺序返回 |
| 错误处理 | 单个命令失败不影响其他命令,每个响应独立 |
5.3 性能对比
基准测试
import socket
import time
def benchmark_single(n):
"""逐个发送命令"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("127.0.0.1", 6379))
start = time.time()
for i in range(n):
cmd = f"*3\r\n$3\r\nSET\r\n$4\r\nkey{i}\r\n$5\r\nvalue\r\n".encode()
s.sendall(cmd)
s.recv(1024) # 等待响应
elapsed = time.time() - start
s.close()
return elapsed
def benchmark_pipeline(n, batch_size=1000):
"""批量发送命令"""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(("127.0.0.1", 6379))
start = time.time()
for batch_start in range(0, n, batch_size):
batch_end = min(batch_start + batch_size, n)
# 一次发送多个命令
buf = b""
for i in range(batch_start, batch_end):
buf += f"*3\r\n$3\r\nSET\r\n$4\r\nkey{i}\r\n$5\r\nvalue\r\n".encode()
s.sendall(buf)
# 批量读取响应
received = 0
while received < batch_end - batch_start:
data = s.recv(4096)
received += data.count(b"+OK\r\n")
elapsed = time.time() - start
s.close()
return elapsed
n = 10000
t_single = benchmark_single(n)
t_pipeline = benchmark_pipeline(n)
print(f"逐个发送: {t_single:.3f}s ({n/t_single:.0f} ops/s)")
print(f"Pipeline: {t_pipeline:.3f}s ({n/t_pipeline:.0f} ops/s)")
print(f"加速比: {t_single/t_pipeline:.1f}x")
典型结果
| 模式 | 耗时 | QPS | 加速比 |
|---|---|---|---|
| 逐个发送 | 2.50s | 4,000 | 1x |
| Pipeline (batch=100) | 0.15s | 66,667 | 17x |
| Pipeline (batch=1000) | 0.08s | 125,000 | 31x |
注意:实际性能取决于网络延迟(RTT)和服务器处理能力。RTT 越高,Pipeline 的收益越大。
不同 RTT 下的性能
假设 10000 个命令,命令执行时间忽略不计:
| RTT | 逐个发送 | Pipeline (batch=1000) | 加速比 |
|---|---|---|---|
| 0.1ms (本地) | 1s | 0.08s | 12x |
| 1ms (同机房) | 10s | 0.08s | 125x |
| 10ms (跨机房) | 100s | 0.1s | 1000x |
| 100ms (跨地域) | 1000s | 0.5s | 2000x |
Pipeline 在高延迟场景下收益最显著。
5.4 实现方式
方式一:手动拼接 RESP
import socket
class ManualPipeline:
def __init__(self, host="127.0.0.1", port=6379):
self.sock = socket.create_connection((host, port))
self.commands = []
self.response_count = 0
def execute(self, *args):
"""添加命令到 Pipeline"""
parts = [f"*{len(args)}\r\n".encode()]
for arg in args:
if isinstance(arg, str):
arg = arg.encode("utf-8")
parts.append(f"${len(arg)}\r\n".encode())
parts.append(arg)
parts.append(b"\r\n")
self.commands.append(b"".join(parts))
self.response_count += 1
def flush(self):
"""发送所有命令并读取响应"""
# 一次发送所有命令
self.sock.sendall(b"".join(self.commands))
# 读取所有响应
responses = []
buf = b""
while len(responses) < self.response_count:
chunk = self.sock.recv(65536)
if not chunk:
raise ConnectionError("Connection closed")
buf += chunk
# 简单统计 +OK 响应数量
responses = buf.split(b"+OK\r\n")
# 最后一个可能是不完整的
if responses[-1]:
responses = responses[:-1]
self.commands.clear()
self.response_count = 0
return responses
# 使用
pipeline = ManualPipeline()
for i in range(10000):
pipeline.execute("SET", f"key:{i}", f"value:{i}")
results = pipeline.flush()
print(f"Sent {len(results)} commands")
方式二:使用 redis-py
import redis
r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)
# 创建 Pipeline(默认使用事务)
pipe = r.pipeline(transaction=False)
# 添加命令
for i in range(10000):
pipe.set(f"key:{i}", f"value:{i}")
# 执行并获取结果
results = pipe.execute()
print(f"Executed {len(results)} commands")
方式三:使用 Jedis (Java)
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
Pipeline pipeline = jedis.pipelined();
for (int i = 0; i < 10000; i++) {
pipeline.set("key:" + i, "value:" + i);
}
List<Object> results = pipeline.syncAndReturnAll();
System.out.println("Executed " + results.size() + " commands");
}
5.5 Pipeline 中的错误处理
Pipeline 中每个命令独立执行,单个命令失败不影响其他命令:
import redis
r = redis.Redis(host="127.0.0.1", port=6379, decode_responses=True)
# 设置一个字符串类型的 key
r.set("mykey", "hello")
pipe = r.pipeline(transaction=False)
pipe.set("key1", "value1") # 正常
pipe.lpush("mykey", "item") # 错误:类型不匹配
pipe.set("key2", "value2") # 正常
try:
results = pipe.execute()
print(results) # [True, ResponseError, True]
except redis.exceptions.ResponseError as e:
# 取决于 raise_on_error 参数
print(f"Error: {e}")
错误处理策略
| 策略 | 说明 |
|---|---|
raise_on_error=True(默认) | 遇到第一个错误时抛出异常 |
raise_on_error=False | 返回所有结果,错误位置为异常对象 |
# 不抛出异常,检查每个结果
pipe = r.pipeline(transaction=False)
pipe.set("key1", "value1")
pipe.lpush("mykey", "item")
pipe.set("key2", "value2")
results = pipe.execute(raise_on_error=False)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Command {i} failed: {result}")
else:
print(f"Command {i} succeeded: {result}")
5.6 Pipeline 的内存考量
发送缓冲区
Pipeline 的命令在发送前存储在客户端内存中。如果 Pipeline 过大,可能导致:
- 客户端内存溢出:百万级命令占用大量内存
- 服务器输入缓冲区溢出:
client-output-buffer-limit限制 - 网络拥塞:大量数据一次性发送
最佳实践:分批发送
def batch_pipeline(r, commands, batch_size=1000):
"""分批执行 Pipeline 命令"""
results = []
for i in range(0, len(commands), batch_size):
batch = commands[i:i + batch_size]
pipe = r.pipeline(transaction=False)
for cmd, args in batch:
getattr(pipe, cmd)(*args)
results.extend(pipe.execute())
return results
# 使用
commands = [("set", [f"key:{i}", f"value:{i}"]) for i in range(100000)]
results = batch_pipeline(r, commands, batch_size=5000)
缓冲区配置
# Redis 服务端配置
# 硬限制:达到此限制立即断开连接
client-output-buffer-limit normal 0 0 0
# 软限制 + 窗口时间
client-output-buffer-limit normal 256mb 128mb 60
5.7 Pipeline vs 事务 vs Lua 脚本
| 特性 | Pipeline | MULTI/EXEC | Lua 脚本 |
|---|---|---|---|
| 原子性 | ❌ | ⚠️ 伪原子 | ✅ 真原子 |
| 性能 | 最高 | 较高 | 较低(脚本编译) |
| 复杂逻辑 | ❌ | ❌ | ✅ |
| 网络往返 | 1 RTT | 1 RTT | 1 RTT |
| 错误处理 | 逐个检查 | 整体回滚 | 脚本内处理 |
| 适用场景 | 批量读写 | 需要原子保证 | 条件逻辑 |
组合使用
Pipeline 可以包含事务命令:
pipe = r.pipeline(transaction=True) # 使用事务
pipe.multi()
pipe.set("key1", "value1")
pipe.set("key2", "value2")
pipe.execute() # EXEC
这等价于在一个 Pipeline 中发送 MULTI、多个命令、EXEC。
5.8 异步 Pipeline
Python asyncio 实现
import asyncio
import redis.asyncio as aioredis
async def async_pipeline_demo():
r = aioredis.Redis(host="127.0.0.1", port=6379)
pipe = r.pipeline(transaction=False)
for i in range(10000):
await pipe.set(f"key:{i}", f"value:{i}")
results = await pipe.execute()
print(f"Executed {len(results)} commands")
await r.close()
asyncio.run(async_pipeline_demo())
Go 实现
package main
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
)
func main() {
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
pipe := rdb.Pipeline()
for i := 0; i < 10000; i++ {
pipe.Set(ctx, fmt.Sprintf("key:%d", i), fmt.Sprintf("value:%d", i), 0)
}
cmders, err := pipe.Exec(ctx)
if err != nil {
panic(err)
}
fmt.Printf("Executed %d commands\n", len(cmders))
}
5.9 性能优化技巧
技巧一:合并小命令
# 不推荐:10000 次 HSET
for i in range(10000):
pipe.hset("myhash", f"field:{i}", f"value:{i}")
# 推荐:使用 HMSET(单次命令设置多个字段)
fields = {f"field:{i}": f"value:{i}" for i in range(10000)}
pipe.hset("myhash", mapping=fields)
技巧二:使用 MSET 替代多次 SET
# 不推荐
for key, value in data.items():
pipe.set(key, value)
# 推荐(每 1000 个一批)
items = list(data.items())
for i in range(0, len(items), 1000):
batch = dict(items[i:i+1000])
r.mset(batch)
技巧三:避免 Pipeline 中的读写依赖
# 不推荐:Pipeline 中有依赖关系
pipe.get("counter") # 读
pipe.incr("counter") # 写(依赖读的结果)
pipe.set("result", ...) # 依赖 incr 的结果
# 推荐:使用 Lua 脚本处理有依赖的操作
lua_script = """
local counter = redis.call('INCR', KEYS[1])
redis.call('SET', KEYS[2], counter * 2)
return counter
"""
r.eval(lua_script, 2, "counter", "result")
5.10 注意事项
⚠️ Pipeline 不保证原子性 Pipeline 中的命令可能被其他客户端的命令穿插执行。如果需要原子性保证,请使用 MULTI/EXEC 或 Lua 脚本。
⚠️ Pipeline 大小要适中 建议每批 1000-10000 个命令。太小收益不明显,太大可能导致内存问题。
⚠️ 连接池中的 Pipeline Pipeline 必须在同一个连接上执行。使用连接池时,Pipeline 会占用一个连接直到执行完成。
⚠️ 网络超时 大 Pipeline 的发送和接收可能需要较长时间,确保客户端的超时设置足够。
5.11 扩展阅读
| 资源 | 说明 |
|---|---|
| Redis Pipeline 文档 | 官方文档 |
| redis-py Pipeline | Python 客户端 Pipeline |
| Jedis Pipeline | Java 客户端 Pipeline |