强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

RabbitMQ 消息队列完全教程 / 第 10 章:插件生态

第 10 章:插件生态

RabbitMQ 的插件系统极大地扩展了其功能。本章将介绍最常用和最重要的插件及其配置方法。


10.1 插件管理基础

插件管理命令

# 列出所有插件
rabbitmq-plugins list

# 启用插件
rabbitmq-plugins enable <plugin_name>

# 禁用插件
rabbitmq-plugins disable <plugin_name>

# 启用所有插件(不推荐生产)
rabbitmq-plugins enable --all

# 禁用所有插件
rabbitmq-plugins disable --all

插件状态说明

状态符号说明
enabledE已启用,运行中
disabledD已禁用
running*正在运行
# 列出并过滤插件状态
rabbitmq-plugins list -e enabled    # 仅显示已启用
rabbitmq-plugins list -m available  # 仅显示可用

插件安装路径

系统路径
Linux (包管理)/usr/lib/rabbitmq/plugins/
Docker/opt/rabbitmq/plugins/
macOS (Homebrew)$(brew --prefix)/opt/rabbitmq/plugins/
# 手动安装第三方插件
cp rabbitmq_custom_plugin.ez /usr/lib/rabbitmq/plugins/
rabbitmq-plugins enable rabbitmq_custom_plugin

10.2 Management UI 插件

rabbitmq_management

rabbitmq-plugins enable rabbitmq_management

管理界面功能

功能路径说明
概览/系统总览
连接/#/connections所有客户端连接
通道/#/channels所有通道
交换机/#/exchanges交换机管理
队列/#/queues队列管理
管理/#/admin用户、策略、参数管理

HTTP API 使用

# 系统概览
curl -u admin:admin123 http://localhost:15672/api/overview

# 列出所有队列
curl -u admin:admin123 http://localhost:15672/api/queues

# 列出所有连接
curl -u admin:admin123 http://localhost:15672/api/connections

# 获取队列详情
curl -u admin:admin123 http://localhost:15672/api/queues/%2F/my_queue

# 发送测试消息
curl -u admin:admin123 -X POST http://localhost:15672/api/exchanges/%2F/amq.default/publish \
  -H "Content-Type: application/json" \
  -d '{
    "properties": {},
    "routing_key": "my_queue",
    "payload": "test message",
    "payload_encoding": "string"
  }'

# 消费消息
curl -u admin:admin123 -X POST http://localhost:15672/api/queues/%2F/my_queue/get \
  -H "Content-Type: application/json" \
  -d '{"count":1,"requeue":false,"encoding":"auto","ackmode":"ack_requeue_true"}'

rabbitmqadmin 命令行工具

# 下载
curl -o rabbitmqadmin http://localhost:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin

# 列出队列
./rabbitmqadmin list queues name messages consumers

# 发布消息
./rabbitmqadmin publish exchange=amq.default routing_key=test payload="hello"

# 获取消息
./rabbitmqadmin get queue=test ackmode=ack_requeue_true count=5

# 导出定义(配置备份)
./rabbitmqadmin export rabbitmq_definitions.json

# 导入定义
./rabbitmqadmin import rabbitmq_definitions.json

10.3 Delayed Message Exchange 插件

安装

# 下载与 RabbitMQ 版本匹配的插件
# https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez

# 复制到插件目录
cp rabbitmq_delayed_message_exchange-3.13.0.ez /usr/lib/rabbitmq/plugins/

# 启用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

使用方式

import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 声明延迟交换机
channel.exchange_declare(
    exchange='delayed_orders',
    exchange_type='x-delayed-message',
    durable=True,
    arguments={'x-delayed-type': 'direct'}  # 内部路由类型
)

channel.queue_declare(queue='order_timeout', durable=True)
channel.queue_bind(exchange='delayed_orders', queue='order_timeout', routing_key='timeout')

# 发送延迟消息
for delay_ms, message in [
    (5000,   {'order': '001', 'action': 'remind_payment'}),      # 5秒
    (1800000, {'order': '002', 'action': 'auto_cancel'}),         # 30分钟
    (86400000, {'order': '003', 'action': 'auto_complete'}),      # 24小时
]:
    channel.basic_publish(
        exchange='delayed_orders',
        routing_key='timeout',
        body=json.dumps(message),
        properties=pika.BasicProperties(
            delivery_mode=2,
            headers={'x-delay': delay_ms}
        )
    )
    print(f"[x] 延迟 {delay_ms/1000}s: {message}")

connection.close()

修改延迟时间

# 发送新消息时,使用相同 message_id 覆盖旧消息
channel.basic_publish(
    exchange='delayed_orders',
    routing_key='timeout',
    body=json.dumps({'action': 'updated_timeout'}),
    properties=pika.BasicProperties(
        delivery_mode=2,
        message_id='order_001_timeout',  # 用于去重/覆盖
        headers={'x-delay': 60000}
    )
)

