强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

RabbitMQ 消息队列完全教程 / 第 11 章:RabbitMQ Streams

第 11 章:RabbitMQ Streams

RabbitMQ Streams 是 3.9 引入的新型队列类型,基于追加日志(Append-only Log)结构,提供类 Kafka 的流式消费能力。


11.1 Streams 概述

与传统队列的区别

特性 Classic/Quorum Queue Stream Queue
消息模型 消费即删除 追加日志,永久保留
消费模式 Push(推) Pull(拉)
消息回溯 ❌ 不支持 ✅ 支持任意 Offset
消息重复消费 ❌ 不支持 ✅ 支持
顺序保证 单队列有序 单 Stream 有序
存储结构 Erlang Term 二进制追加日志
消费者组 支持(类似 Kafka Consumer Group)
吞吐量 万级/秒 百万级/秒

Streams 架构

                    ┌─────────────────────────┐
                    │     Stream Queue         │
                    │                          │
 Producer ──>       │ [msg1][msg2][msg3]...    │
                    │  ▲                   ▲   │
                    │  │                   │   │
                    │  offset=0        offset=100
                    │                          │
                    └─────────────────────────┘
                       │                │
                    Consumer A       Consumer B
                    (offset=50)      (offset=100)

11.2 创建 Stream Queue

Python 客户端

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明 Stream Queue
channel.queue_declare(
    queue='events_stream',
    durable=True,
    arguments={
        'x-queue-type': 'stream',
        'x-stream-max-segment-size-bytes': 50000000,   # 段大小 50MB
        'x-stream-max-length-bytes': 5000000000,       # 最大 5GB
        'x-max-age': '7D',                              # 保留 7 天
        'x-queue-leader-locator': 'balanced'            # Leader 分配策略
    }
)

# 发布消息到 Stream
for i in range(1000):
    channel.basic_publish(
        exchange='',
        routing_key='events_stream',
        body=f'Event {i}'.encode(),
        properties=pika.BasicProperties(delivery_mode=2)
    )

print("[x] 1000 条事件已发布")
connection.close()

管理命令

# 查看 Stream 状态
rabbitmq-streams stream_status events_stream

# 查看所有 Stream
rabbitmq-queues list_queues name type messages | grep stream

# 删除 Stream
rabbitmq-queues delete_queue events_stream

11.3 Stream 消费者

使用 Stream 协议消费(Java)

import com.rabbitmq.stream.*;

public class StreamConsumer {
    public static void main(String[] args) {
        Environment environment = Environment.builder()
            .host("localhost")
            .port(5552)
            .username("admin")
            .password("admin123")
            .build();

        // 从头开始消费
        environment.consumerBuilder()
            .queue("events_stream")
            .offset(OffsetSpecification.first())  // 从头开始
            // .offset(OffsetSpecification.last()) // 从最新开始
            // .offset(OffsetSpecification.offset(100)) // 从指定 offset 开始
            .messageHandler((context, message) -> {
                String body = message.getBody().toString();
                System.out.println("[x] offset=" + context.offset() + " body=" + body);
                context.storeOffset();  // 存储消费位移
            })
            .build();

        System.out.println("[*] 等待 Stream 消息...");
    }
}

使用 AMQP 协议消费(兼容模式)

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 设置 QoS(对 Stream 队列必须设置)
channel.basic_qos(prefetch_count=10)

