第 08 章:消息队列
第 08 章:消息队列
消息队列不只是"异步处理"的工具,它是微服务解耦、削峰填谷、最终一致性的基础设施。
8.1 为什么需要消息队列
8.1.1 同步通信的痛点
问题1:强耦合
──────────────
订单服务 ──直接调用──▶ 支付服务
订单服务 ──直接调用──▶ 库存服务
订单服务 ──直接调用──▶ 通知服务
(任何一个服务不可用,订单创建就失败)
问题2:性能瓶颈
──────────────
创建订单 = 调用支付(100ms) + 调用库存(50ms) + 调用通知(200ms)
总延迟 = 350ms(串行) 用户等待时间长
问题3:流量洪峰
──────────────
秒杀活动 → 10000 QPS → 直接打到数据库 → 数据库崩溃
8.1.2 消息队列的三大作用
┌──────────────────────────────────────────────────────────────┐
│ 消息队列核心价值 │
├──────────────────────────────────────────────────────────────┤
│ │
│ 1. 异步处理 (Async Processing) │
│ ┌──────────┐ ┌─────┐ ┌──────────┐ │
│ │ 订单服务 │───▶│ MQ │───▶│ 通知服务 │ │
│ └──────────┘ └─────┘ └──────────┘ │
│ 订单创建后立即返回,通知异步发送 │
│ │
│ 2. 应用解耦 (Decoupling) │
│ ┌──────────┐ ┌─────┐ ┌──────────┐ │
│ │ 订单服务 │───▶│ MQ │───▶│ 支付服务 │ │
│ └──────────┘ │ │ └──────────┘ │
│ │ │───▶ ┌──────────┐ │
│ │ │ │ 库存服务 │ │
│ │ │ └──────────┘ │
│ └─────┘ │
│ 新增消费者不需要修改生产者 │
│ │
│ 3. 削峰填谷 (Load Leveling) │
│ ┌──────────┐ ┌─────────────────┐ ┌──────────┐ │
│ │ 10000QPS │───▶│ MQ (缓冲 100万) │───▶│ 1000QPS │ │
│ │ 峰值流量 │ │ 慢慢消费 │ │ 稳定处理 │ │
│ └──────────┘ └─────────────────┘ └──────────┘ │
└──────────────────────────────────────────────────────────────┘
8.2 消息队列选型
8.2.1 主流 MQ 对比
| 维度 | Kafka | RabbitMQ | RocketMQ | Pulsar |
|---|
| 开发语言 | Java/Scala | Erlang | Java | Java |
| 吞吐量 | 极高 (百万级/秒) | 中 (万级/秒) | 高 (十万级/秒) | 极高 |
| 延迟 | ms 级 | μs 级 | ms 级 | ms 级 |
| 消息模型 | 发布-订阅 (Log) | 多种 (Queue/Topic) | 发布-订阅 | 发布-订阅 |
| 消息顺序 | 分区内有序 | 队列内有序 | 队列内有序 | 分区内有序 |
| 消息回溯 | ✅ 支持 | ❌ 不支持 | ✅ 支持 | ✅ 支持 |
| 延迟消息 | ❌ 需插件 | ✅ 原生支持 | ✅ 原生支持 | ✅ 原生支持 |
| 事务消息 | ✅ 支持 | ✅ 支持 | ✅ 支持 | ✅ 支持 |
| 社区活跃度 | 极高 | 高 | 高 | 高 |
| 适用场景 | 大数据/日志/事件流 | 业务消息/RPC | 电商/金融 | 多租户/云原生 |
8.2.2 选型建议
需求分析 → 选型
大数据/日志/流处理 → Kafka
复杂路由/企业集成 → RabbitMQ
电商/金融/事务消息 → RocketMQ
云原生/多租户 → Pulsar
简单决策:
┌────────────────────────────────────────┐
│ 吞吐量要求 > 10万/秒? │
│ 是 → Kafka / Pulsar │
│ 否 → 需要复杂路由? │
│ 是 → RabbitMQ │
│ 否 → 金融场景? │
│ 是 → RocketMQ │
│ 否 → Kafka (通用选择) │
└────────────────────────────────────────┘
8.3 Kafka 深入
8.3.1 Kafka 架构
┌──────────────────────────────────────────────────────────────┐
│ Kafka 架构 │
├──────────────────────────────────────────────────────────────┤
│ │
│ Producer (生产者) │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 订单服务 │ │ 用户服务 │ │ 商品服务 │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ │
│ └────────────┼────────────┘ │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Kafka Cluster │ │
│ │ │ │
│ │ ┌──────────────────────────────────────────────┐ │ │
│ │ │ Topic: order-events │ │ │
│ │ │ ┌──────────┬──────────┬──────────┐ │ │ │
│ │ │ │Partition0│Partition1│Partition2│ │ │ │
│ │ │ │ msg,msg │ msg,msg │ msg,msg │ │ │ │
│ │ │ └──────────┴──────────┴──────────┘ │ │ │
│ │ └──────────────────────────────────────────────┘ │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ │ │
│ │ │ Broker 1 │ │ Broker 2 │ ... │ │
│ │ └──────────────┘ └──────────────┘ │ │
│ │ │ │
│ │ ┌──────────────┐ │ │
│ │ │ ZooKeeper / │ │ │
│ │ │ KRaft (元数据)│ │ │
│ │ └──────────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ ┌────────────┼────────────┐ │
│ ▼ ▼ ▼ │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ 支付服务 │ │ 库存服务 │ │ 通知服务 │ Consumer Group │
│ └─────────┘ └─────────┘ └─────────┘ │
└──────────────────────────────────────────────────────────────┘
8.3.2 核心概念
| 概念 | 说明 |
|---|
| Topic | 消息的逻辑分类(如 order-events) |
| Partition | Topic 的物理分区,保证分区内有序 |
| Broker | Kafka 服务器节点 |
| Producer | 消息生产者 |
| Consumer | 消息消费者 |
| Consumer Group | 消费者组,组内每个分区只被一个消费者消费 |
| Offset | 消费者在分区中的消费位置 |
| Replication | 分区副本,保证高可用 |
8.3.3 Kafka 生产者配置
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 可靠性配置
props.put("acks", "all"); // 所有副本确认
props.put("retries", 3); // 重试次数
props.put("enable.idempotence", true); // 幂等生产者
// 性能配置
props.put("batch.size", 16384); // 批量大小 16KB
props.put("linger.ms", 10); // 等待时间 10ms
props.put("compression.type", "lz4"); // 压缩算法
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>(
"order-events",
orderId, // key(同一 key 的消息进入同一分区)
jsonPayload // value
);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
log.info("消息发送成功: partition={}, offset={}",
metadata.partition(), metadata.offset());
} else {
log.error("消息发送失败", exception);
}
});
8.3.4 消费者组与分区再平衡
消费者组 (Consumer Group)
─────────────────────────
Topic: order-events (3 个分区)
┌──────────┐ ┌──────────┐ ┌──────────┐
│Partition0│ │Partition1│ │Partition2│
└─────┬────┘ └─────┬────┘ └─────┬────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│Consumer A│ │Consumer B│ │Consumer C│
└──────────┘ └──────────┘ └──────────┘
(同一 Consumer Group)
当 Consumer C 挂掉时:
┌──────────┐ ┌──────────┐ ┌──────────┐
│Partition0│ │Partition1│ │Partition2│
└─────┬────┘ └─────┬────┘ └─────┬────┘
│ │ │
▼ ▼ ▼
┌──────────┐ ┌──────────┐
│Consumer A│ │Consumer B│ ← Consumer C 挂掉
│ P0 + P2 │ │ P1 │ P2 重新分配给 A
└──────────┘ └──────────┘
8.4 RabbitMQ
8.4.1 RabbitMQ 模型
┌──────────────────────────────────────────────────────────┐
│ RabbitMQ 消息模型 │
├──────────────────────────────────────────────────────────┤
│ │
│ Producer ──▶ Exchange ──binding──▶ Queue ──▶ Consumer │
│ │
│ Exchange 类型: │
│ │
│ 1. Direct (直连) │
│ Exchange ──routing_key="order"──▶ Queue A │
│ │
│ 2. Fanout (广播) │
│ Exchange ─────────────────────▶ Queue A │
│ Exchange ─────────────────────▶ Queue B │
│ Exchange ─────────────────────▶ Queue C │
│ │
│ 3. Topic (主题) │
│ Exchange ──"order.*"──────────▶ Queue A │
│ Exchange ──"order.created"────▶ Queue B │
│ Exchange ──"#.payment"────────▶ Queue C │
│ │
│ 4. Headers (头部) │
│ Exchange ──按 Header 匹配────▶ Queue │
└──────────────────────────────────────────────────────────┘
8.4.2 RabbitMQ vs Kafka 选型
| 场景 | Kafka | RabbitMQ |
|---|
| 日志收集/流处理 | ✅ 首选 | ❌ |
| 大数据管道 | ✅ 首选 | ❌ |
| 复杂路由规则 | ⚠️ 需要设计 | ✅ 原生支持 |
| 请求-应答模式 | ⚠️ 不原生 | ✅ 原生支持 |
| 延迟队列 | ⚠️ 需插件 | ✅ 原生支持 |
| 消息确认机制 | ✅ Offset | ✅ ACK |
| 低延迟(μs 级) | ❌ ms 级 | ✅ μs 级 |
8.5 事件驱动架构(EDA)
8.5.1 事件驱动的核心思想
命令式(Imperative) 事件驱动(Event-Driven)
─────────────────── ──────────────────────
订单服务 订单服务
"你应该扣库存" "订单已创建"(不知道谁关心)
│ │
▼ ▼
库存服务 [事件总线]
(强耦合,订单服务知道库存服务) ├──▶ 库存服务(自己订阅)
├──▶ 支付服务(自己订阅)
└──▶ 通知服务(自己订阅)
(松耦合,生产者不知道消费者)
8.5.2 事件设计原则
| 原则 | 说明 | 示例 |
|---|
| 事件自描述 | 事件包含足够的上下文 | OrderCreated{orderId, items, amount, customer} |
| 事件不可变 | 事件一旦发布不能修改 | 使用过去时命名 |
| 事件可重放 | 新消费者可以从头消费 | Kafka 保留期 |
| 事件版本化 | 事件结构变更需要版本管理 | OrderCreated_v2 |
| 携带因果 ID | 便于追踪事件链 | correlationId |
8.5.3 事件 Schema 管理
// Schema Registry 中注册的事件 Schema
{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.events",
"version": "2",
"fields": [
{"name": "eventId", "type": "string"},
{"name": "eventType", "type": "string", "default": "OrderCreated"},
{"name": "timestamp", "type": "long"},
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "totalAmount", "type": "double"},
{"name": "currency", "type": "string", "default": "CNY"},
{
"name": "items",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "productId", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "price", "type": "double"}
]
}
}
}
]
}
8.6 最终一致性模式
8.6.1 事务性发件箱(Transactional Outbox)
问题:如何保证"写数据库"和"发消息"同时成功?
❌ 不可靠的方式:
写数据库 ──成功──▶ 发消息 ──失败──▶ 数据库已写,消息没发(不一致)
✅ 事务性发件箱:
┌─────────────────────────────────────────────────┐
│ 数据库事务 │
│ ┌────────────────────────────────────────────┐ │
│ │ INSERT INTO orders ... │ │
│ │ INSERT INTO outbox (event_type, payload) │ │ ← 同一个事务
│ └────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────┐
│ Outbox Poller / CDC │
│ 读取 outbox 表 → 发送到消息队列 │
└──────────────────────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────┐
│ 消息队列 (Kafka) │
└──────────────────────────────────────────────────┘
8.6.2 实现代码示例
@Transactional
public Order createOrder(CreateOrderCommand command) {
// 1. 创建订单(写入 orders 表)
Order order = orderRepository.save(new Order(command));
// 2. 将事件写入 outbox 表(同一个事务)
OutboxEvent event = new OutboxEvent(
"OrderCreated",
orderId,
objectMapper.writeValueAsString(new OrderCreatedEvent(order))
);
outboxRepository.save(event);
// 事务提交后,outbox poller 会读取并发送到 Kafka
return order;
}
8.6.3 消费者幂等处理
消息队列至少投递一次(At-Least-Once),消费者必须幂等:
public void handleOrderCreated(OrderCreatedEvent event) {
// 幂等检查:根据消息 ID 判断是否已处理
if (messageProcessedRepository.exists(event.getEventId())) {
log.info("消息已处理,跳过: {}", event.getEventId());
return;
}
try {
// 业务处理
inventoryService.deductStock(event.getItems());
// 记录已处理的消息
messageProcessedRepository.save(new ProcessedMessage(event.getEventId()));
} catch (Exception e) {
log.error("处理失败,等待重试", e);
throw e; // 抛出异常,消息会重新投递
}
}
8.7 业务场景:电商订单事件流
┌──────────────────────────────────────────────────────────────┐
│ 订单创建完整事件流 │
├──────────────────────────────────────────────────────────────┤
│ │
│ 1. 用户下单 │
│ ┌──────────┐ │
│ │ 订单服务 │ 写入订单 + 写入 outbox │
│ └────┬─────┘ │
│ │ OrderCreated │
│ ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Kafka: order-events │ │
│ └──┬──────────┬──────────┬──────────┬────────────────┘ │
│ │ │ │ │ │
│ ▼ ▼ ▼ ▼ │
│ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │支付 │ │库存 │ │通知 │ │积分 │ │
│ │服务 │ │服务 │ │服务 │ │服务 │ │
│ └──┬───┘ └──┬───┘ └──┬───┘ └──┬───┘ │
│ │ │ │ │ │
│ │ PaymentCompleted │ │ │
│ ▼ │ │ │ │
│ ┌────────┐ │ │ │ │
│ │ order- │ │ │ │ │
│ │ events │ │ │ │ │
│ └──┬─────┘ │ │ │ │
│ │ ▼ ▼ │ │
│ │ ┌──────┐ ┌──────┐ │ │
│ │ │库存 │ │订单 │ │ │
│ │ │扣减 │ │更新 │ │ │
│ │ └──────┘ └──────┘ │ │
│ │ │ │
│ │ OrderShipped │ │
│ ▼ ▼ │
│ ┌──────┐ ┌──────┐ │
│ │通知 │ │积分 │ │
│ │服务 │ │增加 │ │
│ └──────┘ └──────┘ │
└──────────────────────────────────────────────────────────┘
⚠️ 注意事项
- 消息不是万能的——不要把所有通信都改成异步,有些场景需要同步响应
- 消费者必须幂等——At-Least-Once 语义下消息可能重复
- 消息顺序性——Kafka 只保证分区内有序,需要有序的消息用相同 key
- 死信队列——消费多次失败的消息要进入死信队列,不能无限重试
- 消息体大小——大消息(> 1MB)考虑存外部存储,消息只传引用
- 监控消费延迟——Consumer Lag 是重要的运维指标
📖 扩展阅读
- Apache Kafka Documentation (kafka.apache.org) — Kafka 官方文档
- RabbitMQ Documentation (rabbitmq.com) — RabbitMQ 官方文档
- Designing Event-Driven Systems — Ben Stopford (Confluent) — 免费电子书
- Microservices Patterns Chapter 6 — Chris Richardson — 事务性发件箱模式
- Building Event-Driven Microservices — Adam Bellemare — 事件驱动架构实践
本章小结
| 要点 | 说明 |
|---|
| MQ 作用 | 异步处理、应用解耦、削峰填谷 |
| Kafka vs RabbitMQ | Kafka 适合高吞吐/流处理,RabbitMQ 适合复杂路由 |
| 事件设计 | 自描述、不可变、可重放、版本化 |
| 最终一致性 | 事务性发件箱 + 消费者幂等 |
| 关键指标 | 消费延迟 (Lag)、消息积压、消费失败率 |
📌 下一章:第 09 章:服务网格 — 用服务网格(Istio/Linkerd)管理服务间通信。