第 11 章:RabbitMQ Streams
第 11 章:RabbitMQ Streams
RabbitMQ Streams 是 3.9 引入的新型队列类型,基于追加日志(Append-only Log)结构,提供类 Kafka 的流式消费能力。
11.1 Streams 概述
与传统队列的区别
| 特性 | Classic/Quorum Queue | Stream Queue |
|---|---|---|
| 消息模型 | 消费即删除 | 追加日志,永久保留 |
| 消费模式 | Push(推) | Pull(拉) |
| 消息回溯 | ❌ 不支持 | ✅ 支持任意 Offset |
| 消息重复消费 | ❌ 不支持 | ✅ 支持 |
| 顺序保证 | 单队列有序 | 单 Stream 有序 |
| 存储结构 | Erlang Term | 二进制追加日志 |
| 消费者组 | — | 支持(类似 Kafka Consumer Group) |
| 吞吐量 | 万级/秒 | 百万级/秒 |
Streams 架构
┌─────────────────────────┐
│ Stream Queue │
│ │
Producer ──> │ [msg1][msg2][msg3]... │
│ ▲ ▲ │
│ │ │ │
│ offset=0 offset=100
│ │
└─────────────────────────┘
│ │
Consumer A Consumer B
(offset=50) (offset=100)
11.2 创建 Stream Queue
Python 客户端
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明 Stream Queue
channel.queue_declare(
queue='events_stream',
durable=True,
arguments={
'x-queue-type': 'stream',
'x-stream-max-segment-size-bytes': 50000000, # 段大小 50MB
'x-stream-max-length-bytes': 5000000000, # 最大 5GB
'x-max-age': '7D', # 保留 7 天
'x-queue-leader-locator': 'balanced' # Leader 分配策略
}
)
# 发布消息到 Stream
for i in range(1000):
channel.basic_publish(
exchange='',
routing_key='events_stream',
body=f'Event {i}'.encode(),
properties=pika.BasicProperties(delivery_mode=2)
)
print("[x] 1000 条事件已发布")
connection.close()
管理命令
# 查看 Stream 状态
rabbitmq-streams stream_status events_stream
# 查看所有 Stream
rabbitmq-queues list_queues name type messages | grep stream
# 删除 Stream
rabbitmq-queues delete_queue events_stream
11.3 Stream 消费者
使用 Stream 协议消费(Java)
import com.rabbitmq.stream.*;
public class StreamConsumer {
public static void main(String[] args) {
Environment environment = Environment.builder()
.host("localhost")
.port(5552)
.username("admin")
.password("admin123")
.build();
// 从头开始消费
environment.consumerBuilder()
.queue("events_stream")
.offset(OffsetSpecification.first()) // 从头开始
// .offset(OffsetSpecification.last()) // 从最新开始
// .offset(OffsetSpecification.offset(100)) // 从指定 offset 开始
.messageHandler((context, message) -> {
String body = message.getBody().toString();
System.out.println("[x] offset=" + context.offset() + " body=" + body);
context.storeOffset(); // 存储消费位移
})
.build();
System.out.println("[*] 等待 Stream 消息...");
}
}
使用 AMQP 协议消费(兼容模式)
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 设置 QoS(对 Stream 队列必须设置)
channel.basic_qos(prefetch_count=10)
def callback(ch, method, properties, body):
print(f"[x] 收到: {body.decode()}")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='events_stream', on_message_callback=callback)
channel.start_consuming()
消费者组
// 多个消费者使用相同的 stream name(消费者组)
environment.consumerBuilder()
.queue("events_stream")
.name("my-consumer-group") // 消费者组名
.autoCommitStrategy() // 自动提交 offset
.messageCountBeforeCommit(100) // 每 100 条提交一次
.build()
.messageHandler((context, message) -> {
// 处理消息
})
.build();
11.4 Stream 内部结构
分段存储
Stream Queue: events_stream
│
├── Segment 0 (0-999) [追加写入]
├── Segment 1 (1000-1999) [追加写入]
├── Segment 2 (2000-2999) [追加写入]
└── ...
每个 Segment 是一个独立文件
x-stream-max-segment-size-bytes 控制段大小
消息格式
┌────────────────────────────────────┐
│ Stream Entry │
├────────────────────────────────────┤
│ Timestamp (8 bytes) │
│ Offset (8 bytes) │
│ Message ID (8 bytes) │
│ Header Size (4 bytes) │
│ Header Data (variable) │
│ Body Size (4 bytes) │
│ Body Data (variable) │
└────────────────────────────────────┘
11.5 Streams vs Kafka
| 维度 | RabbitMQ Streams | Apache Kafka |
|---|---|---|
| 独立部署 | 嵌入 RabbitMQ | 独立集群 |
| 协议 | AMQP / Stream 专用协议 | Kafka 自定义协议 |
| 分区 | 单 Stream(无分区概念) | Topic 多 Partition |
| 吞吐量 | 百万级/秒 | 百万级/秒 |
| 延迟 | 微秒级 | 毫秒级 |
| 消费者组 | 支持 | 支持 |
| 消息保留 | 可配置时间/大小 | 可配置时间/大小 |
| 回溯消费 | ✅ | ✅ |
| 事务 | ✅ | ✅(有限) |
| 生态 | 较小 | 丰富(Kafka Connect/Streams/ksqlDB) |
| 运维复杂度 | 低(复用 RabbitMQ) | 高(独立组件 + ZooKeeper/KRaft) |
何时选择 Streams
| 场景 | 推荐 |
|---|---|
| 已有 RabbitMQ,需要流式消费 | RabbitMQ Streams |
| 独立的大数据流处理平台 | Kafka |
| 需要跨 Stream 聚合 | Kafka |
| 需要 ksqlDB 流处理 | Kafka |
| 需要轻量级追加日志 | RabbitMQ Streams |
| 需要兼容现有 AMQP 消费者 | RabbitMQ Streams |
11.6 Stream 高级配置
保留策略
channel.queue_declare(
queue='bounded_stream',
durable=True,
arguments={
'x-queue-type': 'stream',
'x-max-age': '24H', # 保留 24 小时
'x-stream-max-segment-size-bytes': 100000000, # 段大小 100MB
'x-stream-max-length-bytes': 10000000000 # 最大 10GB
}
)
Leader 选举策略
| 策略 | 说明 |
|---|---|
balanced | 自动分配到负载最低的节点 |
client-local | 优先分配到声明队列的节点 |
Stream 副本
# Stream 默认复制到所有集群节点
# 可通过策略控制副本数
rabbitmqctl set_policy stream-replicas "^stream\." \
'{"queue-type":"stream"}' --apply-to queues
11.7 性能调优
发布优化
Environment environment = Environment.builder()
.host("localhost")
.port(5552)
.maxUnconfirmedMessages(10000) // 最大未确认消息数
.build();
Producer producer = environment.producerBuilder()
.queue("events_stream")
.batchSize(100) // 批量大小
.build();
// 批量发布
for (int i = 0; i < 100000; i++) {
Message message = environment.messageBuilder()
.addData(("Event " + i).getBytes())
.properties()
.messageId(i)
.build();
producer.send(message, confirmationStatus -> {
// 确认回调
});
}
消费优化
environment.consumerBuilder()
.queue("events_stream")
.autoCommitStrategy()
.messageCountBeforeCommit(1000) // 每 1000 条自动提交
.flushInterval(Duration.ofSeconds(5))
.build()
.maxReadyMessages(5000) // 预取消息数
.messageHandler((context, message) -> {
// 批量处理逻辑
})
.build();
11.8 适用场景
📌 业务场景:
| 场景 | 说明 | 使用方式 |
|---|---|---|
| 事件溯源 | 完整的事件历史 | 追加写入 + 任意 Offset 回溯 |
| 审计日志 | 不可变的操作记录 | 永久保留 + 全量消费 |
| 实时指标聚合 | 实时统计指标 | 最新消息消费 + 批量聚合 |
| 数据同步 | 数据库 CDC | 增量消费 + 去重 |
| 多消费者独立消费 | 同一事件被多个服务消费 | 独立 Offset |
11.9 注意事项
⚠️ Stream 不支持消息确认
Stream 中的消息不会被删除(除非保留策略到期),消费者通过 Offset 跟踪消费进度。
⚠️ Stream 不支持优先级
追加日志结构不支持消息重排。
⚠️ Stream 协议端口
Stream 专用协议使用端口 5552,需要额外开放。
⚠️ Stream 不支持 dead-lettering
消息不会因消费失败而转移到死信队列。
🔥 最佳实践: 如果只需要消息队列功能,使用 Quorum Queue;如果需要事件回溯/多消费者独立消费,使用 Stream。
11.10 扩展阅读
下一章: 第 12 章:监控与告警 — 建立完善的 RabbitMQ 监控体系。