强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

06 - 发布订阅协议

发布订阅协议

6.1 Pub/Sub 概述

Redis Pub/Sub 是一种消息通信模式:发布者(Publisher)向频道(Channel)发送消息,订阅者(Subscriber)接收该频道的所有消息。

┌───────────┐   PUBLISH channel msg   ┌──────────┐
│ Publisher  │ ──────────────────────→ │  Redis   │
└───────────┘                          │  Server  │
                                       └────┬─────┘
                                            │
                            ┌───────────────┼───────────────┐
                            ↓               ↓               ↓
                      ┌──────────┐   ┌──────────┐   ┌──────────┐
                      │ Sub 1    │   │ Sub 2    │   │ Sub 3    │
                      └──────────┘   └──────────┘   └──────────┘

核心特征

特性说明
模式发布-订阅(Fire and Forget)
持久化❌ 消息不持久化,离线订阅者会丢失消息
确认机制❌ 无 ACK
消息路由频道匹配(精确 + 模式)
协议变化进入订阅模式后,连接行为改变

6.2 SUBSCRIBE 命令

命令格式

SUBSCRIBE channel [channel ...]

RESP 编码

*3\r\n
$9\r\n
SUBSCRIBE\r\n
$7\r\n
channel\r\n
$11\r\n
channel-two\r\n

响应格式

订阅成功后,服务器返回 3 元素数组:

*3\r\n
$9\r\n
subscribe\r\n
$7\r\n
channel\r\n
:1\r\n          ← 当前订阅数

结构化表示:

[
  "subscribe",    ← 消息类型
  "channel",      ← 频道名
  1               ← 当前连接的订阅数
]

多频道订阅

$ telnet 127.0.0.1 6379
*3\r\n$9\r\nSUBSCRIBE\r\n$4\r\nnews\r\n$5\r\ncache\r\n

# 响应(每个频道一个确认)
*3\r\n$9\r\nsubscribe\r\n$4\r\nnews\r\n:1\r\n
*3\r\n$9\r\nsubscribe\r\n$5\r\ncache\r\n:2\r\n

6.3 订阅模式下的连接行为

重要规则

进入订阅模式后,连接的行为发生根本改变:

行为普通模式订阅模式
执行命令✅ 允许❌ 只允许 SUBSCRIBE/UNSUBSCRIBE/PSUBSCRIBE/PUNSUBSCRIBE/PING/QUIT
响应格式命令的响应订阅确认 / 消息 / 退订确认
主动推送✅ 服务器推送消息
Pipeline⚠️ 有限制

订阅模式中允许的命令

命令说明
SUBSCRIBE订阅新频道
UNSUBSCRIBE退订频道
PSUBSCRIBE模式订阅
PUNSUBSCRIBE退订模式
PING心跳检测(返回正常 PONG)
QUIT关闭连接
RESETRedis 7.0+,重置连接状态

非法命令示例

# 在订阅模式中执行 GET
*2\r\n$3\r\nGET\r\n$3\r\nkey\r\n
-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context\r\n

6.4 PUBLISH 命令

命令格式

PUBLISH channel message

响应

PUBLISH 返回接收到消息的订阅者数量(整数):

→ PUBLISH news "hello"
← :3          ← 3 个订阅者收到了消息

PUBLISH 的协议细节

import socket

s = socket.create_connection(("127.0.0.1", 6379))

# PUBLISH 本身是一个普通命令
cmd = "*3\r\n$7\r\nPUBLISH\r\n$4\r\nnews\r\n$5\r\nhello\r\n"
s.sendall(cmd.encode())
resp = s.recv(1024)
print(resp)  # b':0\r\n'(没有订阅者时返回 0)

6.5 消息接收格式

当频道有新消息时,服务器向所有订阅者推送:

精确订阅的消息格式

*3\r\n
$7\r\n
message\r\n
$7\r\n
channel\r\n
$5\r\n
hello\r\n

结构化表示:

[
  "message",      ← 固定标识
  "channel",      ← 频道名
  "hello"         ← 消息内容
]

消息内容的二进制安全

消息内容使用 Bulk String 编码,支持任意二进制数据:

*3\r\n
$7\r\n
message\r\n
$4\r\n
logs\r\n
$11\r\n
\x00\x01\x02hello\r\n

6.6 PSUBSCRIBE 模式订阅

命令格式

PSUBSCRIBE pattern [pattern ...]

