Chapter 7: Consumer Development

Consumers receive and process messages. This chapter covers the consumer's core mechanisms in detail: acknowledgement modes, prefetch control, message rejection, and retry strategies.


7.1 Consumption Modes Compared

| Mode | Description | Recommended scenarios |
|------|-------------|-----------------------|
| Push | Broker pushes messages to the consumer | Event-driven, real-time processing |
| Pull | Consumer fetches messages on demand | Batch processing, low-priority tasks |

Push mode (basic.consume)

# Subscribe and wait for the Broker to push messages
def callback(ch, method, properties, body):
    print(f"[x] Received: {body.decode()}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()  # blocks, waiting for deliveries

Pull mode (basic.get)

# Fetch a single message on demand
method, properties, body = channel.basic_get(queue='task_queue', auto_ack=False)
if method:
    print(f"[x] Received: {body.decode()}")
    channel.basic_ack(delivery_tag=method.delivery_tag)
else:
    print("[*] Queue is empty")

💡 Tip: Push mode is recommended for most scenarios; it is more efficient and supports flow control. Pull mode suits polling workloads.
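For polling jobs, the single basic_get call above is usually wrapped in a drain loop that pulls until the queue is empty or a batch limit is reached. A minimal sketch (the drain_queue helper and its max_messages limit are our own, not a pika API):

```python
def drain_queue(channel, queue, handler, max_messages=100):
    """Pull messages one at a time until the queue is empty or a limit is hit.

    Each message is acked only after `handler` returns, so a crash mid-batch
    leaves unprocessed messages in the queue.
    """
    processed = 0
    while processed < max_messages:
        method, properties, body = channel.basic_get(queue=queue, auto_ack=False)
        if method is None:  # queue is empty
            break
        handler(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
        processed += 1
    return processed
```

Such a loop is typically run from cron or a scheduler rather than kept alive like a push consumer.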


7.2 Acknowledgement Modes in Detail

Automatic acknowledgement (Auto Ack)

channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=True)

| Aspect | Notes |
|--------|-------|
| Pros | Highest performance, no extra overhead |
| Cons | Messages are deleted as soon as they are delivered; a consumer crash loses them |
| Use for | Scenarios that tolerate some loss (logs, metrics) |

⚠️ Note: Automatic acknowledgement is not recommended in production; a message can be deleted before processing completes.

Manual acknowledgement (Manual Ack)

def callback(ch, method, properties, body):
    try:
        process_message(body)
        # Acknowledge (processed successfully)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except RetryableException:
        # Retryable error - requeue the message
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
    except PermanentException:
        # Permanent error - do not requeue (goes to the dead letter exchange)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    except Exception:
        # Unknown error - requeue
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)

Acknowledgement methods compared

| Method | Meaning | Message outcome |
|--------|---------|-----------------|
| basic_ack | Processed successfully | Removed from the queue |
| basic_nack(requeue=True) | Rejected, requeue | Returned to the head of the queue |
| basic_nack(requeue=False) | Rejected, no requeue | Dead letter queue (if a DLX is configured) |
| basic_reject | Like nack, but only rejects a single message | Same as nack |

Batch acknowledgement

# Acknowledge delivery_tag and every earlier unacknowledged message
ch.basic_ack(delivery_tag=method.delivery_tag, multiple=True)
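In practice, batch acknowledgement is usually driven by a counter: ack every N messages with multiple=True to cut acknowledgement traffic, and flush whatever is pending on shutdown. A sketch (the BatchAcker helper and its defaults are our own):

```python
class BatchAcker:
    """Ack every `batch_size` messages in one multiple=True acknowledgement."""

    def __init__(self, channel, batch_size=10):
        self.channel = channel
        self.batch_size = batch_size
        self.pending = 0
        self.last_tag = None

    def on_processed(self, delivery_tag):
        """Call after each message is fully processed."""
        self.pending += 1
        self.last_tag = delivery_tag
        if self.pending >= self.batch_size:
            self.flush()

    def flush(self):
        """Ack everything up to the newest processed tag (call on shutdown too)."""
        if self.last_tag is not None:
            # multiple=True acks last_tag and all earlier unacked deliveries
            self.channel.basic_ack(delivery_tag=self.last_tag, multiple=True)
            self.pending = 0
            self.last_tag = None
```

The trade-off: if the consumer crashes before a flush, up to batch_size already-processed messages are redelivered, so this pattern assumes idempotent processing.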

7.3 QoS / Prefetch

Prefetch caps the number of unacknowledged messages a consumer may hold at once, which spreads load across consumers.

Setting the prefetch

# Hold at most 10 unacknowledged messages at a time
channel.basic_qos(prefetch_count=10)

# Byte-based prefetch (limited support in RabbitMQ 3.x)
channel.basic_qos(prefetch_size=0, prefetch_count=10, global_qos=False)

How prefetch affects load balancing

Scenario: 2 consumers, prefetch_count=1

Queue: [1][2][3][4][5][6]
         │       │
         v       v
    Consumer A  Consumer B
    (handling [1])  (handling [2])

→ Each consumer handles one message at a time and receives the next only after acking
→ Fair dispatch: work stays evenly spread even when processing times differ
Scenario: 2 consumers, prefetch_count=5

Queue: [1][2][3][4][5][6]
         │  │
         v  v
    Consumer A  Consumer B
    (buffers 1-5)  (gets 6)

→ Fewer broker round trips, so higher throughput for short tasks
→ But messages pile up at individual consumers, so balancing suffers when processing times vary

Recommended prefetch values

| Scenario | Suggested prefetch_count | Rationale |
|----------|--------------------------|-----------|
| Short tasks (<100ms) | 20-50 | Maximize throughput |
| Medium tasks (100ms-1s) | 5-10 | Balance throughput and latency |
| Long tasks (>1s) | 1-3 | Avoid holding messages for long periods |
| Unstable consumers | 1 | Fast failover |
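These table values follow a rough capacity heuristic: keep about (message rate × processing time) messages in flight, with some headroom, clamped to sane bounds. A hedged sketch (the helper, its headroom factor, and its bounds are our own rule of thumb, not a RabbitMQ formula):

```python
def suggest_prefetch(avg_process_seconds, target_rate_per_sec,
                     headroom=2.0, floor=1, ceiling=250):
    """Rule-of-thumb prefetch: enough in-flight messages to keep the
    consumer busy (rate * processing time), times a safety headroom,
    clamped to [floor, ceiling]."""
    estimate = target_rate_per_sec * avg_process_seconds * headroom
    return max(floor, min(ceiling, int(round(estimate))))
```

For example, 200 msg/s at 50 ms per message suggests a prefetch around 20, which lands in the table's "short tasks" band.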

7.4 A Complete Consumer

Complete Python consumer

import pika
import json
import signal
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MessageConsumer:
    def __init__(self, host, queue, prefetch_count=10):
        self.host = host
        self.queue = queue
        self.prefetch_count = prefetch_count
        self.should_stop = False
        self.connection = None
        self.channel = None

        signal.signal(signal.SIGINT, self._handle_signal)
        signal.signal(signal.SIGTERM, self._handle_signal)

    def _handle_signal(self, signum, frame):
        logger.info("Stop signal received, shutting down gracefully...")
        self.should_stop = True
        if self.channel:
            self.channel.stop_consuming()

    def _connect(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(
                host=self.host,
                heartbeat=60,
                blocked_connection_timeout=300
            )
        )
        self.channel = self.connection.channel()
        self.channel.basic_qos(prefetch_count=self.prefetch_count)

    def _process_message(self, ch, method, properties, body):
        try:
            message = json.loads(body)
            logger.info(f"Processing message: {message}")

            # Business logic
            self.handle_business(message)

            # Acknowledge the message
            ch.basic_ack(delivery_tag=method.delivery_tag)
            logger.info(f"Message processed: {properties.message_id}")

        except json.JSONDecodeError:
            logger.error(f"Malformed message: {body}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

        except Exception as e:
            logger.error(f"Message processing failed: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

    def handle_business(self, message):
        """Business logic (override in subclasses)."""
        raise NotImplementedError

    def start(self):
        self._connect()
        self.channel.basic_consume(
            queue=self.queue,
            on_message_callback=self._process_message,
            auto_ack=False
        )
        logger.info(f"Consumer started, queue: {self.queue}")

        while not self.should_stop:
            try:
                self.channel.start_consuming()
            except pika.exceptions.AMQPConnectionError:
                logger.warning("Connection lost, reconnecting in 5 seconds...")
                time.sleep(5)
                self._connect()
                self.channel.basic_consume(
                    queue=self.queue,
                    on_message_callback=self._process_message,
                    auto_ack=False
                )

        if self.connection and self.connection.is_open:
            self.connection.close()

# Usage
class OrderConsumer(MessageConsumer):
    def handle_business(self, message):
        order_id = message.get('order_id')
        logger.info(f"Processing order: {order_id}")
        # ... business logic

consumer = OrderConsumer('localhost', 'order_queue', prefetch_count=5)
consumer.start()

Java consumer

import com.rabbitmq.client.*;

import java.nio.charset.StandardCharsets;

public class OrderConsumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("admin");
        factory.setPassword("admin123");
        factory.setAutomaticRecoveryEnabled(true);  // automatic connection recovery
        factory.setNetworkRecoveryInterval(5000);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Set QoS
        channel.basicQos(10);

        // Consume messages
        DeliverCallback deliverCallback = (tag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            try {
                System.out.println("[x] Processing: " + message);
                // business logic...
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            } catch (Exception e) {
                System.err.println("[!] Processing failed: " + e.getMessage());
                channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
            }
        };

        channel.basicConsume("order_queue", false, deliverCallback, tag -> {
            System.out.println("[!] Consumer cancelled: " + tag);
        });

        System.out.println("[*] Waiting for messages...");
        Thread.currentThread().join();  // block forever
    }
}

7.5 Message Rejection and Retries

Rejection strategies

import pika

def process_with_retry(ch, method, properties, body):
    retry_count = 0
    if properties.headers:
        retry_count = properties.headers.get('x-retry-count', 0)

    try:
        handle_message(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        if retry_count < 3:
            # Retry: publish to the delayed retry exchange
            headers = dict(properties.headers or {})
            headers['x-retry-count'] = retry_count + 1
            headers['x-original-queue'] = method.routing_key

            ch.basic_publish(
                exchange='retry_exchange',
                routing_key='retry',
                body=body,
                properties=pika.BasicProperties(
                    delivery_mode=2,
                    headers=headers
                )
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            # Retry limit exceeded - dead-letter the message
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

Exponential backoff

import pika

def retry_with_backoff(ch, method, properties, body):
    retry_count = properties.headers.get('x-retry-count', 0) if properties.headers else 0

    try:
        process(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        if retry_count < 5:
            # Exponential backoff delay: 1s, 2s, 4s, 8s, 16s (capped at 30s)
            delay = min(1000 * (2 ** retry_count), 30000)

            headers = dict(properties.headers or {})
            headers['x-retry-count'] = retry_count + 1

            ch.basic_publish(
                exchange='retry_delayed_exchange',
                routing_key='retry',
                body=body,
                properties=pika.BasicProperties(
                    delivery_mode=2,
                    # x-delay requires the delayed-message-exchange plugin
                    headers={'x-delay': delay, **headers}
                )
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            # Retries exhausted - dead-letter the message
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

Retry architecture

Producer --> Main Queue --> Consumer
                 │              │
                 │       processing fails
                 │              │
                 v              v
          Retry Exchange (delayed)
                 │
                 │ delay N seconds
                 v
          Main Queue (requeued)
                 │
                 │ (retry_count < max)
                 │
                 v
              Consumer (retry)
                 │
                 │ (retry_count >= max)
                 v
          DLX Exchange --> Dead Letter Queue --> alerting / manual handling
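The backoff example above relies on the delayed-message-exchange plugin. The same diagram can be built with stock RabbitMQ using a TTL + dead-letter pattern: failed messages are published to a consumer-less retry queue, and when their TTL expires they are dead-lettered back to the main queue. A sketch of declaring that topology with pika's standard declare calls (all queue and exchange names here are our own choices):

```python
def declare_retry_topology(channel, main_queue='task_queue', retry_delay_ms=5000):
    """Declare a TTL + DLX retry topology on the given channel.

    Messages rejected with requeue=False on the main queue land in
    dead_letter_queue; messages published to retry_exchange wait
    retry_delay_ms in retry_queue, then return to the main queue.
    """
    # Dead-letter exchange + queue for messages that exhaust retries
    channel.exchange_declare(exchange='dlx', exchange_type='fanout', durable=True)
    channel.queue_declare(queue='dead_letter_queue', durable=True)
    channel.queue_bind(queue='dead_letter_queue', exchange='dlx')

    # Main queue: nacked (requeue=False) messages go to the DLX
    channel.queue_declare(queue=main_queue, durable=True, arguments={
        'x-dead-letter-exchange': 'dlx',
    })

    # Retry queue: no consumers; expired messages dead-letter back to the main queue
    channel.exchange_declare(exchange='retry_exchange', exchange_type='direct',
                             durable=True)
    channel.queue_declare(queue='retry_queue', durable=True, arguments={
        'x-message-ttl': retry_delay_ms,
        'x-dead-letter-exchange': '',            # default exchange
        'x-dead-letter-routing-key': main_queue,
    })
    channel.queue_bind(queue='retry_queue', exchange='retry_exchange',
                       routing_key='retry')
```

One caveat of this plugin-free variant: a queue's TTL is fixed at declare time, so per-message exponential backoff needs one retry queue per delay tier.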

7.6 Consumer Concurrency

Multi-threaded consumer

import pika
import threading
import time

class MultiThreadConsumer:
    def __init__(self, host, queue, num_threads=4):
        self.host = host
        self.queue = queue
        self.num_threads = num_threads

    def _worker(self, worker_id):
        # Each worker gets its own connection: pika connections are not thread-safe
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(self.host)
        )
        channel = connection.channel()
        channel.basic_qos(prefetch_count=5)

        def callback(ch, method, properties, body):
            print(f"[Worker-{worker_id}] Processing: {body.decode()}")
            # Simulate work
            time.sleep(0.1)
            ch.basic_ack(delivery_tag=method.delivery_tag)

        channel.basic_consume(
            queue=self.queue,
            on_message_callback=callback,
            auto_ack=False
        )
        print(f"[Worker-{worker_id}] started")
        channel.start_consuming()

    def start(self):
        threads = []
        for i in range(self.num_threads):
            t = threading.Thread(target=self._worker, args=(i,))
            t.daemon = True
            t.start()
            threads.append(t)

        print(f"Started {self.num_threads} consumer threads")
        for t in threads:
            t.join()

# Usage
consumer = MultiThreadConsumer('localhost', 'task_queue', num_threads=4)
consumer.start()

Process-level concurrency

# Consume the same queue from multiple processes
for i in $(seq 1 4); do
    python consumer.py &
done

7.7 Graceful Shutdown

import signal
import pika

class GracefulConsumer:
    def __init__(self):
        self.connection = None
        self.channel = None
        self._closing = False

        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        print(f"\nReceived signal {signum}, shutting down gracefully...")
        self._closing = True
        if self.channel:
            self.channel.stop_consuming()

    def start(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost')
        )
        self.channel = self.connection.channel()
        self.channel.basic_qos(prefetch_count=10)

        self.channel.basic_consume(
            queue='task_queue',
            on_message_callback=self._on_message
        )

        try:
            self.channel.start_consuming()
        except Exception:
            pass
        finally:
            if self.connection and self.connection.is_open:
                self.connection.close()
                print("Connection closed")

    def _on_message(self, ch, method, properties, body):
        try:
            # Process the message
            process(body)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception:
            # Processing failed - requeue
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

7.8 Caveats

⚠️ Do not run long blocking operations inside the callback

Long blocking calls stop the channel from receiving new deliveries and heartbeats, which eventually times out the connection.
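With pika's BlockingConnection, a common workaround is to run the slow work on a worker thread and hand the ack back to the connection's I/O thread via connection.add_callback_threadsafe (a real pika API; channel methods themselves are not thread-safe). A sketch, where consume_in_thread is our own helper:

```python
import functools
import threading

def consume_in_thread(connection, channel, method, body, handler):
    """Run `handler(body)` in a worker thread so the pika I/O loop stays
    responsive (heartbeats keep flowing), then schedule the ack back on
    the connection's thread, since basic_ack must not be called from
    another thread."""
    def work():
        handler(body)  # potentially slow business logic
        ack = functools.partial(channel.basic_ack,
                                delivery_tag=method.delivery_tag)
        connection.add_callback_threadsafe(ack)

    t = threading.Thread(target=work, daemon=True)
    t.start()
    return t
```

The on_message callback would call consume_in_thread(...) and return immediately, letting start_consuming keep servicing the socket while the work runs.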

⚠️ Handle exceptions carefully with manual acknowledgement

If a consumer crashes without sending an ACK, the message stays unacked until the connection is closed (for example by heartbeat timeout), after which it is redelivered.

⚠️ Keep prefetch_count moderate

An oversized prefetch buffers messages in consumer memory and slows down failover.

⚠️ Expect duplicate deliveries

A message can be delivered more than once (network glitches, consumer timeouts, and so on), so consumers must process idempotently.
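A minimal sketch of idempotent consumption, deduplicating on a message id before doing the work. The in-memory set is for illustration only; production code would use a shared, persistent store such as a Redis SET or a database unique constraint (the helper class is our own):

```python
class IdempotentHandler:
    """Skip messages whose id has already been processed successfully."""

    def __init__(self, handler):
        self.handler = handler
        self.seen = set()  # illustration only; use a shared store in production

    def __call__(self, message_id, body):
        if message_id in self.seen:
            return False            # duplicate delivery: ack it, but skip the work
        self.handler(body)
        self.seen.add(message_id)   # mark done only after success,
                                    # so a failed attempt can be retried
        return True
```

The consumer callback would read message_id from properties.message_id (set by the producer), run the handler, and ack either way, since a duplicate still needs to be removed from the queue.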

🔥 Best practice: manual acknowledgement + a sensible prefetch + idempotent processing + graceful shutdown.


7.9 Further Reading


Next chapter: Chapter 8: Message Routing — a deep dive into routing strategies and advanced usage.