Architecture · #message-queue#kafka#rocketmq#pulsar

消息队列对比:Kafka vs RocketMQ vs Pulsar

2024.03.20 7 min 2.8k
// 目录 · contents

前言

消息队列是分布式系统中实现异步通信、削峰填谷和系统解耦的关键组件。Kafka、RocketMQ 和 Pulsar 是当前最主流的三个消息队列系统,它们各有特色和适用场景。本文将从架构、消息模型、存储机制、消费模式、可靠性保证等维度进行深入对比,帮助你做出合适的技术选型。

架构对比

Kafka 架构

graph TB
    Producer[Producer] --> Broker1[Broker 1]
    Producer --> Broker2[Broker 2]
    Producer --> Broker3[Broker 3]

    subgraph Kafka Cluster
        Broker1 --> |Partition 0 Leader| P0L[P0]
        Broker1 --> |Partition 2 Follower| P2F[P2]
        Broker2 --> |Partition 1 Leader| P1L[P1]
        Broker2 --> |Partition 0 Follower| P0F[P0]
        Broker3 --> |Partition 2 Leader| P2L[P2]
        Broker3 --> |Partition 1 Follower| P1F[P1]
    end

    ZK[ZooKeeper / KRaft] <--> Broker1
    ZK <--> Broker2
    ZK <--> Broker3

    Broker1 --> Consumer[Consumer Group]
    Broker2 --> Consumer
    Broker3 --> Consumer

Kafka 的核心特点: - Broker 即存储:消息持久化在 Broker 本地磁盘 - Partition 分区:Topic 分为多个 Partition,实现并行处理 - ISR 机制:In-Sync Replicas 保证副本一致性 - KRaft 模式:新版本用 Raft 协议替代 ZooKeeper

RocketMQ 架构

graph TB
    Producer[Producer] --> NameServer[NameServer 集群]
    Consumer[Consumer] --> NameServer

    NameServer --> |路由信息| Producer
    NameServer --> |路由信息| Consumer

    subgraph Broker Cluster
        subgraph Broker Group 1
            BM1[Broker-a Master]
            BS1[Broker-a Slave]
            BM1 -->|同步| BS1
        end
        subgraph Broker Group 2
            BM2[Broker-b Master]
            BS2[Broker-b Slave]
            BM2 -->|同步| BS2
        end
    end

    Producer -->|发送消息| BM1
    Producer -->|发送消息| BM2
    Consumer -->|消费消息| BM1
    Consumer -->|消费消息| BM2

RocketMQ 的核心特点: - NameServer:轻量级路由注册中心,无状态 - CommitLog + ConsumeQueue:存储计算分离的两级存储 - 主从复制:同步/异步两种复制模式 - 事务消息:原生支持分布式事务消息

Pulsar 架构

graph TB
    Producer[Producer] --> Broker1[Broker 1]
    Producer --> Broker2[Broker 2]
    Consumer[Consumer] --> Broker1
    Consumer --> Broker2

    subgraph Broker Layer(无状态)
        Broker1[Broker 1<br/>计算层]
        Broker2[Broker 2<br/>计算层]
    end

    Broker1 --> BK1[Bookie 1]
    Broker1 --> BK2[Bookie 2]
    Broker2 --> BK2
    Broker2 --> BK3[Bookie 3]

    subgraph BookKeeper(存储层)
        BK1[Bookie 1]
        BK2[Bookie 2]
        BK3[Bookie 3]
    end

    ZK[ZooKeeper] <--> Broker1
    ZK <--> Broker2
    ZK <--> BK1

Pulsar 的核心特点: - 存储计算分离:Broker 无状态,存储由 BookKeeper 负责 - 分层存储:热数据在 BookKeeper,冷数据可卸载到对象存储 - 多租户:原生支持租户/命名空间隔离 - 统一消息模型:同时支持流式和队列语义

消息模型对比

Kafka

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Kafka Producer
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("acks", "all"); // 等待所有 ISR 副本确认
props.put("retries", 3);
props.put("enable.idempotence", true); // 幂等生产者

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

ProducerRecord<String, String> record = new ProducerRecord<>(
"order-topic", // topic
order.getId(), // key (决定分区)
JsonUtils.toJson(order) // value
);

producer.send(record, (metadata, exception) -> {
if (exception == null) {
log.info("Sent to partition={}, offset={}",
metadata.partition(), metadata.offset());
} else {
log.error("Send failed", exception);
}
});

// Kafka Consumer
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "kafka1:9092");
consumerProps.put("group.id", "order-processor");
consumerProps.put("enable.auto.commit", false);
consumerProps.put("auto.offset.reset", "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of("order-topic"));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processOrder(record.value());
}
consumer.commitSync(); // 手动提交 offset
}

RocketMQ

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// RocketMQ Producer
DefaultMQProducer producer = new DefaultMQProducer("order-producer-group");
producer.setNamesrvAddr("namesrv1:9876;namesrv2:9876");
producer.setSendMsgTimeout(3000);
producer.setRetryTimesWhenSendFailed(3);
producer.start();