模式语法

模式匹配示例
*所有频道PSUBSCRIBE *
news.*news. 开头的频道匹配 news.sportnews.tech
news.[ie]*news.i 或 news.e 开头匹配 news.intlnews.eu
\\*字面量 *匹配名为 * 的频道

响应格式

→ PSUBSCRIBE news.*
← *3
← $10
← psubscribe
← $7
← news.*
← :1

模式消息格式

通过模式订阅收到的消息格式略有不同(4 元素数组):

*4\r\n
$8\r\n
pmessage\r\n
$7\r\n
news.*\r\n       ← 匹配的模式
$9\r\n
news.sport\r\n   ← 实际频道名
$5\r\n
match!\r\n       ← 消息内容

结构化表示:

[
  "pmessage",     ← 固定标识(注意是 pmessage 不是 message)
  "news.*",       ← 匹配的模式
  "news.sport",   ← 实际频道名
  "match!"        ← 消息内容
]

精确 vs 模式订阅对比

特性SUBSCRIBEPSUBSCRIBE
匹配方式精确匹配glob 模式匹配
消息格式3 元素数组4 元素数组
消息类型messagepmessage
性能更高较低(需要模式匹配)
多模式匹配N/A一个频道可能匹配多个模式,收到多条消息

重复消息问题

如果客户端同时通过 SUBSCRIBEPSUBSCRIBE 订阅了同一个频道,或者一个频道匹配了多个模式,可能会收到重复消息:

SUBSCRIBE news
PSUBSCRIBE news
PSUBSCRIBE n*

# 发布 PUBLISH news "hello"
# 客户端收到 3 条消息:
# 1. [message, news, hello]       ← 来自 SUBSCRIBE
# 2. [pmessage, news, news, hello] ← 来自 PSUBSCRIBE news
# 3. [pmessage, n*, news, hello]   ← 来自 PSUBSCRIBE n*

客户端需要自行去重或避免重叠订阅。


6.7 UNSUBSCRIBE 与 PUNSUBSCRIBE

UNSUBSCRIBE

# 退订特定频道
→ UNSUBSCRIBE news
← *3
← $11
← unsubscribe
← $4
← news
← :0          ← 剩余订阅数

# 退订所有频道(无参数)
→ UNSUBSCRIBE
← *3
← $11
← unsubscribe
← $-1         ← NULL(频道名)
← :0

PUNSUBSCRIBE

# 退订特定模式
→ PUNSUBSCRIBE news.*
← *3
← $12
← punsubscribe
← $7
← news.*
← :0

6.8 Pub/Sub 与连接池

Pub/Sub 对连接池有特殊影响:

问题

大多数连接池假设连接是"通用"的——任何命令都可以在任何连接上执行。但 Pub/Sub 连接处于特殊模式,只能执行有限的命令。

解决方案

import redis

class PubSubManager:
    def __init__(self, host="127.0.0.1", port=6379):
        # 独立的连接,不使用连接池
        self.redis = redis.Redis(host=host, port=port)
        self.pubsub = self.redis.pubsub()

    def subscribe(self, channel, callback):
        """订阅频道并注册回调"""
        self.pubsub.subscribe(**{channel: callback})
        # 启动独立线程监听消息
        thread = self.pubsub.run_in_thread(sleep_time=0.001)
        return thread

    def close(self):
        self.pubsub.close()
        self.redis.close()

# 使用
manager = PubSubManager()
def handle_message(message):
    if message["type"] == "message":
        print(f"Received: {message['data']}")

thread = manager.subscribe("news", handle_message)

6.9 完整 Pub/Sub 客户端实现

import socket
import threading
from typing import Callable, Dict

