强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

第 11 章:RabbitMQ Streams

第 11 章:RabbitMQ Streams

RabbitMQ Streams 是 3.9 引入的新型队列类型,基于追加日志(Append-only Log)结构,提供类 Kafka 的流式消费能力。


11.1 Streams 概述

与传统队列的区别

特性Classic/Quorum QueueStream Queue
消息模型消费即删除追加日志,永久保留
消费模式Push(推)Pull(拉)
消息回溯❌ 不支持✅ 支持任意 Offset
消息重复消费❌ 不支持✅ 支持
顺序保证单队列有序单 Stream 有序
存储结构Erlang Term二进制追加日志
消费者组支持(类似 Kafka Consumer Group)
吞吐量万级/秒百万级/秒

Streams 架构

                    ┌─────────────────────────┐
                    │     Stream Queue         │
                    │                          │
 Producer ──>       │ [msg1][msg2][msg3]...    │
                    │  ▲                   ▲   │
                    │  │                   │   │
                    │  offset=0        offset=100
                    │                          │
                    └─────────────────────────┘
                       │                │
                    Consumer A       Consumer B
                    (offset=50)      (offset=100)

11.2 创建 Stream Queue

Python 客户端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 Stream Queue
channel.queue_declare(
    queue='events_stream',
    durable=True,
    arguments={
        'x-queue-type': 'stream',
        'x-stream-max-segment-size-bytes': 50000000,   # 段大小 50MB
        'x-stream-max-length-bytes': 5000000000,       # 最大 5GB
        'x-max-age': '7D',                              # 保留 7 天
        'x-queue-leader-locator': 'balanced'            # Leader 分配策略
    }
)

# 发布消息到 Stream
for i in range(1000):
    channel.basic_publish(
        exchange='',
        routing_key='events_stream',
        body=f'Event {i}'.encode(),
        properties=pika.BasicProperties(delivery_mode=2)
    )

print("[x] 1000 条事件已发布")
connection.close()

管理命令

# 查看 Stream 状态
rabbitmq-streams stream_status events_stream

# 查看所有 Stream
rabbitmq-queues list_queues name type messages | grep stream

# 删除 Stream
rabbitmq-queues delete_queue events_stream

11.3 Stream 消费者

使用 Stream 协议消费(Java)

import com.rabbitmq.stream.*;

public class StreamConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder()
            .host("localhost")
            .port(5552)
            .username("admin")
            .password("admin123")
            .build();

        // 从头开始消费
        environment.consumerBuilder()
            .queue("events_stream")
            .offset(OffsetSpecification.first())  // 从头开始
            // .offset(OffsetSpecification.last()) // 从最新开始
            // .offset(OffsetSpecification.offset(100)) // 从指定 offset 开始
            .messageHandler((context, message) -> {
                String body = message.getBody().toString();
                System.out.println("[x] offset=" + context.offset() + " body=" + body);
                context.storeOffset();  // 存储消费位移
            })
            .build();

        System.out.println("[*] 等待 Stream 消息...");
    }
}

使用 AMQP 协议消费(兼容模式)

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 设置 QoS(对 Stream 队列必须设置)
channel.basic_qos(prefetch_count=10)