def callback(ch, method, properties, body):
    print(f"[x] 收到: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='events_stream', on_message_callback=callback)
channel.start_consuming()

消费者组

// 多个消费者使用相同的 stream name(消费者组)
environment.consumerBuilder()
    .queue("events_stream")
    .name("my-consumer-group")          // 消费者组名
    .autoCommitStrategy()               // 自动提交 offset
        .messageCountBeforeCommit(100)  // 每 100 条提交一次
        .build()
    .messageHandler((context, message) -> {
        // 处理消息
    })
    .build();

11.4 Stream 内部结构

分段存储

Stream Queue: events_stream
│
├── Segment 0 (0-999)        [追加写入]
├── Segment 1 (1000-1999)    [追加写入]
├── Segment 2 (2000-2999)    [追加写入]
└── ...

每个 Segment 是一个独立文件
x-stream-max-segment-size-bytes 控制段大小

消息格式

┌────────────────────────────────────┐
│ Stream Entry                       │
├────────────────────────────────────┤
│ Timestamp      (8 bytes)           │
│ Offset         (8 bytes)           │
│ Message ID     (8 bytes)           │
│ Header Size    (4 bytes)           │
│ Header Data    (variable)          │
│ Body Size      (4 bytes)           │
│ Body Data      (variable)          │
└────────────────────────────────────┘

11.5 Streams vs Kafka

维度 RabbitMQ Streams Apache Kafka
独立部署 嵌入 RabbitMQ 独立集群
协议 AMQP / Stream 专用协议 Kafka 自定义协议
分区 单 Stream(无分区概念) Topic 多 Partition
吞吐量 百万级/秒 百万级/秒
延迟 微秒级 毫秒级
消费者组 支持 支持
消息保留 可配置时间/大小 可配置时间/大小
回溯消费
事务 ✅(有限)
生态 较小 丰富(Kafka Connect/Streams/ksqlDB)
运维复杂度 低(复用 RabbitMQ) 高(独立组件 + ZooKeeper/KRaft)

何时选择 Streams

场景 推荐
已有 RabbitMQ,需要流式消费 RabbitMQ Streams
独立的大数据流处理平台 Kafka
需要跨 Stream 聚合 Kafka
需要 ksqlDB 流处理 Kafka
需要轻量级追加日志 RabbitMQ Streams
需要兼容现有 AMQP 消费者 RabbitMQ Streams

11.6 Stream 高级配置

保留策略

channel.queue_declare(
    queue='bounded_stream',
    durable=True,
    arguments={
        'x-queue-type': 'stream',
        'x-max-age': '24H',                              # 保留 24 小时
        'x-stream-max-segment-size-bytes': 100000000,     # 段大小 100MB
        'x-stream-max-length-bytes': 10000000000          # 最大 10GB
    }
)

Leader 选举策略

策略 说明
balanced 自动分配到负载最低的节点
client-local 优先分配到声明队列的节点

Stream 副本

# Stream 默认复制到所有集群节点
# 可通过策略控制副本数
rabbitmqctl set_policy stream-replicas "^stream\." \
  '{"queue-type":"stream"}' --apply-to queues

11.7 性能调优

发布优化

Environment environment = Environment.builder()
    .host("localhost")
    .port(5552)
    .maxUnconfirmedMessages(10000)  // 最大未确认消息数
    .build();

Producer producer = environment.producerBuilder()
    .queue("events_stream")
    .batchSize(100)                 // 批量大小
    .build();

// 批量发布
for (int i = 0; i < 100000; i++) {
    Message message = environment.messageBuilder()
        .addData(("Event " + i).getBytes())
        .properties()
        .messageId(i)
        .build();
    producer.send(message, confirmationStatus -> {
        // 确认回调
    });
}

消费优化

environment.consumerBuilder()
    .queue("events_stream")
    .autoCommitStrategy()
        .messageCountBeforeCommit(1000)  // 每 1000 条自动提交
        .flushInterval(Duration.ofSeconds(5))
        .build()
    .maxReadyMessages(5000)             // 预取消息数
    .messageHandler((context, message) -> {
        // 批量处理逻辑
    })
    .build();

11.8 适用场景

📌 业务场景:

场景 说明 使用方式
事件溯源 完整的事件历史 追加写入 + 任意 Offset 回溯
审计日志 不可变的操作记录 永久保留 + 全量消费
实时指标聚合 实时统计指标 最新消息消费 + 批量聚合
数据同步 数据库 CDC 增量消费 + 去重
多消费者独立消费 同一事件被多个服务消费 独立 Offset

11.9 注意事项

⚠️ Stream 不支持消息确认

Stream 中的消息不会被删除(除非保留策略到期),消费者通过 Offset 跟踪消费进度。

⚠️ Stream 不支持优先级

追加日志结构不支持消息重排。

⚠️ Stream 协议端口

Stream 专用协议使用端口 5552,需要额外开放。

⚠️ Stream 不支持 dead-lettering

消息不会因消费失败而转移到死信队列。

🔥 最佳实践: 如果只需要消息队列功能,使用 Quorum Queue;如果需要事件回溯/多消费者独立消费,使用 Stream。


11.10 扩展阅读


下一章: 第 12 章:监控与告警 — 建立完善的 RabbitMQ 监控体系。