注意事项

⚠️ 延迟消息存储在 Mnesia/ETS 中,不适合大量延迟消息。
⚠️ 节点重启后未投递的延迟消息会丢失。
⚠️ 延迟时间精度为秒级,不保证精确到毫秒。


10.4 Shovel 插件

Shovel 用于在 RabbitMQ 实例之间单向转发消息。

架构

Source Broker ──Shovel──> Destination Broker
(Node A)                  (Node B)

启用 Shovel

rabbitmq-plugins enable rabbitmq_shovel
rabbitmq-plugins enable rabbitmq_shovel_management  # 管理界面(可选)

静态 Shovel 配置

# /etc/rabbitmq/rabbitmq.conf 中添加 advanced.config
%% advanced.config
[{rabbitmq_shovel,
  [{shovels,
    [{my_shovel,
      [{source,
        [{protocol, amqp091},
         {uris, ["amqp://admin:admin123@source-host:5672"]},
         {queue, <<"source_queue">>}]},
       {destination,
        [{protocol, amqp091},
         {uris, ["amqp://admin:admin123@dest-host:5672"]},
         {exchange, <<"dest_exchange">>},
         {publish_properties, [{delivery_mode, 2}]}}]},
       {ack_mode, on_confirm},
       {reconnect_delay, 5}
      ]}
    ]}
  ]}
].

动态 Shovel 配置

# 通过 rabbitmqctl 创建动态 Shovel
rabbitmqctl set_parameter shovel my_shovel \
  '{"src-protocol":"amqp091","src-uri":"amqp://admin:admin123@source:5672","src-queue":"source_queue","dest-protocol":"amqp091","dest-uri":"amqp://admin:admin123@dest:5672","dest-exchange":"dest_exchange","ack-mode":"on-confirm","reconnect-delay":5}'

# 查看 Shovel 状态
rabbitmqctl list_shovels

# 删除 Shovel
rabbitmqctl clear_parameter shovel my_shovel

动态 Shovel(Python)

import requests
import json

# 创建动态 Shovel
response = requests.put(
    'http://localhost:15672/api/parameters/shovel/%2F/my_shovel',
    auth=('admin', 'admin123'),
    json={
        'value': {
            'src-protocol': 'amqp091',
            'src-uri': 'amqp://admin:admin123@source:5672',
            'src-queue': 'source_queue',
            'dest-protocol': 'amqp091',
            'dest-uri': 'amqp://admin:admin123@dest:5672',
            'dest-exchange': 'dest_exchange',
            'ack-mode': 'on-confirm',
            'reconnect-delay': 5
        }
    }
)
print(response.status_code)

Shovel 适用场景

场景说明
跨数据中心消息同步从 A 机房转发到 B 机房
消息迁移迁移到新集群
消息桥接连接不同版本/配置的 RabbitMQ
灾难恢复备份消息到远程集群

10.5 Federation 插件

Federation 用于在 RabbitMQ 之间建立联邦交换机/队列,支持多向消息同步。

Federation vs Shovel

特性FederationShovel
方向多向(支持环形)单向
配置方式策略(Policy)参数(Parameter)
适用场景多数据中心点对点转发
交换机联邦
队列联邦
自动发现

启用 Federation

rabbitmq-plugins enable rabbitmq_federation
rabbitmq-plugins enable rabbitmq_federation_management  # 管理界面

配置 Federation Upstream

# 设置 upstream
rabbitmqctl set_parameter federation-upstream my_upstream \
  '{"uri":"amqp://admin:admin123@upstream-host:5672","prefetch-count":1000,"reconnect-delay":5,"ack-mode":"on-confirm"}'

# 设置策略(将特定交换机联邦到 upstream)
rabbitmqctl set_policy federate-exchanges "^federated\." \
  '{"federation-upstream-set":"my_upstream"}' \
  --apply-to exchanges

# 设置联邦队列策略
rabbitmqctl set_policy federate-queues "^federated\." \
  '{"federation-upstream-set":"my_upstream"}' \
  --apply-to queues

Federation 架构

Upstream Broker                    Downstream Broker
┌─────────────┐                   ┌─────────────┐
│ Exchange:   │   Federation      │ Exchange:   │
│ orders.ex   │ ──────────────>   │ orders.ex   │
│             │   (通过 AMQP)     │ (federated) │
└─────────────┘                   └─────────────┘
                                         │
                                    Binding
                                         │
                                    ┌────▼────┐
                                    │  Queue  │
                                    └─────────┘

多数据中心 Federation

# DC1 的 upstream 指向 DC2
rabbitmqctl set_parameter federation-upstream dc2_upstream \
  '{"uri":"amqp://admin:admin123@dc2-rabbit:5672"}'

