强曰为道

与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

第 6 章:生产者开发

第 6 章:生产者开发

生产者是消息的源头。本章将深入讲解如何可靠地将消息发送到 RabbitMQ,确保消息不丢失、不重复。


6.1 生产者基础

发布消息的基本要素

要素说明是否必须
exchange目标交换机是(空字符串表示默认交换机)
routing_key路由键
body消息体(字节)
properties消息属性

消息属性(BasicProperties)

属性类型说明
delivery_modeint1=非持久,2=持久
content_typestringMIME 类型,如 application/json
content_encodingstring编码方式
priorityint消息优先级(0-255)
correlation_idstring关联 ID(RPC 模式)
reply_tostring回复队列名(RPC 模式)
expirationstring消息 TTL(毫秒)
message_idstring消息唯一标识
timestampint消息时间戳
typestring消息类型标识
app_idstring应用标识
headersdict自定义头部

Python 基础发布

import pika
import json
import uuid
from datetime import datetime

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='orders', exchange_type='direct', durable=True)
channel.queue_declare(queue='order_process', durable=True)
channel.queue_bind(exchange='orders', queue='order_process', routing_key='order.create')

# 发布消息
message = {
    'message_id': str(uuid.uuid4()),
    'order_id': 'ORD-20260510-001',
    'user_id': 'U1001',
    'amount': 299.9,
    'items': [{'sku': 'A001', 'qty': 2, 'price': 149.95}],
    'created_at': datetime.now().isoformat()
}

channel.basic_publish(
    exchange='orders',
    routing_key='order.create',
    body=json.dumps(message),
    properties=pika.BasicProperties(
        delivery_mode=2,
        content_type='application/json',
        message_id=str(uuid.uuid4()),
        timestamp=int(datetime.now().timestamp()),
        app_id='order-service'
    )
)

print(f"[x] 订单消息已发布: {message['order_id']}")
connection.close()

6.2 消息可靠性保证

消息丢失的三个阶段

Producer --1--> Broker (Exchange) --2--> Queue --3--> Consumer
   │              │                        │
   │         消息可能丢失              消息可能丢失
   │                                       
   └── 消息可能丢失(网络/进程崩溃)
阶段风险解决方案
1. 生产者 → Broker网络故障、进程崩溃Publisher Confirm
2. Broker 存储服务器宕机持久化(Exchange + Queue + Message)
3. Broker → Consumer消费者崩溃Consumer ACK

6.3 事务模式(Transaction)

事务模式是最基础的消息确认方式,通过 txSelect/txCommit/txRollback 实现。

代码示例

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='tx_exchange', exchange_type='direct', durable=True)
channel.queue_declare(queue='tx_queue', durable=True)
channel.queue_bind(exchange='tx_exchange', queue='tx_queue', routing_key='task')

try:
    # 开启事务
    channel.tx_select()

    # 发送消息 1
    channel.basic_publish(
        exchange='tx_exchange',
        routing_key='task',
        body=b'Task 1',
        properties=pika.BasicProperties(delivery_mode=2)
    )

    # 发送消息 2
    channel.basic_publish(
        exchange='tx_exchange',
        routing_key='task',
        body=b'Task 2',
        properties=pika.BasicProperties(delivery_mode=2)
    )

    # 提交事务(所有消息一起写入)
    channel.tx_commit()
    print("[x] 事务提交成功")

except Exception as e:
    # 回滚事务
    channel.tx_rollback()
    print(f"[!] 事务回滚: {e}")

事务的性能问题

方面事务模式Publisher Confirm
吞吐量低(每批需等待同步确认)高(异步确认)
实现复杂度简单中等
适用场景极少量关键消息生产推荐

⚠️ 注意: 事务模式会显著降低吞吐量(约降低 2-10 倍),生产环境推荐使用 Publisher Confirm。


6.4 Publisher Confirm(发布确认)

Publisher Confirm 是 RabbitMQ 推荐的可靠发布方式。Broker 在收到消息后向生产者发送确认。

确认模式

