强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

第 5 章:队列详解

第 5 章:队列详解

队列是消息的最终归宿,也是消费者获取消息的来源。本章将全面解析队列的类型、特性和高级用法。


5.1 队列类型总览

类型说明推荐版本
Classic Queue经典队列,基础队列类型3.x / 4.x
Quorum Queue仲裁队列,基于 Raft 共识3.8+(推荐)
Stream流队列,类 Kafka 日志3.9+
Priority Queue优先级队列经典队列扩展
Lazy Queue惰性队列,消息优先写磁盘3.6+(3.12 后为默认行为)
Exclusive Queue排他队列,仅限当前连接所有版本
Auto-delete Queue自动删除队列所有版本

5.2 Classic Queue(经典队列)

经典队列是 RabbitMQ 最基础的队列类型,使用 Erlang Mnesia 数据库存储元数据。

声明经典队列

channel.queue_declare(
    queue='classic_queue',
    durable=True,        # 持久化队列
    exclusive=False,     # 非排他
    auto_delete=False,   # 不自动删除
    arguments={
        'x-queue-type': 'classic'  # 显式指定类型
    }
)

队列属性详解

属性类型说明
durablebool队列元数据持久化到磁盘
exclusivebool仅限声明连接使用
auto_deletebool最后消费者断开后自动删除
x-queue-typestring队列类型标识
x-max-lengthint队列最大消息数
x-max-length-bytesint队列最大字节数
x-overflowstring溢出策略:drop-head / reject-publish
x-message-ttlint消息默认 TTL(毫秒)
x-expiresint队列无消费者后的自动删除时间(毫秒)

5.3 持久化(Durability)

持久化是保证消息不丢失的核心机制,需要三个层面同时设置。

持久化三要素

消息不丢失 = Exchange 持久化 + Queue 持久化 + 消息持久化
层面设置方式说明
Exchangedurable=True交换机元数据写入磁盘
Queuedurable=True队列元数据和索引写入磁盘
Messagedelivery_mode=2消息体写入磁盘

完整持久化示例

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 1. 持久化交换机
channel.exchange_declare(
    exchange='persistent_exchange',
    exchange_type='direct',
    durable=True  # 持久化
)

# 2. 持久化队列
channel.queue_declare(
    queue='persistent_queue',
    durable=True  # 持久化
)

channel.queue_bind(
    exchange='persistent_exchange',
    queue='persistent_queue',
    routing_key='task'
)

# 3. 持久化消息
channel.basic_publish(
    exchange='persistent_exchange',
    routing_key='task',
    body='Important data',
    properties=pika.BasicProperties(
        delivery_mode=2  # 持久化消息
    )
)

持久化的性能影响

操作性能影响说明
持久化队列声明轻微仅一次性的元数据写入
持久化消息发布中等每条消息写入磁盘
消费确认轻微更新消息状态
批量持久化较低RabbitMQ 会批量 fsync

💡 提示: RabbitMQ 不会在每条消息后立即 fsync,而是采用批量写入策略,因此持久化的性能影响比预期小。


5.4 排他队列(Exclusive Queue)

排他队列仅限声明它的连接使用,连接断开后自动删除。

特性

特性说明
绑定连接仅创建它的 Connection 可以消费
自动删除Connection 关闭后队列自动删除
不可被其他连接访问其他连接无法声明同名排他队列
不支持持久化durable=True 对排他队列无效

使用场景

  • 临时回调队列(RPC 模式)
  • Fanout 消费者的临时订阅队列
  • 请求-响应模式中的应答队列

RPC 回调队列示例

# 客户端
result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

# 发送 RPC 请求
channel.basic_publish(
    exchange='',
    routing_key='rpc_queue',
    properties=pika.BasicProperties(
        reply_to=callback_queue,        # 告知服务端回复到此队列
        correlation_id=str(uuid.uuid4()) # 关联 ID
    ),
    body=json.dumps({'method': 'calculate', 'params': [1, 2, 3]})
)

# 等待回复
channel.basic_consume(queue=callback_queue, on_message_callback=on_response)

5.5 自动删除队列(Auto-delete Queue)

最后一个消费者取消订阅或断开连接后,队列自动删除。

channel.queue_declare(
    queue='temp_notification',
    auto_delete=True
)

⚠️ 注意: 队列在最后一个消费者断开后才会删除,而非在声明时。如果没有消费者,队列会一直存在。


5.6 死信队列(Dead Letter Queue)

死信队列用于收集无法正常消费的消息。

配置模式

