Introduction
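Message queues are a key building block of distributed systems: they provide asynchronous communication, absorb traffic peaks, and decouple services. Kafka, RocketMQ, and Pulsar are three of the most widely used message queue systems today, each with its own strengths and target scenarios. This article compares them across architecture, message model, storage mechanism, consumption modes, and reliability guarantees to help you make a sound technology choice.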
Architecture Comparison
Kafka Architecture
```mermaid
graph TB
    Producer[Producer] --> Broker1[Broker 1]
    Producer --> Broker2[Broker 2]
    Producer --> Broker3[Broker 3]
    subgraph Kafka Cluster
        Broker1 --> |Partition 0 Leader| P0L[P0]
        Broker1 --> |Partition 2 Follower| P2F[P2]
        Broker2 --> |Partition 1 Leader| P1L[P1]
        Broker2 --> |Partition 0 Follower| P0F[P0]
        Broker3 --> |Partition 2 Leader| P2L[P2]
        Broker3 --> |Partition 1 Follower| P1F[P1]
    end
    ZK[ZooKeeper / KRaft] <--> Broker1
    ZK <--> Broker2
    ZK <--> Broker3
    Broker1 --> Consumer[Consumer Group]
    Broker2 --> Consumer
    Broker3 --> Consumer
```
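Key characteristics of Kafka:
- The broker is the storage: messages are persisted on the broker's local disk
- Partitions: a topic is split into multiple partitions, which enables parallel processing
- ISR mechanism: the set of In-Sync Replicas keeps replicas consistent
- KRaft mode: newer versions replace ZooKeeper with a Raft-based protocol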
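To make the ISR point concrete: a record counts as committed once every in-sync replica has it, so the committed position of a partition (its high watermark) is the smallest log end offset among the ISR. The following is a minimal sketch in plain Java with no Kafka dependency; the class and method names are hypothetical and only model the rule.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical model of how the ISR determines the committed offset of one partition. */
class HighWatermarkSketch {

    /**
     * Returns the high watermark: the smallest log end offset among in-sync replicas.
     * Replicas outside the ISR are ignored, so a lagging follower does not hold back commits.
     */
    static long highWatermark(Map<Integer, Long> logEndOffsets, Set<Integer> isr) {
        return isr.stream()
                .mapToLong(logEndOffsets::get)
                .min()
                .orElseThrow(() -> new IllegalStateException("ISR must not be empty"));
    }

    public static void main(String[] args) {
        // Broker 1 is the leader; broker 3 has fallen behind and dropped out of the ISR.
        Map<Integer, Long> logEndOffsets = Map.of(1, 120L, 2, 118L, 3, 90L);
        System.out.println(highWatermark(logEndOffsets, Set.of(1, 2))); // prints 118
    }
}
```

If broker 3 were still in the ISR, the high watermark would drop to 90, which is why a slow follower is removed from the ISR rather than allowed to stall producers.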
RocketMQ Architecture
```mermaid
graph TB
    Producer[Producer] --> NameServer[NameServer Cluster]
    Consumer[Consumer] --> NameServer
    NameServer --> |Routing info| Producer
    NameServer --> |Routing info| Consumer
    subgraph Broker Cluster
        subgraph Broker Group 1
            BM1[Broker-a Master]
            BS1[Broker-a Slave]
            BM1 -->|Replication| BS1
        end
        subgraph Broker Group 2
            BM2[Broker-b Master]
            BS2[Broker-b Slave]
            BM2 -->|Replication| BS2
        end
    end
    Producer -->|Send messages| BM1
    Producer -->|Send messages| BM2
    Consumer -->|Consume messages| BM1
    Consumer -->|Consume messages| BM2
```
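Key characteristics of RocketMQ:
- NameServer: a lightweight, stateless routing registry
- CommitLog + ConsumeQueue: two-level storage that separates message data (CommitLog) from the per-queue consume index (ConsumeQueue); storage and compute themselves remain coupled on the broker
- Master-slave replication: synchronous and asynchronous replication modes
- Transactional messages: native support for distributed transactional messages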
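The CommitLog + ConsumeQueue split can be pictured as one shared append-only log plus a small index per queue that points into it. The sketch below is an in-memory illustration of that idea only; the class, the `IndexEntry` type, and the `topic-queueId` key format are hypothetical and do not mirror RocketMQ's on-disk format.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of a shared commit log plus per-queue indexes. */
class CommitLogSketch {

    /** Index entry: where a message starts in the commit log and how long it is. */
    static final class IndexEntry {
        final int position;
        final int length;

        IndexEntry(int position, int length) {
            this.position = position;
            this.length = length;
        }
    }

    private final ByteArrayOutputStream commitLog = new ByteArrayOutputStream();
    private final Map<String, List<IndexEntry>> consumeQueues = new HashMap<>();

    /** Appends the body to the single shared log, then indexes it under "topic-queueId". */
    void append(String topic, int queueId, String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        int position = commitLog.size();
        commitLog.write(bytes, 0, bytes.length);
        consumeQueues.computeIfAbsent(topic + "-" + queueId, k -> new ArrayList<>())
                .add(new IndexEntry(position, bytes.length));
    }

    /** Reads the n-th message of a queue by following its index entry into the shared log. */
    String read(String topic, int queueId, int n) {
        IndexEntry e = consumeQueues.get(topic + "-" + queueId).get(n);
        return new String(commitLog.toByteArray(), e.position, e.length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        CommitLogSketch store = new CommitLogSketch();
        store.append("Topic-A", 0, "a0");
        store.append("Topic-B", 0, "b0");
        store.append("Topic-A", 0, "a1");
        System.out.println(store.read("Topic-A", 0, 1)); // prints a1
    }
}
```

All topics share one sequential write path, and adding a topic only adds another small index, which is the property the later performance discussion relies on.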
Pulsar Architecture
```mermaid
graph TB
    Producer[Producer] --> Broker1[Broker 1]
    Producer --> Broker2[Broker 2]
    Consumer[Consumer] --> Broker1
    Consumer --> Broker2
    subgraph BrokerLayer["Broker Layer (stateless)"]
        Broker1[Broker 1<br/>Compute layer]
        Broker2[Broker 2<br/>Compute layer]
    end
    Broker1 --> BK1[Bookie 1]
    Broker1 --> BK2[Bookie 2]
    Broker2 --> BK2
    Broker2 --> BK3[Bookie 3]
    subgraph BookKeeperLayer["BookKeeper (storage layer)"]
        BK1[Bookie 1]
        BK2[Bookie 2]
        BK3[Bookie 3]
    end
    ZK[ZooKeeper] <--> Broker1
    ZK <--> Broker2
    ZK <--> BK1
```
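Key characteristics of Pulsar:
- Storage-compute separation: brokers are stateless, and storage is handled by BookKeeper
- Tiered storage: hot data stays in BookKeeper, and cold data can be offloaded to object storage
- Multi-tenancy: native tenant/namespace isolation
- Unified messaging model: supports both streaming and queue semantics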
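Tenant and namespace isolation is visible directly in Pulsar topic names, which take the form `domain://tenant/namespace/topic`, for example `persistent://public/default/orders`. As an illustration, here is a small parser for that naming scheme; the class is hypothetical and is not part of the Pulsar client.

```java
/** Hypothetical parser for fully qualified Pulsar topic names. */
class TopicNameSketch {
    final String domain;     // "persistent" or "non-persistent"
    final String tenant;
    final String namespace;
    final String localName;

    private TopicNameSketch(String domain, String tenant, String namespace, String localName) {
        this.domain = domain;
        this.tenant = tenant;
        this.namespace = namespace;
        this.localName = localName;
    }

    /** Splits "domain://tenant/namespace/topic" into its four parts. */
    static TopicNameSketch parse(String fullName) {
        String[] domainAndRest = fullName.split("://", 2);
        if (domainAndRest.length != 2) {
            throw new IllegalArgumentException("missing domain in " + fullName);
        }
        String[] parts = domainAndRest[1].split("/", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected tenant/namespace/topic in " + fullName);
        }
        return new TopicNameSketch(domainAndRest[0], parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        TopicNameSketch name = parse("persistent://public/default/orders");
        System.out.println(name.tenant + " / " + name.namespace + " / " + name.localName);
    }
}
```

Because the tenant and namespace are part of every topic name, policies such as quotas and retention can be attached at those levels instead of per topic.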
Message Model Comparison
Kafka
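```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

// order, log, JsonUtils and processOrder come from the surrounding application.

// ---- Producer ----
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("acks", "all");               // wait for all in-sync replicas
props.put("retries", 3);
props.put("enable.idempotence", true);  // producer retries do not create duplicates

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// The order ID is the key, so all events of one order land in the same partition.
ProducerRecord<String, String> record = new ProducerRecord<>(
        "order-topic", order.getId(), JsonUtils.toJson(order));

// Asynchronous send with a completion callback.
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        log.info("Sent to partition={}, offset={}", metadata.partition(), metadata.offset());
    } else {
        log.error("Send failed", exception);
    }
});

// ---- Consumer ----
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "kafka1:9092");
consumerProps.put("group.id", "order-processor");
// Deserializers are mandatory; the consumer fails to start without them.
consumerProps.put("key.deserializer", StringDeserializer.class.getName());
consumerProps.put("value.deserializer", StringDeserializer.class.getName());
consumerProps.put("enable.auto.commit", false);     // commit manually after processing
consumerProps.put("auto.offset.reset", "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of("order-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> received : records) {
        processOrder(received.value());
    }
    consumer.commitSync();  // at-least-once: commit only after the batch is processed
}
```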
RocketMQ
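```java
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

// ---- Producer ----
DefaultMQProducer producer = new DefaultMQProducer("order-producer-group");
producer.setNamesrvAddr("namesrv1:9876;namesrv2:9876");
producer.setSendMsgTimeout(3000);
producer.setRetryTimesWhenSendFailed(3);
producer.start();

// Arguments: topic, tag, key, body.
Message msg = new Message(
        "order-topic",
        "OrderCreated",
        order.getId(),
        JsonUtils.toJson(order).getBytes(StandardCharsets.UTF_8));

// Synchronous send.
SendResult result = producer.send(msg);
log.info("Send result: {}, msgId: {}", result.getSendStatus(), result.getMsgId());

// Delayed message: set a delay level before sending (msg is reused here for brevity).
msg.setDelayTimeLevel(3);
producer.send(msg);

// ---- Consumer ----
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer-group");
consumer.setNamesrvAddr("namesrv1:9876");
consumer.subscribe("order-topic", "OrderCreated || OrderPaid");  // broker-side tag filter
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt received : msgs) {
        try {
            String json = new String(received.getBody(), StandardCharsets.UTF_8);
            processOrder(JsonUtils.fromJson(json, Order.class));
        } catch (Exception e) {
            log.error("Process message failed, msgId={}", received.getMsgId(), e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;  // redeliver the batch later
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

consumer.start();
```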
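The call `setDelayTimeLevel(3)` refers to a position in the broker's delay-level table rather than to a duration. Assuming the broker keeps the default `messageDelayLevel` setting (`1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h`), level 3 means a 10-second delay. The lookup can be sketched as follows; the class is hypothetical, and a broker with a customized level table would give different results.

```java
import java.time.Duration;

/** Hypothetical lookup from a RocketMQ delay level to its duration. */
class DelayLevelSketch {

    /** Assumed default value of the broker setting messageDelayLevel; brokers may override it. */
    static final String DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    /** Levels are 1-based: level 1 is the first entry of the table. */
    static Duration delayFor(int level) {
        String[] levels = DEFAULT_LEVELS.split(" ");
        if (level < 1 || level > levels.length) {
            throw new IllegalArgumentException("level out of range: " + level);
        }
        String token = levels[level - 1];
        long amount = Long.parseLong(token.substring(0, token.length() - 1));
        switch (token.charAt(token.length() - 1)) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default: throw new IllegalStateException("unknown unit in " + token);
        }
    }

    public static void main(String[] args) {
        System.out.println(delayFor(3)); // prints PT10S
    }
}
```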
Pulsar
```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://pulsar1:6650,pulsar2:6650")
        .build();

// ---- Producer: typed with a JSON schema ----
Producer<Order> producer = client.newProducer(Schema.JSON(Order.class))
        .topic("persistent://public/default/orders")
        .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
        .sendTimeout(30, TimeUnit.SECONDS)
        .blockIfQueueFull(true)
        .create();

// Asynchronous send.
producer.sendAsync(order).thenAccept(msgId -> {
    log.info("Published message: {}", msgId);
}).exceptionally(ex -> {
    log.error("Publish failed", ex);
    return null;
});

// ---- Consumer: Key_Shared subscription ----
Consumer<Order> consumer = client.newConsumer(Schema.JSON(Order.class))
        .topic("persistent://public/default/orders")
        .subscriptionName("order-processor")
        .subscriptionType(SubscriptionType.Key_Shared)
        .ackTimeout(30, TimeUnit.SECONDS)
        .subscribe();

while (true) {
    Message<Order> msg = consumer.receive();
    try {
        processOrder(msg.getValue());
        consumer.acknowledge(msg);          // per-message acknowledgement
    } catch (Exception e) {
        consumer.negativeAcknowledge(msg);  // request redelivery
    }
}
```
Storage Mechanism Comparison
```mermaid
graph TB
    subgraph "Kafka Storage"
        KTopic[Topic] --> KP1[Partition 0]
        KTopic --> KP2[Partition 1]
        KP1 --> KSeg1[Segment 0<br/>.log + .index]
        KP1 --> KSeg2[Segment 1<br/>.log + .index]
    end
    subgraph "RocketMQ Storage"
        RCL[CommitLog<br/>all messages appended sequentially] --> RCQ1[ConsumeQueue<br/>Topic-A Queue 0]
        RCL --> RCQ2[ConsumeQueue<br/>Topic-A Queue 1]
        RCL --> RCQ3[ConsumeQueue<br/>Topic-B Queue 0]
    end
    subgraph "Pulsar Storage"
        PTopic[Topic] --> PLedger1[Ledger 1]
        PTopic --> PLedger2[Ledger 2]
        PLedger1 --> PEntry[Entry/Fragment]
        PEntry --> PBK1[Bookie 1]
        PEntry --> PBK2[Bookie 2]
    end
```
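| Dimension | Kafka | RocketMQ | Pulsar |
|---|---|---|---|
| Storage engine | Partitioned log files | CommitLog + ConsumeQueue | BookKeeper (ledgers) |
| Write pattern | Sequential writes per partition | Sequential writes to one shared CommitLog | Sequential writes per ledger |
| Storage and compute | Coupled (the broker is the storage) | Coupled | Separated |
| Tiered storage | Tiered Storage (relatively new) | Not in 4.x; a tiered store module was added in 5.x | Native (S3/HDFS) |
| Data retention | Time- or size-based | Time-based | Time-, size-, or policy-based |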
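In Kafka's layout, each segment file of a partition is named after the offset of its first record (zero-padded to 20 digits), so finding the segment that holds a given offset is a floor lookup over the base offsets. A minimal sketch of that lookup, using a hypothetical class and an in-memory `TreeMap` in place of the real log directory:

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of locating the segment file that contains a given offset. */
class SegmentLookupSketch {

    // Base offset of each segment mapped to its file name, kept sorted by base offset.
    private final TreeMap<Long, String> segmentsByBaseOffset = new TreeMap<>();

    /** Registers a segment; the file name is the base offset padded to 20 digits. */
    void addSegment(long baseOffset) {
        segmentsByBaseOffset.put(baseOffset, String.format("%020d.log", baseOffset));
    }

    /** Returns the segment with the greatest base offset that is <= the requested offset. */
    String segmentFor(long offset) {
        Map.Entry<Long, String> entry = segmentsByBaseOffset.floorEntry(offset);
        if (entry == null) {
            throw new IllegalArgumentException("offset " + offset + " is before the first segment");
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        SegmentLookupSketch partition = new SegmentLookupSketch();
        partition.addSegment(0L);
        partition.addSegment(1000L);
        partition.addSegment(2000L);
        System.out.println(partition.segmentFor(1500L)); // the segment whose base offset is 1000
    }
}
```

Within the chosen segment, the `.index` file then narrows the search to a byte position in the `.log` file.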
Consumption Modes
```mermaid
graph TB
    subgraph KafkaModes["Kafka consumption model"]
        KCG[Consumer Group] --> KC1[Consumer 1<br/>Partition 0,1]
        KCG --> KC2[Consumer 2<br/>Partition 2,3]
        Note1[A partition is consumed by<br/>only one consumer in the group]
    end
    subgraph PulsarModes["Pulsar subscription types"]
        direction TB
        PE[Exclusive] --> PE1[1 consumer]
        PF[Failover] --> PF1[Active consumer]
        PF --> PF2[Standby consumer]
        PS[Shared] --> PS1[Consumer 1]
        PS --> PS2[Consumer 2]
        PS --> PS3[Consumer 3]
        PKS[Key_Shared] --> PKS1[Consumer 1<br/>key-a,key-b]
        PKS --> PKS2[Consumer 2<br/>key-c,key-d]
    end
```
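The Kafka half of the diagram (partitions 0 and 1 on one consumer, 2 and 3 on the other) corresponds to splitting the partitions into contiguous ranges. The sketch below is a simplified, hypothetical version of such a range assignment for a single topic; it is not Kafka's actual assignor code, which also handles multiple topics and rebalances.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified range assignment of one topic's partitions to group members. */
class RangeAssignmentSketch {

    /** Gives each consumer a contiguous block; the first consumers absorb any remainder. */
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        int base = partitionCount / consumers.size();
        int extra = partitionCount % consumers.size();
        int next = 0;
        for (int i = 0; i < consumers.size(); i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                owned.add(next++);
            }
            result.put(consumers.get(i), owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("consumer-1", "consumer-2"), 4));
        // With more consumers than partitions, the surplus consumer stays idle.
        System.out.println(assign(List.of("c1", "c2", "c3"), 2));
    }
}
```

The second call shows the practical consequence of the one-consumer-per-partition rule: the partition count caps the useful parallelism of a Kafka consumer group, whereas Pulsar's Shared and Key_Shared subscriptions can add consumers beyond that limit.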
Ordering Guarantees
```mermaid
graph LR
    subgraph Kafka
        KP[Producer] -->|Same key| KPart[Same partition]
        KPart --> KC[Single consumer]
        Note2[Ordered within a partition]
    end
    subgraph RocketMQ
        RP[Producer] -->|MessageQueueSelector| RQ[Same queue]
        RQ --> RC[Single-threaded consumption]
        Note3[Ordered within a queue]
    end
```
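```java
// Kafka: records with the same key are routed to the same partition.
producer.send(new ProducerRecord<>("order-topic", orderId, orderEvent));

// RocketMQ: select the queue explicitly so that one order always maps to one queue.
producer.send(msg, (mqs, msg1, arg) -> {
    String id = (String) arg;
    // floorMod keeps the index non-negative even if hashCode() is Integer.MIN_VALUE,
    // where Math.abs() would still return a negative number.
    int index = Math.floorMod(id.hashCode(), mqs.size());
    return mqs.get(index);
}, orderId);

// RocketMQ: an orderly listener consumes each queue with a single thread.
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    for (MessageExt received : msgs) {
        processOrderEvent(received);
    }
    return ConsumeOrderlyStatus.SUCCESS;
});
```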
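Mapping a key hash to a queue index has one well-known pitfall in Java: `Math.abs(Integer.MIN_VALUE)` is still negative, so `Math.abs(hash) % n` can produce a negative index for rare keys. The sketch below contrasts that with `Math.floorMod`; the helper class is hypothetical and independent of any client library.

```java
/** Hypothetical helper that maps a key hash to a queue (or partition) index. */
class QueueIndexSketch {

    /** Always returns a value in [0, queueCount), including for Integer.MIN_VALUE. */
    static int indexFor(int keyHash, int queueCount) {
        return Math.floorMod(keyHash, queueCount);
    }

    /** Convenience overload: the same key always yields the same index. */
    static int indexFor(String key, int queueCount) {
        return indexFor(key.hashCode(), queueCount);
    }

    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE;
        System.out.println(Math.abs(hash) % 3); // prints -2: unusable as a list index
        System.out.println(indexFor(hash, 3));  // prints 1
    }
}
```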
Exactly-Once Semantics
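| Semantics | Meaning | Typical implementation |
|---|---|---|
| At-most-once | Delivered at most once; messages may be lost | Fire-and-forget sends, auto-committed offsets |
| At-least-once | Delivered at least once; messages may be duplicated | Manual acknowledgement after processing |
| Exactly-once | Delivered exactly once; no loss and no duplicates | Idempotence + transactions |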
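```java
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

// Kafka transactional producer (consume-transform-produce).
props.put("enable.idempotence", true);
props.put("transactional.id", "order-tx-001");  // stable ID for this producer instance

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();

    producer.send(new ProducerRecord<>("output-topic", key, value));

    // Commit the consumed offset in the same transaction as the output record.
    producer.sendOffsetsToTransaction(
            Map.of(new TopicPartition("input-topic", 0), new OffsetAndMetadata(offset + 1)),
            consumerGroupMetadata);  // obtained from consumer.groupMetadata()

    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    producer.close();             // fatal: this producer instance cannot continue
} catch (KafkaException e) {
    producer.abortTransaction();  // recoverable: abort, then retry the batch
}
```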
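Outside of Kafka transactions, the "idempotence" half of the exactly-once row is usually achieved on the consumer side: delivery stays at-least-once, and the handler skips any message ID it has already processed. A minimal in-memory sketch follows; the class is hypothetical, and a real system would keep the processed IDs in durable storage and update them atomically with the business change.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical idempotent handler: at-least-once delivery, effectively-once processing. */
class IdempotentHandlerSketch {

    // In production this set would live in a database, updated in the same
    // transaction as the business change; an in-memory set is only for illustration.
    private final Set<String> processedIds = new HashSet<>();
    private final Consumer<String> businessLogic;

    IdempotentHandlerSketch(Consumer<String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    /** Returns true if the message was processed, false if it was a duplicate. */
    boolean handle(String messageId, String payload) {
        if (processedIds.contains(messageId)) {
            return false;
        }
        businessLogic.accept(payload);
        // Recorded only after success, so a failed attempt can be retried on redelivery.
        processedIds.add(messageId);
        return true;
    }

    public static void main(String[] args) {
        IdempotentHandlerSketch handler =
                new IdempotentHandlerSketch(p -> System.out.println("processing " + p));
        handler.handle("msg-1", "order-42");
        handler.handle("msg-1", "order-42"); // redelivery of the same message: skipped
    }
}
```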
Performance Comparison
Reference Benchmark Figures
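| Metric | Kafka | RocketMQ | Pulsar |
|---|---|---|---|
| Throughput per broker | ~1,000,000 msg/s | ~200,000 msg/s | ~500,000 msg/s |
| End-to-end latency (P99) | ~5 ms | ~3 ms | ~5 ms |
| Impact of topic count | Performance degrades beyond a few hundred | Supports tens of thousands | Supports millions |
| Message size | Suited to large messages | Suited to small messages | Balanced |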
```mermaid
graph LR
    subgraph Throughput["Throughput (msg/s)"]
        K[Kafka: ~1M]
        R[RocketMQ: ~200K]
        P[Pulsar: ~500K]
    end
    subgraph TopicScale["Topic-count scalability"]
        K2[Kafka: hundreds]
        R2[RocketMQ: tens of thousands]
        P2[Pulsar: millions]
    end
```
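Kafka delivers the highest throughput when the number of topics and partitions is small, but performance drops noticeably with a large number of topics: each partition has its own log files, so writes that are sequential within a partition turn into increasingly random I/O across the disk. RocketMQ appends every message to one shared CommitLog, so growth in topic count has little effect on write performance. Pulsar's storage-compute separation gives it the best topic-count scalability of the three.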
Feature Comparison
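| Feature | Kafka | RocketMQ | Pulsar |
|---|---|---|---|
| Delayed messages | Not supported | Supported (fixed levels or arbitrary delay) | Supported |
| Transactional messages | Supported | Supported (half messages) | Supported |
| Message replay | By offset or time | By time | By messageId or time |
| Dead-letter queue | Not supported | Supported | Supported |
| Message filtering | Not supported | Tag + SQL92 | Not supported |
| Schema management | Schema Registry | Not supported | Native schema |
| Multi-tenancy | Not supported | Not supported | Native |
| Geo-replication | MirrorMaker | Not supported | Native |
| Stream processing | Kafka Streams | Not supported | Pulsar Functions |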
Selection Recommendations
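```mermaid
graph TD
    Start[Choose a message queue] --> Q1{Primary use case?}
    Q1 -->|Big data / log collection / stream processing| Kafka[Apache Kafka]
    Q1 -->|Online business / e-commerce transactions| Q2{Which features are needed?}
    Q1 -->|Cloud native / multi-tenancy / massive topic counts| Pulsar[Apache Pulsar]
    Q2 -->|Delayed messages / transactional messages / filtering| RocketMQ[Apache RocketMQ]
    Q2 -->|High throughput / rich ecosystem| Kafka
    Q2 -->|Storage-compute separation / elastic scaling| Pulsar
```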
Detailed Selection Guide
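Choose Kafka for:
- Log collection and analysis (ELK pipelines)
- Stream processing (together with Flink or Spark)
- Event-driven architectures (high-throughput scenarios)
- Teams with an existing Kafka ecosystem and operations experience

Choose RocketMQ for:
- E-commerce transaction systems (orders, payments)
- Delayed messages (for example, cancelling an order that was not paid in time)
- Transactional messages (distributed transactions)
- Filtering messages by tag
- Users of the Alibaba Cloud ecosystem

Choose Pulsar for:
- Multi-tenant isolation
- Massive numbers of topics (IoT)
- Tiered storage (hot/cold separation)
- Cross-region replication
- Cloud-native architectures built on storage-compute separation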
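The decision flow in this section can also be written down as a small function, which is handy as a checklist during architecture reviews. The enum values and class name below are hypothetical labels for the branches of the flowchart; the sketch encodes only the recommendations stated in this article.

```java
/** Hypothetical encoding of this article's selection flowchart. */
class BrokerChoiceSketch {

    enum UseCase { BIG_DATA_OR_STREAMING, ONLINE_BUSINESS, CLOUD_NATIVE_OR_MASSIVE_TOPICS }

    enum Need { DELAY_TX_OR_FILTERING, THROUGHPUT_AND_ECOSYSTEM, ELASTIC_SCALING }

    /** The need is only consulted for online business workloads. */
    static String recommend(UseCase useCase, Need need) {
        switch (useCase) {
            case BIG_DATA_OR_STREAMING:
                return "Kafka";
            case CLOUD_NATIVE_OR_MASSIVE_TOPICS:
                return "Pulsar";
            case ONLINE_BUSINESS:
                switch (need) {
                    case DELAY_TX_OR_FILTERING: return "RocketMQ";
                    case THROUGHPUT_AND_ECOSYSTEM: return "Kafka";
                    case ELASTIC_SCALING: return "Pulsar";
                }
        }
        throw new IllegalArgumentException("unhandled combination: " + useCase + ", " + need);
    }

    public static void main(String[] args) {
        System.out.println(recommend(UseCase.ONLINE_BUSINESS, Need.DELAY_TX_OR_FILTERING)); // prints RocketMQ
    }
}
```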
Production Configuration Recommendations
```yaml
# Kafka broker
num.partitions: 6
default.replication.factor: 3
min.insync.replicas: 2
unclean.leader.election.enable: false
log.retention.hours: 168   # 7 days

# Kafka producer
acks: all
retries: 3
max.in.flight.requests.per.connection: 5
enable.idempotence: true

# Kafka consumer
enable.auto.commit: false
max.poll.records: 500
session.timeout.ms: 30000
```
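As a quick check on how these settings interact: with a replication factor of 3 and `min.insync.replicas` of 2, a partition keeps accepting `acks=all` writes with one replica down and rejects them once two are down. The arithmetic can be sketched as follows; the helper names are hypothetical.

```java
/** Hypothetical helpers for reasoning about replication.factor and min.insync.replicas. */
class ReplicationMathSketch {

    /** Number of replicas that may be unavailable while acks=all writes still succeed. */
    static int toleratedReplicaFailures(int replicationFactor, int minInsyncReplicas) {
        if (minInsyncReplicas < 1 || minInsyncReplicas > replicationFactor) {
            throw new IllegalArgumentException("min.insync.replicas must be in [1, replication factor]");
        }
        return replicationFactor - minInsyncReplicas;
    }

    /** An acks=all write is accepted only while enough replicas remain in sync. */
    static boolean acceptsWrites(int inSyncReplicas, int minInsyncReplicas) {
        return inSyncReplicas >= minInsyncReplicas;
    }

    public static void main(String[] args) {
        System.out.println(toleratedReplicaFailures(3, 2)); // prints 1
        System.out.println(acceptsWrites(1, 2));            // prints false
    }
}
```

Setting `min.insync.replicas` equal to the replication factor would leave no headroom: any single broker restart would block producers that use `acks=all`.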
Summary
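Kafka, RocketMQ, and Pulsar each have their strengths. Kafka stands out for very high throughput and a rich stream-processing ecosystem, and is the de facto standard in the big data space. RocketMQ performs well in online business scenarios, with delayed messages and transactional messages as its standout features. Pulsar, with its storage-compute separation and native multi-tenancy, has a clear edge in cloud-native scenarios. Base the choice on business requirements, team experience, and existing infrastructure rather than on whichever system is newest.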