Message msg = new Message(
"order-topic", // topic
"OrderCreated", // tag
order.getId(), // key
JsonUtils.toJson(order).getBytes(StandardCharsets.UTF_8) // body
);

// 同步发送
SendResult result = producer.send(msg);
log.info("Send result: {}, msgId: {}", result.getSendStatus(), result.getMsgId());

// 延迟消息
msg.setDelayTimeLevel(3); // 延迟10s (1s 5s 10s 30s ...)
producer.send(msg);

// RocketMQ Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer-group");
consumer.setNamesrvAddr("namesrv1:9876");
consumer.subscribe("order-topic", "OrderCreated || OrderPaid");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
Order order = JsonUtils.fromJson(new String(msg.getBody()), Order.class);
processOrder(order);
} catch (Exception e) {
log.error("Process message failed, msgId={}", msg.getMsgId(), e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 稍后重试
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

consumer.start();

Pulsar

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Pulsar Producer
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://pulsar1:6650,pulsar2:6650")
.build();

Producer<Order> producer = client.newProducer(Schema.JSON(Order.class))
.topic("persistent://public/default/orders")
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
.sendTimeout(30, TimeUnit.SECONDS)
.blockIfQueueFull(true)
.create();

// 异步发送
producer.sendAsync(order).thenAccept(msgId -> {
log.info("Published message: {}", msgId);
}).exceptionally(ex -> {
log.error("Publish failed", ex);
return null;
});

// Pulsar Consumer - 支持多种订阅模式
Consumer<Order> consumer = client.newConsumer(Schema.JSON(Order.class))
.topic("persistent://public/default/orders")
.subscriptionName("order-processor")
.subscriptionType(SubscriptionType.Key_Shared) // 按 key 分发
.ackTimeout(30, TimeUnit.SECONDS)
.subscribe();

while (true) {
Message<Order> msg = consumer.receive();
try {
processOrder(msg.getValue());
consumer.acknowledge(msg);
} catch (Exception e) {
consumer.negativeAcknowledge(msg); // 否定确认,触发重投
}
}

存储机制对比

graph TB
    subgraph "Kafka 存储"
        KTopic[Topic] --> KP1[Partition 0]
        KTopic --> KP2[Partition 1]
        KP1 --> KSeg1[Segment 0<br/>.log + .index]
        KP1 --> KSeg2[Segment 1<br/>.log + .index]
    end

    subgraph "RocketMQ 存储"
        RCL[CommitLog<br/>所有消息顺序写入] --> RCQ1[ConsumeQueue<br/>Topic-A Queue 0]
        RCL --> RCQ2[ConsumeQueue<br/>Topic-A Queue 1]
        RCL --> RCQ3[ConsumeQueue<br/>Topic-B Queue 0]
    end

    subgraph "Pulsar 存储"
        PTopic[Topic] --> PLedger1[Ledger 1]
        PTopic --> PLedger2[Ledger 2]
        PLedger1 --> PEntry[Entry/Fragment]
        PEntry --> PBK1[Bookie 1]
        PEntry --> PBK2[Bookie 2]
    end
维度 Kafka RocketMQ Pulsar
存储引擎 分区日志文件 CommitLog + ConsumeQueue BookKeeper (Ledger)
写入方式 分区级顺序写 全局顺序写 CommitLog Ledger 顺序写
存储与计算 耦合(Broker 即存储) 耦合 分离
分层存储 Tiered Storage(较新) 不支持 原生支持(S3/HDFS)
数据保留 基于时间/大小 基于时间 基于时间/大小/策略

消费模式

graph TB
    subgraph Kafka消费模式
        KCG[Consumer Group] --> KC1[Consumer 1<br/>Partition 0,1]
        KCG --> KC2[Consumer 2<br/>Partition 2,3]
        Note1[一个 Partition 只能被<br/>组内一个 Consumer 消费]
    end

    subgraph Pulsar订阅模式
        direction TB
        PE[Exclusive<br/>独占] --> PE1[1个Consumer]
        PF[Failover<br/>灾备] --> PF1[主Consumer]
        PF --> PF2[备Consumer]
        PS[Shared<br/>共享] --> PS1[Consumer 1]
        PS --> PS2[Consumer 2]
        PS --> PS3[Consumer 3]
        PKS[Key_Shared<br/>键共享] --> PKS1[Consumer 1<br/>key-a,key-b]
        PKS --> PKS2[Consumer 2<br/>key-c,key-d]
    end

顺序保证

graph LR
    subgraph Kafka
        KP[Producer] -->|相同Key| KPart[同一Partition]
        KPart --> KC[单Consumer]
        Note2[Partition内有序]
    end

    subgraph RocketMQ
        RP[Producer] -->|MessageQueueSelector| RQ[同一Queue]
        RQ --> RC[单线程消费]
        Note3[Queue内有序]
    end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Kafka 顺序消息
producer.send(new ProducerRecord<>(
"order-topic",
orderId, // 相同 orderId 路由到同一 partition
orderEvent
));

// RocketMQ 顺序消息
producer.send(msg, (mqs, msg1, arg) -> {
String orderId = (String) arg;
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index); // 相同 orderId 路由到同一 queue
}, orderId);

// 顺序消费
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
for (MessageExt msg : msgs) {
processOrderEvent(msg);
}
return ConsumeOrderlyStatus.SUCCESS;
});