class SimplePubSub:
    def __init__(self, host="127.0.0.1", port=6379):
        self.sock = socket.create_connection((host, port))
        self.buffer = b""
        self.handlers: Dict[str, Callable] = {}
        self.pattern_handlers: Dict[str, Callable] = {}
        self.running = False
        self._lock = threading.Lock()

    def _read_line(self) -> bytes:
        while b"\r\n" not in self.buffer:
            chunk = self.sock.recv(4096)
            if not chunk:
                raise ConnectionError("Connection closed")
            self.buffer += chunk
        pos = self.buffer.index(b"\r\n")
        line = self.buffer[:pos]
        self.buffer = self.buffer[pos + 2:]
        return line

    def _read_bulk_string(self) -> bytes | None:
        line = self._read_line()
        length = int(line[1:])
        if length == -1:
            return None
        while len(self.buffer) < length + 2:
            chunk = self.sock.recv(4096)
            if not chunk:
                raise ConnectionError("Connection closed")
            self.buffer += chunk
        data = self.buffer[:length]
        self.buffer = self.buffer[length + 2:]
        return data

    def _parse_response(self):
        line = self._read_line()
        prefix = chr(line[0])

        if prefix == ":":
            return int(line[1:])
        elif prefix == "$":
            length = int(line[1:])
            if length == -1:
                return None
            while len(self.buffer) < length + 2:
                chunk = self.sock.recv(4096)
                self.buffer += chunk
            data = self.buffer[:length]
            self.buffer = self.buffer[length + 2:]
            return data
        elif prefix == "*":
            count = int(line[1:])
            return [self._parse_response() for _ in range(count)]
        else:
            return line

    def subscribe(self, *channels: str):
        """订阅频道"""
        args = ["SUBSCRIBE"] + list(channels)
        parts = [f"*{len(args)}\r\n".encode()]
        for arg in args:
            arg_bytes = arg.encode()
            parts.append(f"${len(arg_bytes)}\r\n".encode())
            parts.append(arg_bytes)
            parts.append(b"\r\n")
        self.sock.sendall(b"".join(parts))

    def on(self, channel: str, callback: Callable):
        """注册消息处理器"""
        self.handlers[channel] = callback

    def listen(self):
        """监听消息(阻塞)"""
        self.running = True
        while self.running:
            msg = self._parse_response()
            if isinstance(msg, list) and len(msg) >= 3:
                msg_type = msg[0].decode() if isinstance(msg[0], bytes) else msg[0]

                if msg_type == "message":
                    channel = msg[1].decode()
                    data = msg[2].decode() if isinstance(msg[2], bytes) else msg[2]
                    if channel in self.handlers:
                        self.handlers[channel](channel, data)

                elif msg_type == "pmessage":
                    pattern = msg[1].decode()
                    channel = msg[2].decode()
                    data = msg[3].decode() if isinstance(msg[3], bytes) else msg[3]
                    if pattern in self.pattern_handlers:
                        self.pattern_handlers[pattern](pattern, channel, data)

    def close(self):
        self.running = False
        self.sock.close()


# 使用示例
def on_news(channel, message):
    print(f"[{channel}] {message}")

ps = SimplePubSub()
ps.on("news", on_news)
ps.subscribe("news", "alerts")
ps.listen()

6.10 Pub/Sub 的局限性

局限说明替代方案
消息不持久化订阅者离线时消息丢失Redis Streams
无消费者组不能负载均衡消费Redis Streams + XREADGROUP
无确认机制不知道消息是否被处理Redis Streams + XACK
消息堆积发布速度 > 消费速度时,输出缓冲区可能溢出Redis Streams
跨节点集群中 PUBLISH 会广播到所有节点限制使用场景

Redis Streams vs Pub/Sub

特性Pub/SubStreams
消息持久化
消费者组
消息确认
历史回放
性能更高略低
适用场景实时通知、事件广播任务队列、事件溯源

6.11 注意事项

⚠️ 输出缓冲区 订阅连接的输出缓冲区有特殊限制:client-output-buffer-limit pubsub 32mb 8mb 60。如果消费速度跟不上发布速度,连接会被断开。

⚠️ 集群中的 Pub/Sub 在 Redis 集群中,PUBLISH 命令会广播到所有节点。这意味着订阅者可以在任意节点上接收消息,但会增加集群的网络开销。

⚠️ 连接断开后重连 Pub/Sub 连接断开后,订阅关系会丢失。客户端需要实现重连和重新订阅逻辑。

def reconnect_and_resubscribe(pubsub, channels):
    """重连并重新订阅"""
    while True:
        try:
            pubsub.subscribe(*channels)
            pubsub.listen()
        except ConnectionError:
            time.sleep(1)
            # 重新连接并订阅
            pubsub.reconnect()

6.12 扩展阅读

资源说明
Redis Pub/Sub 文档官方文档
Redis Streams 文档Pub/Sub 的替代方案
SSUBSCRIBERedis 7.0+ Shard Pub/Sub

上一章:Pipeline 管道机制 | 下一章:事务协议