# 步骤 1: 创建死信交换机和死信队列
channel.exchange_declare(exchange='dlx', exchange_type='direct', durable=True)
channel.queue_declare(queue='dead_letters', durable=True)
channel.queue_bind(exchange='dlx', queue='dead_letters', routing_key='dead')

# 步骤 2: 业务队列绑定死信交换机
channel.queue_declare(
    queue='order_queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'dead',
        'x-message-ttl': 60000,           # 消息 TTL 60 秒
        'x-max-length': 5000,             # 最大 5000 条
        'x-overflow': 'reject-publish'    # 超出时拒绝发布
    }
)

死信处理架构

Producer --> Business Queue --> Consumer (成功: ACK)
                 │
                 ├── TTL 过期
                 ├── 消费者 NACK (requeue=false)
                 ├── 队列已满
                 │
                 v
           Dead Letter Exchange --> Dead Letter Queue --> DLQ Consumer
                                                            │
                                                        ┌───┴───┐
                                                        │       │
                                                     重试     告警
                                                   (有限次)

基于死信的消息重试机制

import pika
import json

MAX_RETRIES = 3

def process_message(ch, method, properties, body):
    message = json.loads(body)
    retry_count = properties.headers.get('x-retry-count', 0) if properties.headers else 0

    try:
        # 业务处理
        handle_business(message)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        if retry_count < MAX_RETRIES:
            # 重新发布,增加重试计数
            headers = dict(properties.headers or {})
            headers['x-retry-count'] = retry_count + 1
            ch.basic_publish(
                exchange='retry_exchange',
                routing_key='retry',
                body=body,
                properties=pika.BasicProperties(
                    delivery_mode=2,
                    headers=headers
                )
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            # 超过重试次数,拒绝并进入死信队列
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

5.7 优先级队列(Priority Queue)

优先级队列允许高优先级消息优先被消费。

声明优先级队列

channel.queue_declare(
    queue='priority_queue',
    durable=True,
    arguments={
        'x-max-priority': 10  # 最大优先级(0-255,推荐 1-10)
    }
)

发送优先级消息

# 高优先级消息
channel.basic_publish(
    exchange='task_exchange',
    routing_key='task',
    body=json.dumps({'task': 'urgent_refund', 'priority': 'high'}),
    properties=pika.BasicProperties(
        delivery_mode=2,
        priority=10  # 高优先级
    )
)

# 低优先级消息
channel.basic_publish(
    exchange='task_exchange',
    routing_key='task',
    body=json.dumps({'task': 'generate_report', 'priority': 'low'}),
    properties=pika.BasicProperties(
        delivery_mode=2,
        priority=1  # 低优先级
    )
)

优先级队列注意事项

注意点说明
优先级范围建议使用 1-10,值越大优先级越高
存储开销每个优先级维护一个子队列,增加内存消耗
不保证严格顺序仅保证高优先级消息先于低优先级被消费
仅 Classic Queue 原生支持仲裁队列不支持 x-max-priority
默认优先级未设置 priority 的消息默认优先级为 0

📌 业务场景: 退款请求(高优先级)优先于报表生成(低优先级)处理。


5.8 惰性队列(Lazy Queue)

惰性队列将消息尽可能存储到磁盘,减少内存使用。

行为对比

行为Classic Queue(默认)Lazy Queue
消息存储位置优先内存,超出后写磁盘优先写磁盘
内存使用较高较低
消费延迟较低略高(需从磁盘读取)
适用场景消息量小、消费快消息堆积多

配置方式

# 方式 1: 队列声明时设置
channel.queue_declare(
    queue='lazy_queue',
    durable=True,
    arguments={'x-queue-mode': 'lazy'}
)

# 方式 2: 通过策略设置(推荐)
# rabbitmqctl set_policy lazy-queues "^lazy\." '{"queue-mode":"lazy"}' --apply-to queues

💡 提示: RabbitMQ 3.12+ 中,当消息内存使用超过阈值时,经典队列会自动将消息页出到磁盘,行为类似惰性队列。


5.9 Quorum Queue(仲裁队列)

仲裁队列是 RabbitMQ 3.8 引入的高可用队列类型,基于 Raft 共识算法 实现数据复制。

核心特性

特性说明
数据复制消息复制到集群中多数节点
Raft 共识使用 Raft 协议保证一致性
Leader-Follower一个 Leader,多个 Follower
自动 Leader 选举Leader 故障后自动选举新 Leader
消息确认多数节点确认后才算写入成功
不支持排他不支持 exclusive 模式
不支持优先级不支持 x-max-priority(4.0+ 已支持)

声明仲裁队列

channel.queue_declare(
    queue='orders',
    durable=True,
    arguments={
        'x-queue-type': 'quorum',
        'x-quorum-initial-group-size': 3,   # 初始组大小(可选)
        'x-delivery-limit': 5,              # 消息最大投递次数
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'dead',
        'x-message-ttl': 300000,
        'x-max-length': 100000,
        'x-max-length-bytes': 1073741824    # 1GB
    }
)

仲裁队列 vs 镜像队列

特性镜像队列 (3.x)仲裁队列
一致性协议基于 GM(自研)Raft
数据一致性最终一致强一致
消息确认Leader 确认即可多数确认
故障恢复可能丢消息不丢消息
性能较高略低(多确认开销)
消息回溯不支持不支持
适用版本3.x(4.0 已移除)3.8+(推荐)

仲裁队列架构

        ┌─────────┐
        │  Leader  │  (node1)
        │  orders  │
        └────┬────┘
             │ Raft 复制
    ┌────────┼────────┐
    v                  v
┌─────────┐      ┌─────────┐
│Follower │      │Follower │
│ orders  │      │ orders  │
│ (node2) │      │ (node3) │
└─────────┘      └─────────┘

写入: Client → Leader → 多数确认 → 返回成功
读取: Client → Leader → 返回消息

仲裁队列管理

# 查看仲裁队列状态
rabbitmq-queues quorum_status "orders"

# 查看所有仲裁队列成员
rabbitmq-queues members "orders"

# 添加成员(扩展集群时)
rabbitmq-queues add_member orders rabbit@node4

# 移除成员
rabbitmq-queues remove_member orders rabbit@node3

# 检查集群健康
rabbitmq-diagnostics check_if_node_is_quorum_critical

5.10 消息 TTL(Time-To-Live)

设置方式

方式作用范围设置位置
队列 x-message-ttl队列中所有消息队列声明的 arguments
消息 expiration单条消息发布时的 properties

队列级别 TTL

channel.queue_declare(
    queue='ttl_queue',
    arguments={'x-message-ttl': 60000}  # 所有消息 60 秒后过期
)

消息级别 TTL

channel.basic_publish(
    exchange='',
    routing_key='ttl_queue',
    body='short-lived message',
    properties=pika.BasicProperties(
        expiration='10000'  # 这条消息 10 秒后过期(字符串,毫秒)
    )
)

⚠️ 注意: 消息级别 TTL 不会从队列级别 TTL 继承。如果两者都设置了,取较小值。


5.11 队列溢出策略

当队列达到 x-max-lengthx-max-length-bytes 限制时:

策略行为
drop-head(默认)丢弃队列头部(最早)的消息
reject-publish拒绝新消息的发布(生产者收到 nack)
reject-publish-dlx拒绝新消息并发送到死信交换机
channel.queue_declare(
    queue='bounded_queue',
    arguments={
        'x-max-length': 10000,
        'x-max-length-bytes': 104857600,  # 100MB
        'x-overflow': 'reject-publish'
    }
)

5.12 队列选择指南

需要高可用?
├── 是 --> 仲裁队列 (Quorum Queue)
│          ├── 需要流式消费? --> Stream Queue
│          └── 不需要流式消费? --> Quorum Queue
└── 否 --> 单节点
            ├── 需要优先级? --> Classic Queue + x-max-priority
            ├── 消息量大/堆积多? --> Lazy Queue
            └── 临时队列? --> Exclusive / Auto-delete Queue

5.13 注意事项

⚠️ 持久化队列 + 非持久化消息 = 消息可能丢失

即使队列持久化,如果消息未设置 delivery_mode=2,重启后消息丢失。

⚠️ 排他队列不能持久化

排他队列隐含 durable=False,设置 durable=True 会被忽略。

⚠️ 仲裁队列最少 3 节点

生产环境建议至少 3 个节点,确保 Leader 故障时仍有多数派可用。

⚠️ 仲裁队列不支持 x-max-priority

如果需要优先级功能,需使用经典队列或等 4.x 版本支持。

⚠️ 队列名冲突

同名队列不能以不同类型重新声明。如果队列已存在,声明参数必须完全一致,否则抛出 PRECONDITION_FAILED 错误。

🔥 最佳实践: 新项目一律使用仲裁队列,仅在需要特殊功能(优先级等)时才使用经典队列。


5.14 扩展阅读


下一章: 第 6 章:生产者开发 — 掌握生产者的消息发布方式、确认机制和最佳实践。