模式说明性能
同步单条确认每发送一条等待确认最低
同步批量确认发送一批后等待确认中等
异步回调确认发送后继续发送,通过回调收到确认最高

模式一:同步单条确认

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 启用 Publisher Confirm
channel.confirm_delivery()

channel.exchange_declare(exchange='confirm_ex', exchange_type='direct', durable=True)
channel.queue_declare(queue='confirm_q', durable=True)
channel.queue_bind(exchange='confirm_ex', queue='confirm_q', routing_key='task')

try:
    channel.basic_publish(
        exchange='confirm_ex',
        routing_key='task',
        body=b'Important task',
        properties=pika.BasicProperties(delivery_mode=2)
    )
    print("[x] 消息已被 Broker 确认")
except pika.exceptions.UnroutableError:
    print("[!] 消息无法路由")
except pika.exceptions.NackError:
    print("[!] Broker 拒绝了消息")

模式二:批量确认(Python pika 中使用 publish_mandatory + returns

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.confirm_delivery()

channel.exchange_declare(exchange='batch_ex', exchange_type='direct', durable=True)
channel.queue_declare(queue='batch_q', durable=True)
channel.queue_bind(exchange='batch_ex', queue='batch_q', routing_key='task')

# 批量发送后统一看是否被确认
messages = [f'Task {i}' for i in range(100)]
unconfirmed = set()

for msg in messages:
    try:
        channel.basic_publish(
            exchange='batch_ex',
            routing_key='task',
            body=msg.encode(),
            properties=pika.BasicProperties(delivery_mode=2)
        )
    except pika.exceptions.UnroutableError:
        print(f"[!] 消息路由失败: {msg}")

print(f"[x] 批量发送完成")

模式三:异步确认(Java 示例,推荐生产使用)

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class AsyncProducer {
    private static final String EXCHANGE = "async_ex";
    private static final String QUEUE = "async_q";
    private static final String ROUTING_KEY = "task";

    // 用于跟踪未确认的消息
    private static final ConcurrentNavigableMap<Long, String> outstandingConfirms =
            new ConcurrentSkipListMap<>();

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE, true, false, false, null);
        channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

        // 启用 Publisher Confirm
        channel.confirmSelect();

        // 异步确认回调
        ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
            if (multiple) {
                // 批量确认:移除所有 <= deliveryTag 的消息
                ConcurrentNavigableMap<Long, String> confirmed =
                        outstandingConfirms.headMap(deliveryTag, true);
                confirmed.clear();
            } else {
                outstandingConfirms.remove(deliveryTag);
            }
            System.out.println("[✓] 确认 tag=" + deliveryTag + " multiple=" + multiple);
        };

        ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
            String message = outstandingConfirms.get(deliveryTag);
            System.out.println("[✗] 拒绝 tag=" + deliveryTag + " message=" + message);
            // 处理被拒绝的消息:重试、记录日志、告警等
            if (multiple) {
                ConcurrentNavigableMap<Long, String> nacked =
                        outstandingConfirms.headMap(deliveryTag, true);
                nacked.clear();
            } else {
                outstandingConfirms.remove(deliveryTag);
            }
        };

        channel.addConfirmListener(ackCallback, nackCallback);

        // 批量发送消息
        for (int i = 0; i < 1000; i++) {
            String message = "Task " + i;
            long seqNo = channel.getNextPublishSeqNo();
            outstandingConfirms.put(seqNo, message);

            channel.basicPublish(EXCHANGE, ROUTING_KEY,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes());
        }

        System.out.println("[x] 1000 条消息已发送,等待确认...");
        Thread.sleep(5000);  // 等待所有确认
        connection.close();
    }
}

Go 异步确认示例

package main

import (
    "log"
    "github.com/rabbitmq/amqp091-go"
)