def callback(ch, method, properties, body):
    print(f"[x] 收到: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='events_stream', on_message_callback=callback)
channel.start_consuming()

消费者组

// 多个消费者使用相同的 stream name(消费者组)
environment.consumerBuilder()
    .queue("events_stream")
    .name("my-consumer-group")          // 消费者组名
    .autoCommitStrategy()               // 自动提交 offset
        .messageCountBeforeCommit(100)  // 每 100 条提交一次
        .build()
    .messageHandler((context, message) -> {
        // 处理消息
    })
    .build();

11.4 Stream 内部结构

分段存储

Stream Queue: events_stream
│
├── Segment 0 (0-999)        [追加写入]
├── Segment 1 (1000-1999)    [追加写入]
├── Segment 2 (2000-2999)    [追加写入]
└── ...

每个 Segment 是一个独立文件
x-stream-max-segment-size-bytes 控制段大小

消息格式

┌────────────────────────────────────┐
│ Stream Entry                       │
├────────────────────────────────────┤
│ Timestamp      (8 bytes)           │
│ Offset         (8 bytes)           │
│ Message ID     (8 bytes)           │
│ Header Size    (4 bytes)           │
│ Header Data    (variable)          │
│ Body Size      (4 bytes)           │
│ Body Data      (variable)          │
└────────────────────────────────────┘

11.5 Streams vs Kafka

维度RabbitMQ StreamsApache Kafka
独立部署嵌入 RabbitMQ独立集群
协议AMQP / Stream 专用协议Kafka 自定义协议
分区单 Stream(无分区概念)Topic 多 Partition
吞吐量百万级/秒百万级/秒
延迟微秒级毫秒级
消费者组支持支持
消息保留可配置时间/大小可配置时间/大小
回溯消费
事务✅(有限)
生态较小丰富(Kafka Connect/Streams/ksqlDB)
运维复杂度低(复用 RabbitMQ)高(独立组件 + ZooKeeper/KRaft)

何时选择 Streams

场景推荐
已有 RabbitMQ,需要流式消费RabbitMQ Streams
独立的大数据流处理平台Kafka
需要跨 Stream 聚合Kafka
需要 ksqlDB 流处理Kafka
需要轻量级追加日志RabbitMQ Streams
需要兼容现有 AMQP 消费者RabbitMQ Streams

11.6 Stream 高级配置

保留策略

channel.queue_declare(
    queue='bounded_stream',
    durable=True,
    arguments={
        'x-queue-type': 'stream',
        'x-max-age': '24H',                              # 保留 24 小时
        'x-stream-max-segment-size-bytes': 100000000,     # 段大小 100MB
        'x-stream-max-length-bytes': 10000000000          # 最大 10GB
    }
)

Leader 选举策略

策略说明
balanced自动分配到负载最低的节点
client-local优先分配到声明队列的节点

Stream 副本

# Stream 默认复制到所有集群节点
# 可通过策略控制副本数
rabbitmqctl set_policy stream-replicas "^stream\." \
  '{"queue-type":"stream"}' --apply-to queues

11.7 性能调优

发布优化

Environment environment = Environment.builder()
    .host("localhost")
    .port(5552)
    .maxUnconfirmedMessages(10000)  // 最大未确认消息数
    .build();

Producer producer = environment.producerBuilder()
    .queue("events_stream")
    .batchSize(100)                 // 批量大小
    .build();

// 批量发布
for (int i = 0; i < 100000; i++) {
    Message message = environment.messageBuilder()
        .addData(("Event " + i).getBytes())
        .properties()
        .messageId(i)
        .build();
    producer.send(message, confirmationStatus -> {
        // 确认回调
    });
}

消费优化

environment.consumerBuilder()
    .queue("events_stream")
    .autoCommitStrategy()
        .messageCountBeforeCommit(1000)  // 每 1000 条自动提交
        .flushInterval(Duration.ofSeconds(5))
        .build()
    .maxReadyMessages(5000)             // 预取消息数
    .messageHandler((context, message) -> {
        // 批量处理逻辑
    })
    .build();

11.8 适用场景

📌 业务场景:

场景说明使用方式
事件溯源完整的事件历史追加写入 + 任意 Offset 回溯
审计日志不可变的操作记录永久保留 + 全量消费
实时指标聚合实时统计指标最新消息消费 + 批量聚合
数据同步数据库 CDC增量消费 + 去重
多消费者独立消费同一事件被多个服务消费独立 Offset

11.9 注意事项

⚠️ Stream 不支持消息确认

Stream 中的消息不会被删除(除非保留策略到期),消费者通过 Offset 跟踪消费进度。

⚠️ Stream 不支持优先级

追加日志结构不支持消息重排。

⚠️ Stream 协议端口

Stream 专用协议使用端口 5552,需要额外开放。

⚠️ Stream 不支持 dead-lettering

消息不会因消费失败而转移到死信队列。

🔥 最佳实践: 如果只需要消息队列功能,使用 Quorum Queue;如果需要事件回溯/多消费者独立消费,使用 Stream。


11.10 扩展阅读


下一章: 第 12 章:监控与告警 — 建立完善的 RabbitMQ 监控体系。