# DC2 的 upstream 指向 DC1(形成环形)
rabbitmqctl set_parameter federation-upstream dc1_upstream \
  '{"uri":"amqp://admin:admin123@dc1-rabbit:5672"}'

10.6 Prometheus 监控插件

启用 Prometheus 插件

rabbitmq-plugins enable rabbitmq_prometheus

端口和端点

端点端口说明
/metrics15692Prometheus 格式指标
/metrics/per-object15692每个对象的详细指标
/metrics/detailed15692详细指标(高开销)

Prometheus 配置

# prometheus.yml
scrape_configs:
  - job_name: 'rabbitmq'
    scrape_interval: 15s
    static_configs:
      - targets: ['rabbitmq-host:15692']
    metrics_path: /metrics

关键指标

指标说明告警阈值
rabbitmq_queues队列数量> 10000
rabbitmq_connections连接数> 5000
rabbitmq_channels通道数> 10000
rabbitmq_queue_messages队列消息总数> 1000000
rabbitmq_queue_messages_ready就绪消息数持续增长
rabbitmq_queue_messages_unacked未确认消息数> prefetch * 消费者数
rabbitmq_process_resident_memory_bytes进程内存> 内存阈值 80%
rabbitmq_disk_space_available_bytes可用磁盘空间< 2GB
rabbitmq_channel_messages_published_total发布速率无固定阈值
rabbitmq_channel_messages_delivered_total投递速率无固定阈值

Grafana Dashboard

{
  "dashboard": {
    "title": "RabbitMQ Overview",
    "panels": [
      {
        "title": "Queue Depth",
        "targets": [{
          "expr": "rabbitmq_queue_messages",
          "legendFormat": "{{queue}}"
        }]
      },
      {
        "title": "Message Rate",
        "targets": [
          {"expr": "rate(rabbitmq_channel_messages_published_total[5m])", "legendFormat": "publish"},
          {"expr": "rate(rabbitmq_channel_messages_delivered_total[5m])", "legendFormat": "deliver"}
        ]
      },
      {
        "title": "Connections",
        "targets": [{"expr": "rabbitmq_connections"}]
      }
    ]
  }
}

10.7 其他重要插件

MQTT 插件

rabbitmq-plugins enable rabbitmq_mqtt
# 端口: 1883 (TCP), 8883 (TLS)

STOMP 插件

rabbitmq-plugins enable rabbitmq_stomp
# 端口: 61613 (TCP), 61614 (TLS)

Web STOMP

rabbitmq-plugins enable rabbitmq_web_stomp
# 提供 WebSocket 端点

Auth Backend HTTP

rabbitmq-plugins enable rabbitmq_auth_backend_http
# 使用 HTTP 服务进行认证
# 配置 HTTP 认证
auth_backends.1 = http
auth_http.http_method   = post
auth_http.user_path     = http://auth-service/rabbitmq/user
auth_http.vhost_path    = http://auth-service/rabbitmq/vhost
auth_http.resource_path = http://auth-service/rabbitmq/resource
auth_http.topic_path    = http://auth-service/rabbitmq/topic

Top 插件

rabbitmq-plugins enable rabbitmq_top
# 在管理界面显示进程资源使用排名

10.8 自定义插件开发

插件目录结构

rabbitmq_custom_plugin/
├── src/
│   ├── rabbit_custom_plugin.erl
│   └── rabbit_custom_plugin_app.erl
├── include/
├── priv/
├── Makefile
└── rabbitmq_custom_plugin.app.src

💡 提示: 自定义插件需要 Erlang 开发能力。大多数场景下,使用现有插件 + HTTP API 即可满足需求。


10.9 插件选择建议

需求推荐插件说明
管理界面rabbitmq_management必装
监控rabbitmq_prometheus推荐
延迟消息rabbitmq_delayed_message_exchange推荐
跨集群同步rabbitmq_federation / rabbitmq_shovel按需
MQTT 支持rabbitmq_mqttIoT 场景
WebSocketrabbitmq_web_stomp浏览器场景
认证集成rabbitmq_auth_backend_http企业场景

10.10 注意事项

⚠️ 插件版本兼容性

第三方插件版本必须与 RabbitMQ 主版本匹配,否则无法加载。

⚠️ 插件会影响性能

每个启用的插件都会消耗一定资源,生产环境仅启用必要插件。

⚠️ Prometheus 高开销指标

/metrics/detailed 端点会为每个队列/连接/通道生成指标,高基数场景下会产生大量数据。

🔥 最佳实践: 生产环境推荐启用 rabbitmq_management + rabbitmq_prometheus + 按需的业务插件。


10.11 扩展阅读


下一章: 第 11 章:RabbitMQ Streams — 了解 RabbitMQ 的流式处理能力。