func main() {
    conn, _ := amqp.Dial("amqp://admin:admin123@localhost:5672/")
    defer conn.Close()

    ch, _ := conn.Channel()
    defer ch.Close()

    // 启用 Confirm 模式
    ch.Confirm(false)

    confirms := ch.NotifyPublish(make(chan amqp.Confirmation, 100))

    // 异步处理确认
    go func() {
        for conf := range confirms {
            if conf.Ack {
                log.Printf("[✓] 确认 tag=%d", conf.DeliveryTag)
            } else {
                log.Printf("[✗] 拒绝 tag=%d", conf.DeliveryTag)
            }
        }
    }()

    ch.ExchangeDeclare("go_ex", "direct", true, false, false, false, nil)
    ch.QueueDeclare("go_q", true, false, false, nil)
    ch.QueueBind("go_q", "task", "go_ex", false, nil)

    // 发送消息
    for i := 0; i < 100; i++ {
        ch.Publish("go_ex", "task", false, false,
            amqp.Publishing{
                DeliveryMode: amqp.Persistent,
                ContentType:  "text/plain",
                Body:         []byte("Task from Go"),
            })
    }

    log.Println("[x] 100 条消息已发送")
    select {} // 阻塞等待
}

6.5 消息属性最佳实践

消息结构规范

import uuid
from datetime import datetime

def create_message(event_type, payload):
    """创建标准化的消息"""
    return {
        'header': {
            'message_id': str(uuid.uuid4()),
            'event_type': event_type,
            'timestamp': int(datetime.now().timestamp() * 1000),
            'source': 'order-service',
            'version': '1.0',
            'trace_id': str(uuid.uuid4()),  # 用于分布式追踪
        },
        'body': payload
    }

# 使用
message = create_message('order.created', {
    'order_id': 'ORD-001',
    'user_id': 'U1001',
    'amount': 299.9
})

channel.basic_publish(
    exchange='events',
    routing_key='order.created',
    body=json.dumps(message),
    properties=pika.BasicProperties(
        delivery_mode=2,
        content_type='application/json',
        message_id=message['header']['message_id'],
        type='order.created',
        app_id='order-service',
        timestamp=int(datetime.now().timestamp()),
        headers={
            'trace_id': message['header']['trace_id'],
            'source': message['header']['source']
        }
    )
)

6.6 Mandatory 标志

当消息无法路由到任何队列时,mandatory=True 会将消息退还给生产者。

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 设置 return 回调
def on_return(ch, method, properties, body):
    print(f"[!] 消息被退回: reply_code={method.reply_code}, "
          f"reply_text={method.reply_text}, routing_key={method.routing_key}")
    print(f"    消息内容: {body.decode()}")

channel.add_on_return_callback(on_return)

# mandatory=True 表示消息必须路由到队列
channel.basic_publish(
    exchange='some_exchange',
    routing_key='nonexistent_key',
    body=b'important message',
    properties=pika.BasicProperties(delivery_mode=2),
    mandatory=True  # 无法路由时退回
)

connection.close()

Mandatory vs Alternate Exchange

方式优点缺点
mandatory=True生产者直接感知需要处理退回逻辑
alternate-exchange透明处理,生产者无需感知生产者不直接知道消息被退回

6.7 消息去重

RabbitMQ 本身不提供消息去重功能,需要应用层实现。

方案一:消息 ID 去重

import redis
import uuid
import pika

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def publish_deduplicated(channel, exchange, routing_key, body, message_id=None):
    """带去重的消息发布"""
    msg_id = message_id or str(uuid.uuid4())
    
    # 检查是否已发送(Redis SET NX)
    if redis_client.set(f'mq:dedup:{msg_id}', '1', nx=True, ex=3600):
        channel.basic_publish(
            exchange=exchange,
            routing_key=routing_key,
            body=body,
            properties=pika.BasicProperties(
                delivery_mode=2,
                message_id=msg_id
            )
        )
        return True
    else:
        print(f"[!] 重复消息,已跳过: {msg_id}")
        return False

方案二:幂等消费端去重

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=0)

def idempotent_consume(ch, method, properties, body):
    """幂等消费者"""
    msg_id = properties.message_id
    
    # 检查是否已处理
    if redis_client.sismember('mq:processed', msg_id):
        print(f"[!] 重复消息,跳过: {msg_id}")
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return
    
    try:
        # 处理消息
        process(body)
        # 标记为已处理
        redis_client.sadd('mq:processed', msg_id)
        redis_client.expire('mq:processed', 86400 * 7)  # 7 天过期
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

