第 6 章:生产者开发
第 6 章:生产者开发
生产者是消息的源头。本章将深入讲解如何可靠地将消息发送到 RabbitMQ,确保消息不丢失、不重复。
6.1 生产者基础
发布消息的基本要素
| 要素 | 说明 | 是否必须 |
|---|---|---|
exchange | 目标交换机 | 是(空字符串表示默认交换机) |
routing_key | 路由键 | 是 |
body | 消息体(字节) | 是 |
properties | 消息属性 | 否 |
消息属性(BasicProperties)
| 属性 | 类型 | 说明 |
|---|---|---|
delivery_mode | int | 1=非持久,2=持久 |
content_type | string | MIME 类型,如 application/json |
content_encoding | string | 编码方式 |
priority | int | 消息优先级(0-255) |
correlation_id | string | 关联 ID(RPC 模式) |
reply_to | string | 回复队列名(RPC 模式) |
expiration | string | 消息 TTL(毫秒) |
message_id | string | 消息唯一标识 |
timestamp | int | 消息时间戳 |
type | string | 消息类型标识 |
app_id | string | 应用标识 |
headers | dict | 自定义头部 |
Python 基础发布
import pika
import json
import uuid
from datetime import datetime
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='orders', exchange_type='direct', durable=True)
channel.queue_declare(queue='order_process', durable=True)
channel.queue_bind(exchange='orders', queue='order_process', routing_key='order.create')
# 发布消息
message = {
'message_id': str(uuid.uuid4()),
'order_id': 'ORD-20260510-001',
'user_id': 'U1001',
'amount': 299.9,
'items': [{'sku': 'A001', 'qty': 2, 'price': 149.95}],
'created_at': datetime.now().isoformat()
}
channel.basic_publish(
exchange='orders',
routing_key='order.create',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2,
content_type='application/json',
message_id=str(uuid.uuid4()),
timestamp=int(datetime.now().timestamp()),
app_id='order-service'
)
)
print(f"[x] 订单消息已发布: {message['order_id']}")
connection.close()
6.2 消息可靠性保证
消息丢失的三个阶段
Producer --1--> Broker (Exchange) --2--> Queue --3--> Consumer
│ │ │
│ 消息可能丢失 消息可能丢失
│
└── 消息可能丢失(网络/进程崩溃)
| 阶段 | 风险 | 解决方案 |
|---|---|---|
| 1. 生产者 → Broker | 网络故障、进程崩溃 | Publisher Confirm |
| 2. Broker 存储 | 服务器宕机 | 持久化(Exchange + Queue + Message) |
| 3. Broker → Consumer | 消费者崩溃 | Consumer ACK |
6.3 事务模式(Transaction)
事务模式是最基础的消息确认方式,通过 txSelect/txCommit/txRollback 实现。
代码示例
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='tx_exchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='tx_queue', durable=True)
channel.queue_bind(exchange='tx_exchange', queue='tx_queue', routing_key='task')
try:
# 开启事务
channel.tx_select()
# 发送消息 1
channel.basic_publish(
exchange='tx_exchange',
routing_key='task',
body=b'Task 1',
properties=pika.BasicProperties(delivery_mode=2)
)
# 发送消息 2
channel.basic_publish(
exchange='tx_exchange',
routing_key='task',
body=b'Task 2',
properties=pika.BasicProperties(delivery_mode=2)
)
# 提交事务(所有消息一起写入)
channel.tx_commit()
print("[x] 事务提交成功")
except Exception as e:
# 回滚事务
channel.tx_rollback()
print(f"[!] 事务回滚: {e}")
事务的性能问题
| 方面 | 事务模式 | Publisher Confirm |
|---|---|---|
| 吞吐量 | 低(每批需等待同步确认) | 高(异步确认) |
| 实现复杂度 | 简单 | 中等 |
| 适用场景 | 极少量关键消息 | 生产推荐 |
⚠️ 注意: 事务模式会显著降低吞吐量(约降低 2-10 倍),生产环境推荐使用 Publisher Confirm。
6.4 Publisher Confirm(发布确认)
Publisher Confirm 是 RabbitMQ 推荐的可靠发布方式。Broker 在收到消息后向生产者发送确认。
确认模式
| 模式 | 说明 | 性能 |
|---|---|---|
| 同步单条确认 | 每发送一条等待确认 | 最低 |
| 同步批量确认 | 发送一批后等待确认 | 中等 |
| 异步回调确认 | 发送后继续发送,通过回调收到确认 | 最高 |
模式一:同步单条确认
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 启用 Publisher Confirm
channel.confirm_delivery()
channel.exchange_declare(exchange='confirm_ex', exchange_type='direct', durable=True)
channel.queue_declare(queue='confirm_q', durable=True)
channel.queue_bind(exchange='confirm_ex', queue='confirm_q', routing_key='task')
try:
channel.basic_publish(
exchange='confirm_ex',
routing_key='task',
body=b'Important task',
properties=pika.BasicProperties(delivery_mode=2)
)
print("[x] 消息已被 Broker 确认")
except pika.exceptions.UnroutableError:
print("[!] 消息无法路由")
except pika.exceptions.NackError:
print("[!] Broker 拒绝了消息")
模式二:批量确认(Python pika 中使用 publish_mandatory + returns)
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.confirm_delivery()
channel.exchange_declare(exchange='batch_ex', exchange_type='direct', durable=True)
channel.queue_declare(queue='batch_q', durable=True)
channel.queue_bind(exchange='batch_ex', queue='batch_q', routing_key='task')
# 批量发送后统一看是否被确认
messages = [f'Task {i}' for i in range(100)]
unconfirmed = set()
for msg in messages:
try:
channel.basic_publish(
exchange='batch_ex',
routing_key='task',
body=msg.encode(),
properties=pika.BasicProperties(delivery_mode=2)
)
except pika.exceptions.UnroutableError:
print(f"[!] 消息路由失败: {msg}")
print(f"[x] 批量发送完成")
模式三:异步确认(Java 示例,推荐生产使用)
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
public class AsyncProducer {
private static final String EXCHANGE = "async_ex";
private static final String QUEUE = "async_q";
private static final String ROUTING_KEY = "task";
// 用于跟踪未确认的消息
private static final ConcurrentNavigableMap<Long, String> outstandingConfirms =
new ConcurrentSkipListMap<>();
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
channel.queueDeclare(QUEUE, true, false, false, null);
channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);
// 启用 Publisher Confirm
channel.confirmSelect();
// 异步确认回调
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
if (multiple) {
// 批量确认:移除所有 <= deliveryTag 的消息
ConcurrentNavigableMap<Long, String> confirmed =
outstandingConfirms.headMap(deliveryTag, true);
confirmed.clear();
} else {
outstandingConfirms.remove(deliveryTag);
}
System.out.println("[✓] 确认 tag=" + deliveryTag + " multiple=" + multiple);
};
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
String message = outstandingConfirms.get(deliveryTag);
System.out.println("[✗] 拒绝 tag=" + deliveryTag + " message=" + message);
// 处理被拒绝的消息:重试、记录日志、告警等
if (multiple) {
ConcurrentNavigableMap<Long, String> nacked =
outstandingConfirms.headMap(deliveryTag, true);
nacked.clear();
} else {
outstandingConfirms.remove(deliveryTag);
}
};
channel.addConfirmListener(ackCallback, nackCallback);
// 批量发送消息
for (int i = 0; i < 1000; i++) {
String message = "Task " + i;
long seqNo = channel.getNextPublishSeqNo();
outstandingConfirms.put(seqNo, message);
channel.basicPublish(EXCHANGE, ROUTING_KEY,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
}
System.out.println("[x] 1000 条消息已发送,等待确认...");
Thread.sleep(5000); // 等待所有确认
connection.close();
}
}
Go 异步确认示例
package main
import (
"log"
"github.com/rabbitmq/amqp091-go"
)
func main() {
conn, _ := amqp.Dial("amqp://admin:admin123@localhost:5672/")
defer conn.Close()
ch, _ := conn.Channel()
defer ch.Close()
// 启用 Confirm 模式
ch.Confirm(false)
confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 100))
// 异步处理确认
go func() {
for conf := range confirms {
if conf.Ack {
log.Printf("[✓] 确认 tag=%d", conf.DeliveryTag)
} else {
log.Printf("[✗] 拒绝 tag=%d", conf.DeliveryTag)
}
}
}()
ch.ExchangeDeclare("go_ex", "direct", true, false, false, false, nil)
ch.QueueDeclare("go_q", true, false, false, nil)
ch.QueueBind("go_q", "task", "go_ex", false, nil)
// 发送消息
for i := 0; i < 100; i++ {
ch.Publish("go_ex", "task", false, false,
amqp.Publishing{
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte("Task from Go"),
})
}
log.Println("[x] 100 条消息已发送")
select {} // 阻塞等待
}
6.5 消息属性最佳实践
消息结构规范
import uuid
from datetime import datetime
def create_message(event_type, payload):
"""创建标准化的消息"""
return {
'header': {
'message_id': str(uuid.uuid4()),
'event_type': event_type,
'timestamp': int(datetime.now().timestamp() * 1000),
'source': 'order-service',
'version': '1.0',
'trace_id': str(uuid.uuid4()), # 用于分布式追踪
},
'body': payload
}
# 使用
message = create_message('order.created', {
'order_id': 'ORD-001',
'user_id': 'U1001',
'amount': 299.9
})
channel.basic_publish(
exchange='events',
routing_key='order.created',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2,
content_type='application/json',
message_id=message['header']['message_id'],
type='order.created',
app_id='order-service',
timestamp=int(datetime.now().timestamp()),
headers={
'trace_id': message['header']['trace_id'],
'source': message['header']['source']
}
)
)
6.6 Mandatory 标志
当消息无法路由到任何队列时,mandatory=True 会将消息退还给生产者。
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 设置 return 回调
def on_return(ch, method, properties, body):
print(f"[!] 消息被退回: reply_code={method.reply_code}, "
f"reply_text={method.reply_text}, routing_key={method.routing_key}")
print(f" 消息内容: {body.decode()}")
channel.add_on_return_callback(on_return)
# mandatory=True 表示消息必须路由到队列
channel.basic_publish(
exchange='some_exchange',
routing_key='nonexistent_key',
body=b'important message',
properties=pika.BasicProperties(delivery_mode=2),
mandatory=True # 无法路由时退回
)
connection.close()
Mandatory vs Alternate Exchange
| 方式 | 优点 | 缺点 |
|---|---|---|
mandatory=True | 生产者直接感知 | 需要处理退回逻辑 |
alternate-exchange | 透明处理,生产者无需感知 | 生产者不直接知道消息被退回 |
6.7 消息去重
RabbitMQ 本身不提供消息去重功能,需要应用层实现。
方案一:消息 ID 去重
import redis
import uuid
import pika
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def publish_deduplicated(channel, exchange, routing_key, body, message_id=None):
"""带去重的消息发布"""
msg_id = message_id or str(uuid.uuid4())
# 检查是否已发送(Redis SET NX)
if redis_client.set(f'mq:dedup:{msg_id}', '1', nx=True, ex=3600):
channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=body,
properties=pika.BasicProperties(
delivery_mode=2,
message_id=msg_id
)
)
return True
else:
print(f"[!] 重复消息,已跳过: {msg_id}")
return False
方案二:幂等消费端去重
import redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def idempotent_consume(ch, method, properties, body):
"""幂等消费者"""
msg_id = properties.message_id
# 检查是否已处理
if redis_client.sismember('mq:processed', msg_id):
print(f"[!] 重复消息,跳过: {msg_id}")
ch.basic_ack(delivery_tag=method.delivery_tag)
return
try:
# 处理消息
process(body)
# 标记为已处理
redis_client.sadd('mq:processed', msg_id)
redis_client.expire('mq:processed', 86400 * 7) # 7 天过期
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
6.8 批量发送优化
高吞吐量生产者优化
import pika
import json
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.confirm_delivery()
# 优化 1: 使用较大的 frame_max
# 优化 2: 减少每条消息的额外属性
# 优化 3: 批量发送后统一处理确认
BATCH_SIZE = 500
messages = [json.dumps({'id': i, 'data': f'payload_{i}'}) for i in range(10000)]
start = time.time()
count = 0
for msg in messages:
channel.basic_publish(
exchange='batch_ex',
routing_key='task',
body=msg.encode(),
properties=pika.BasicProperties(delivery_mode=2)
)
count += 1
elapsed = time.time() - start
print(f"[x] 发送 {count} 条消息,耗时 {elapsed:.2f}s,吞吐量 {count/elapsed:.0f} msg/s")
6.9 生产者常见问题
问题一:消息发送失败
# 完整的可靠发送模式
def reliable_publish(channel, exchange, routing_key, body, max_retries=3):
for attempt in range(max_retries):
try:
channel.basic_publish(
exchange=exchange,
routing_key=routing_key,
body=body,
properties=pika.BasicProperties(delivery_mode=2),
mandatory=True
)
return True
except pika.exceptions.UnroutableError:
print(f"[!] 路由失败,重试 {attempt + 1}/{max_retries}")
time.sleep(0.5 * (attempt + 1))
except pika.exceptions.ChannelClosedByBroker:
print("[!] Channel 被关闭,重建连接")
channel = create_channel()
except Exception as e:
print(f"[!] 发送异常: {e}")
time.sleep(1)
# 发送到死信队列或记录到数据库
save_to_fallback(body)
return False
问题二:连接断开后的消息缓冲
import collections
from threading import Lock
class BufferedProducer:
"""带本地缓冲的生产者"""
def __init__(self, host, exchange, routing_key):
self.host = host
self.exchange = exchange
self.routing_key = routing_key
self.buffer = collections.deque(maxlen=100000)
self.lock = Lock()
self._connect()
def _connect(self):
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(self.host, heartbeat=60)
)
self.channel = self.connection.channel()
self.channel.confirm_delivery()
def publish(self, body):
try:
self.channel.basic_publish(
exchange=self.exchange,
routing_key=self.routing_key,
body=body,
properties=pika.BasicProperties(delivery_mode=2)
)
# 发送缓冲中的消息
self._flush_buffer()
except Exception:
with self.lock:
self.buffer.append(body)
self._connect()
def _flush_buffer(self):
with self.lock:
while self.buffer:
body = self.buffer.popleft()
self.channel.basic_publish(
exchange=self.exchange,
routing_key=self.routing_key,
body=body,
properties=pika.BasicProperties(delivery_mode=2)
)
6.10 注意事项
⚠️ Publisher Confirm 不等于消息持久化
Broker 收到消息并写入内存缓冲后就会发送确认,此时消息可能还未持久化到磁盘。如果需要更强保证,可设置 publisher-confirm-type=casic(Java Spring AMQP)或配合事务。
⚠️ 每条消息都需要设置 delivery_mode=2
持久化是消息级别的设置,不会从 Exchange 或 Queue 继承。
⚠️ 避免发送过大的消息
建议消息体不超过 128KB。过大的消息会影响性能和内存使用。大文件应通过对象存储传输,消息中只传递引用 URL。
⚠️ Publisher Confirm 与事务不要混用
同时启用 confirm 和 transaction 会导致不可预测的行为。
🔥 最佳实践: 生产环境统一使用 Publisher Confirm(异步模式) + 消息持久化 + mandatory 标志。
6.11 扩展阅读
下一章: 第 7 章:消费者开发 — 掌握消费者的消息接收、确认机制和重试策略。