Exactly-Once 语义

级别 含义 实现方式
At-most-once 最多一次,可能丢失 发后即忘,自动提交 offset
At-least-once 至少一次,可能重复 消费后手动确认
Exactly-once 精确一次,不丢不重 幂等 + 事务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Kafka Exactly-Once (事务生产者 + 幂等消费者)
props.put("enable.idempotence", true);
props.put("transactional.id", "order-tx-001");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
producer.beginTransaction();

// 发送消息
producer.send(new ProducerRecord<>("output-topic", key, value));

// 提交消费者 offset(作为事务的一部分)
producer.sendOffsetsToTransaction(
Map.of(new TopicPartition("input-topic", 0),
new OffsetAndMetadata(offset + 1)),
consumerGroupMetadata
);

producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}

性能对比

基准测试参考数据

指标 Kafka RocketMQ Pulsar
单 Broker 吞吐 ~100万 msg/s ~20万 msg/s ~50万 msg/s
端到端延迟 (P99) ~5ms ~3ms ~5ms
Topic 数量影响 数百个后性能下降 可支持数万 支持百万级
消息大小 适合大消息 适合小消息 均衡
graph LR
    subgraph 吞吐量 (msg/s)
        K[Kafka: ~1M]
        R[RocketMQ: ~200K]
        P[Pulsar: ~500K]
    end

    subgraph Topic数量扩展性
        K2[Kafka: 数百]
        R2[RocketMQ: 数万]
        P2[Pulsar: 百万级]
    end

Kafka 在 Topic/Partition 数量较少时吞吐量最高,但大量 Topic 时性能会明显下降(因为每个 Partition 对应独立的日志文件,随机 IO 增加)。RocketMQ 使用全局 CommitLog,Topic 数量增长对写入性能影响小。Pulsar 的存储计算分离使其在 Topic 扩展性上最优。

特色功能对比

功能 Kafka RocketMQ Pulsar
延迟消息 不支持 支持(固定级别/任意延迟) 支持
事务消息 支持 支持(半消息) 支持
消息回溯 按 offset/时间 按时间 按 messageId/时间
死信队列 不支持 支持 支持
消息过滤 不支持 Tag + SQL92 不支持
Schema 管理 Schema Registry 不支持 原生 Schema
多租户 不支持 不支持 原生支持
Geo 复制 MirrorMaker 不支持 原生支持
流计算 Kafka Streams 不支持 Pulsar Functions

选型建议

graph TD
    Start[选择消息队列] --> Q1{主要用途?}

    Q1 -->|大数据/日志收集/流处理| Kafka[Apache Kafka]
    Q1 -->|在线业务/电商交易| Q2{需要哪些特性?}
    Q1 -->|云原生/多租户/海量Topic| Pulsar[Apache Pulsar]

    Q2 -->|延迟消息/事务消息/消息过滤| RocketMQ[Apache RocketMQ]
    Q2 -->|高吞吐/生态丰富| Kafka
    Q2 -->|存储计算分离/弹性扩展| Pulsar

详细选型指南

选择 Kafka: - 日志收集与分析(ELK 链路) - 流式数据处理(配合 Flink/Spark) - 事件驱动架构(高吞吐场景) - 已有 Kafka 生态和运维经验

选择 RocketMQ: - 电商交易系统(订单、支付) - 需要延迟消息(如超时未支付取消) - 需要事务消息(分布式事务) - 需要按 Tag 过滤消息 - 阿里云生态用户

选择 Pulsar: - 需要多租户隔离 - 海量 Topic 场景(IoT) - 需要分层存储(冷热分离) - 需要跨地域复制 - 追求存储计算分离的云原生架构

生产环境配置建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Kafka 生产环境关键配置
# broker
num.partitions: 6
default.replication.factor: 3
min.insync.replicas: 2
unclean.leader.election.enable: false
log.retention.hours: 168 # 7天

# producer
acks: all
retries: 3
max.in.flight.requests.per.connection: 5
enable.idempotence: true

# consumer
enable.auto.commit: false
max.poll.records: 500
session.timeout.ms: 30000

总结

Kafka、RocketMQ 和 Pulsar 各有所长。Kafka 以极致吞吐和丰富的流处理生态见长,是大数据领域的事实标准。RocketMQ 在在线业务场景下表现出色,延迟消息和事务消息是其杀手锏。Pulsar 凭借存储计算分离的架构和原生多租户支持,在云原生场景下优势明显。选型时应根据业务需求、团队经验和现有基础设施综合考虑,避免盲目追新。

作者 · authorzt
发布 · date2024-03-20
篇幅 · length2.8k 字 · 7 min
许可 · licenseCC BY-SA 4.0
$ echo "comments" · 评论