6.8 批量发送优化

高吞吐量生产者优化

import pika
import json
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.confirm_delivery()

# 优化 1: 使用较大的 frame_max
# 优化 2: 减少每条消息的额外属性
# 优化 3: 批量发送后统一处理确认

BATCH_SIZE = 500
messages = [json.dumps({'id': i, 'data': f'payload_{i}'}) for i in range(10000)]

start = time.time()
count = 0

for msg in messages:
    channel.basic_publish(
        exchange='batch_ex',
        routing_key='task',
        body=msg.encode(),
        properties=pika.BasicProperties(delivery_mode=2)
    )
    count += 1

elapsed = time.time() - start
print(f"[x] 发送 {count} 条消息,耗时 {elapsed:.2f}s,吞吐量 {count/elapsed:.0f} msg/s")

6.9 生产者常见问题

问题一:消息发送失败

# 完整的可靠发送模式
def reliable_publish(channel, exchange, routing_key, body, max_retries=3):
    for attempt in range(max_retries):
        try:
            channel.basic_publish(
                exchange=exchange,
                routing_key=routing_key,
                body=body,
                properties=pika.BasicProperties(delivery_mode=2),
                mandatory=True
            )
            return True
        except pika.exceptions.UnroutableError:
            print(f"[!] 路由失败,重试 {attempt + 1}/{max_retries}")
            time.sleep(0.5 * (attempt + 1))
        except pika.exceptions.ChannelClosedByBroker:
            print("[!] Channel 被关闭,重建连接")
            channel = create_channel()
        except Exception as e:
            print(f"[!] 发送异常: {e}")
            time.sleep(1)
    
    # 发送到死信队列或记录到数据库
    save_to_fallback(body)
    return False

问题二:连接断开后的消息缓冲

import collections
from threading import Lock

class BufferedProducer:
    """带本地缓冲的生产者"""
    
    def __init__(self, host, exchange, routing_key):
        self.host = host
        self.exchange = exchange
        self.routing_key = routing_key
        self.buffer = collections.deque(maxlen=100000)
        self.lock = Lock()
        self._connect()
    
    def _connect(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(self.host, heartbeat=60)
        )
        self.channel = self.connection.channel()
        self.channel.confirm_delivery()
    
    def publish(self, body):
        try:
            self.channel.basic_publish(
                exchange=self.exchange,
                routing_key=self.routing_key,
                body=body,
                properties=pika.BasicProperties(delivery_mode=2)
            )
            # 发送缓冲中的消息
            self._flush_buffer()
        except Exception:
            with self.lock:
                self.buffer.append(body)
            self._connect()
    
    def _flush_buffer(self):
        with self.lock:
            while self.buffer:
                body = self.buffer.popleft()
                self.channel.basic_publish(
                    exchange=self.exchange,
                    routing_key=self.routing_key,
                    body=body,
                    properties=pika.BasicProperties(delivery_mode=2)
                )

6.10 注意事项

⚠️ Publisher Confirm 不等于消息持久化

Broker 收到消息并写入内存缓冲后就会发送确认,此时消息可能还未持久化到磁盘。如果需要更强保证,可设置 publisher-confirm-type=casic(Java Spring AMQP)或配合事务。

⚠️ 每条消息都需要设置 delivery_mode=2

持久化是消息级别的设置,不会从 Exchange 或 Queue 继承。

⚠️ 避免发送过大的消息

建议消息体不超过 128KB。过大的消息会影响性能和内存使用。大文件应通过对象存储传输,消息中只传递引用 URL。

⚠️ Publisher Confirm 与事务不要混用

同时启用 confirm 和 transaction 会导致不可预测的行为。

🔥 最佳实践: 生产环境统一使用 Publisher Confirm(异步模式) + 消息持久化 + mandatory 标志。


6.11 扩展阅读


下一章: 第 7 章:消费者开发 — 掌握消费者的消息接收、确认机制和重试策略。