Architecture · #distributed-transaction#saga#2pc#tcc

分布式事务解决方案全景分析

2022.11.08 11 min 4.3k
// 目录 · contents

前言

在微服务架构中,一个业务操作往往需要跨多个服务完成,每个服务有独立的数据库。传统的数据库事务(ACID)无法跨服务使用,如何保证跨服务数据的一致性,是分布式系统中最具挑战性的问题之一。本文将全面分析主流的分布式事务解决方案,帮助你根据业务场景选择合适的方案。

分布式事务的挑战

典型场景:电商下单

graph LR
    Client[用户] --> OrderService[订单服务]
    OrderService --> |1. 创建订单| OrderDB[(订单DB)]
    OrderService --> |2. 扣减库存| InventoryService[库存服务]
    InventoryService --> InventoryDB[(库存DB)]
    OrderService --> |3. 扣款| PaymentService[支付服务]
    PaymentService --> PaymentDB[(支付DB)]
    OrderService --> |4. 增加积分| PointService[积分服务]
    PointService --> PointDB[(积分DB)]

这个流程中,如果支付成功但库存扣减失败,系统就会处于不一致状态。我们需要一种机制来保证”要么全部成功,要么全部回滚”。

CAP 与 BASE

根据 CAP 定理,分布式系统无法同时满足一致性(C)、可用性(A)和分区容忍性(P)。在实际系统中,网络分区不可避免(P 必须保证),因此需要在 C 和 A 之间取舍。

大多数分布式事务方案选择了 BASE 模型: - Basically Available:基本可用 - Soft state:软状态,允许中间状态 - Eventually consistent:最终一致性

2PC(两阶段提交)

协议流程

sequenceDiagram
    participant TM as 事务管理器(协调者)
    participant RM1 as 资源管理器1
    participant RM2 as 资源管理器2
    participant RM3 as 资源管理器3

    Note over TM: Phase 1: Prepare(投票)
    TM->>RM1: Prepare
    TM->>RM2: Prepare
    TM->>RM3: Prepare

    RM1-->>TM: Yes (已写undo/redo log)
    RM2-->>TM: Yes
    RM3-->>TM: Yes

    Note over TM: 所有参与者都投Yes
    Note over TM: Phase 2: Commit

    TM->>RM1: Commit
    TM->>RM2: Commit
    TM->>RM3: Commit

    RM1-->>TM: ACK
    RM2-->>TM: ACK
    RM3-->>TM: ACK

失败回滚场景

sequenceDiagram
    participant TM as 事务管理器
    participant RM1 as 资源管理器1
    participant RM2 as 资源管理器2
    participant RM3 as 资源管理器3

    Note over TM: Phase 1: Prepare
    TM->>RM1: Prepare
    TM->>RM2: Prepare
    TM->>RM3: Prepare

    RM1-->>TM: Yes
    RM2-->>TM: No (资源不足)
    RM3-->>TM: Yes

    Note over TM: 有参与者投No
    Note over TM: Phase 2: Rollback

    TM->>RM1: Rollback
    TM->>RM2: Rollback
    TM->>RM3: Rollback

2PC 的问题

  1. 同步阻塞:Prepare 后资源被锁定,直到 Commit/Rollback
  2. 单点故障:协调者宕机,参与者无法推进
  3. 数据不一致:Phase 2 中部分参与者收到 Commit,部分未收到
  4. 性能差:全局锁导致并发能力低

3PC(三阶段提交)

3PC 在 2PC 基础上增加了 CanCommit 阶段和超时机制。

sequenceDiagram
    participant TM as 事务管理器
    participant RM1 as 参与者1
    participant RM2 as 参与者2

    Note over TM: Phase 1: CanCommit(探测)
    TM->>RM1: CanCommit?
    TM->>RM2: CanCommit?
    RM1-->>TM: Yes
    RM2-->>TM: Yes

    Note over TM: Phase 2: PreCommit(预提交)
    TM->>RM1: PreCommit
    TM->>RM2: PreCommit
    RM1-->>TM: ACK (写undo/redo log)
    RM2-->>TM: ACK

    Note over TM: Phase 3: DoCommit(正式提交)
    TM->>RM1: DoCommit
    TM->>RM2: DoCommit
    RM1-->>TM: Done
    RM2-->>TM: Done

3PC 引入超时机制:如果参与者在 PreCommit 后超时未收到 DoCommit,会自动提交。这降低了阻塞概率,但在网络分区时仍可能导致不一致。实际应用中 3PC 很少被采用。

TCC 模式

TCC(Try-Confirm-Cancel)是应用层的两阶段提交,将事务逻辑交给业务代码实现。

三个阶段

sequenceDiagram
    participant App as 业务应用
    participant Order as 订单服务
    participant Payment as 支付服务
    participant Inventory as 库存服务

    Note over App: Try 阶段: 资源预留
    App->>Order: try: 创建订单(TRYING状态)
    App->>Payment: try: 冻结金额
    App->>Inventory: try: 冻结库存

    alt 全部Try成功
        Note over App: Confirm 阶段: 确认执行
        App->>Order: confirm: 订单→CONFIRMED
        App->>Payment: confirm: 扣除冻结金额
        App->>Inventory: confirm: 扣除冻结库存
    else 任一Try失败
        Note over App: Cancel 阶段: 回滚释放
        App->>Order: cancel: 取消订单
        App->>Payment: cancel: 解冻金额
        App->>Inventory: cancel: 释放冻结库存
    end

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// TCC 接口定义
public interface InventoryTccService {
/**
* Try: 冻结库存
* 可用库存 -N, 冻结库存 +N
*/
@TwoPhaseBusinessAction(name = "inventoryTry",
commitMethod = "confirm", rollbackMethod = "cancel")
boolean tryFreezeStock(BusinessActionContext ctx,
@BusinessActionContextParameter(paramName = "productId") String productId,
@BusinessActionContextParameter(paramName = "quantity") int quantity);

/**
* Confirm: 扣除冻结库存
* 冻结库存 -N
*/
boolean confirm(BusinessActionContext ctx);

/**
* Cancel: 释放冻结库存
* 可用库存 +N, 冻结库存 -N
*/
boolean cancel(BusinessActionContext ctx);
}

@Service
public class InventoryTccServiceImpl implements InventoryTccService {
@Autowired
private InventoryMapper inventoryMapper;

@Override
@Transactional
public boolean tryFreezeStock(BusinessActionContext ctx,
String productId, int quantity) {
// 检查并冻结库存
int affected = inventoryMapper.freezeStock(productId, quantity);
if (affected == 0) {
throw new StockNotEnoughException(productId);
}
return true;
}

@Override
@Transactional
public boolean confirm(BusinessActionContext ctx) {
String productId = ctx.getActionContext("productId").toString();
int quantity = (int) ctx.getActionContext("quantity");

// 扣除冻结库存
inventoryMapper.deductFrozenStock(productId, quantity);
return true;
}

@Override
@Transactional
public boolean cancel(BusinessActionContext ctx) {
String productId = ctx.getActionContext("productId").toString();
int quantity = (int) ctx.getActionContext("quantity");

// 释放冻结库存
inventoryMapper.releaseFrozenStock(productId, quantity);
return true;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
-- 库存表需要支持 TCC 的冻结字段
CREATE TABLE inventory (
product_id VARCHAR(36) PRIMARY KEY,
available_stock INT NOT NULL,
frozen_stock INT NOT NULL DEFAULT 0,
version INT NOT NULL DEFAULT 0
);

-- Try: 冻结库存
UPDATE inventory
SET available_stock = available_stock - #{quantity},
frozen_stock = frozen_stock + #{quantity},
version = version + 1
WHERE product_id = #{productId}
AND available_stock >= #{quantity}
AND version = #{version};

-- Confirm: 扣除冻结库存
UPDATE inventory
SET frozen_stock = frozen_stock - #{quantity}
WHERE product_id = #{productId};

-- Cancel: 释放冻结库存
UPDATE inventory
SET available_stock = available_stock + #{quantity},
frozen_stock = frozen_stock - #{quantity}
WHERE product_id = #{productId};

TCC 的注意事项

  • 空回滚:Try 未执行但 Cancel 被调用,Cancel 需要识别并直接返回成功
  • 幂等性:Confirm 和 Cancel 可能被重复调用,必须保证幂等
  • 悬挂问题:Cancel 先于 Try 执行,后续 Try 不应再执行

Saga 模式

编排式 Saga(Orchestration)

graph TD
    Start[开始] --> S1[创建订单]
    S1 -->|成功| S2[扣减库存]
    S2 -->|成功| S3[扣款]
    S3 -->|成功| S4[发送通知]
    S4 --> End[完成]

    S1 -->|失败| End2[结束]
    S2 -->|失败| C1[取消订单]
    S3 -->|失败| C2[恢复库存]
    C2 --> C1
    C1 --> End2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// Saga 编排器定义
@Component
public class PlaceOrderSaga {

@Bean
public Saga<PlaceOrderState> placeOrderSaga() {
return Saga.<PlaceOrderState>builder()
.addStep("create-order")
.action(this::createOrder)
.compensation(this::cancelOrder)
.addStep("reserve-inventory")
.action(this::reserveInventory)
.compensation(this::releaseInventory)
.addStep("process-payment")
.action(this::processPayment)
.compensation(this::refundPayment)
.addStep("send-notification")
.action(this::sendNotification)
// 通知步骤无需补偿
.build();
}

private StepResult createOrder(PlaceOrderState state) {
String orderId = orderService.create(state.getCustomerId(), state.getItems());
state.setOrderId(orderId);
return StepResult.success();
}

private StepResult cancelOrder(PlaceOrderState state) {
orderService.cancel(state.getOrderId());
return StepResult.success();
}

private StepResult reserveInventory(PlaceOrderState state) {
boolean reserved = inventoryService.reserve(
state.getOrderId(), state.getItems());
return reserved ? StepResult.success() : StepResult.failure("Stock not enough");
}

private StepResult releaseInventory(PlaceOrderState state) {
inventoryService.release(state.getOrderId());
return StepResult.success();
}
}

协同式 Saga(Choreography)

sequenceDiagram
    participant OS as 订单服务
    participant MQ as 消息队列
    participant IS as 库存服务
    participant PS as 支付服务
    participant NS as 通知服务

    OS->>MQ: OrderCreated
    MQ->>IS: OrderCreated

    IS->>MQ: InventoryReserved
    MQ->>PS: InventoryReserved

    PS->>MQ: PaymentProcessed
    MQ->>OS: PaymentProcessed
    MQ->>NS: PaymentProcessed

    Note over OS: 更新订单状态为已完成
    Note over NS: 发送订单确认通知

    Note over PS: --- 如果支付失败 ---
    PS->>MQ: PaymentFailed
    MQ->>IS: PaymentFailed
    Note over IS: 释放库存
    MQ->>OS: PaymentFailed
    Note over OS: 取消订单

本地消息表

本地消息表是实现最终一致性的经典方案,通过将消息与业务操作放在同一个本地事务中来保证可靠性。

sequenceDiagram
    participant OrderService as 订单服务
    participant DB as 订单DB
    participant MsgTable as 消息表(同DB)
    participant Timer as 定时任务
    participant MQ as 消息队列
    participant InventoryService as 库存服务

    OrderService->>DB: BEGIN TRANSACTION
    OrderService->>DB: INSERT 订单
    OrderService->>MsgTable: INSERT 消息(status=NEW)
    OrderService->>DB: COMMIT

    Timer->>MsgTable: 扫描 status=NEW 的消息
    Timer->>MQ: 发送消息
    Timer->>MsgTable: 更新 status=SENT

    MQ->>InventoryService: 消费消息
    InventoryService->>InventoryService: 扣减库存
    InventoryService-->>MQ: ACK

    Note over Timer: 超时未ACK则重新发送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Service
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private OutboxMessageMapper outboxMapper;

@Transactional
public String createOrder(CreateOrderRequest request) {
// 1. 创建订单
Order order = new Order(request);
orderMapper.insert(order);

// 2. 写入消息表(同一事务)
OutboxMessage message = new OutboxMessage();
message.setId(UUID.randomUUID().toString());
message.setTopic("order-events");
message.setKey(order.getId());
message.setPayload(JsonUtils.toJson(new OrderCreatedEvent(order)));
message.setStatus("NEW");
message.setCreatedAt(LocalDateTime.now());
outboxMapper.insert(message);

return order.getId();
}
}

// 定时任务扫描并发送消息
@Component
public class OutboxMessagePublisher {
@Scheduled(fixedDelay = 1000)
@Transactional
public void publishPendingMessages() {
List<OutboxMessage> messages = outboxMapper
.findByStatus("NEW", 100);

for (OutboxMessage msg : messages) {
try {
kafkaTemplate.send(msg.getTopic(), msg.getKey(), msg.getPayload());
outboxMapper.updateStatus(msg.getId(), "SENT");
} catch (Exception e) {
log.warn("Failed to send message: {}", msg.getId(), e);
outboxMapper.incrementRetryCount(msg.getId());
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
-- 本地消息表
CREATE TABLE outbox_message (
id VARCHAR(36) PRIMARY KEY,
topic VARCHAR(100) NOT NULL,
msg_key VARCHAR(100),
payload TEXT NOT NULL,
status VARCHAR(20) NOT NULL DEFAULT 'NEW',
retry_count INT NOT NULL DEFAULT 0,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX idx_status_created (status, created_at)
);

MQ 事务消息

RocketMQ 的事务消息提供了类似本地消息表的能力,但无需自己维护消息表。

sequenceDiagram
    participant Producer as 生产者
    participant MQ as RocketMQ Broker
    participant DB as 本地数据库
    participant Consumer as 消费者

    Producer->>MQ: 1. 发送半消息(Half Message)
    MQ-->>Producer: 2. 半消息发送成功

    Producer->>DB: 3. 执行本地事务
    alt 本地事务成功
        Producer->>MQ: 4a. Commit
        MQ->>Consumer: 5. 投递消息
    else 本地事务失败
        Producer->>MQ: 4b. Rollback
        MQ->>MQ: 丢弃半消息
    end

    Note over MQ: 如果长时间未收到确认
    MQ->>Producer: 回查本地事务状态
    Producer->>DB: 查询事务执行结果
    Producer->>MQ: 返回 Commit/Rollback
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// RocketMQ 事务消息生产者
@Component
public class OrderTransactionProducer {
@Autowired
private TransactionMQProducer producer;

public void createOrderWithTransaction(CreateOrderRequest request) {
Message msg = new Message(
"order-events",
"OrderCreated",
JsonUtils.toJson(request).getBytes()
);

// 发送事务消息
producer.sendMessageInTransaction(msg, request);
}
}

// 事务监听器
@Component
public class OrderTransactionListener implements TransactionListener {
@Autowired
private OrderService orderService;

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
CreateOrderRequest request = (CreateOrderRequest) arg;
try {
orderService.createOrder(request);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("Create order failed", e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查本地事务状态
String orderId = msg.getProperty("orderId");
Order order = orderService.findById(orderId);
if (order != null) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW; // 让 Broker 继续回查
}
}

Seata AT 模式

Seata 是阿里巴巴开源的分布式事务框架,AT(Auto Transaction)模式通过自动生成回滚日志来实现分布式事务,对业务代码侵入最小。

sequenceDiagram
    participant TM as TM(事务管理器)
    participant TC as TC(事务协调器)
    participant RM1 as RM(订单服务)
    participant RM2 as RM(库存服务)

    TM->>TC: 1. Begin 全局事务, 获取 XID
    TC-->>TM: XID = 192.168.1.1:8091:123456

    TM->>RM1: 2. 调用订单服务(携带XID)
    RM1->>RM1: 执行SQL, 自动生成<br/>before/after image
    RM1->>TC: 3. 注册分支事务
    TC-->>RM1: Branch ID

    TM->>RM2: 4. 调用库存服务(携带XID)
    RM2->>RM2: 执行SQL, 自动生成<br/>before/after image
    RM2->>TC: 5. 注册分支事务
    TC-->>RM2: Branch ID

    alt 全部成功
        TM->>TC: 6a. Commit 全局事务
        TC->>RM1: 异步删除 undo log
        TC->>RM2: 异步删除 undo log
    else 任一失败
        TM->>TC: 6b. Rollback 全局事务
        TC->>RM1: 回滚: 用 before image 恢复数据
        TC->>RM2: 回滚: 用 before image 恢复数据
    end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Seata AT 模式使用 - 只需一个注解
@Service
public class OrderService {

@GlobalTransactional(name = "create-order", timeoutMills = 60000)
public String createOrder(CreateOrderRequest request) {
// 1. 创建订单(本地事务,Seata 自动拦截)
Order order = new Order(request);
orderMapper.insert(order);

// 2. 远程调用扣减库存
inventoryClient.deduct(order.getProductId(), order.getQuantity());

// 3. 远程调用扣款
paymentClient.pay(order.getCustomerId(), order.getTotalAmount());

return order.getId();
}
}

// application.yml
// seata:
// enabled: true
// tx-service-group: my_tx_group
// service:
// vgroup-mapping:
// my_tx_group: default
// registry:
// type: nacos
// nacos:
// server-addr: localhost:8848
1
2
3
4
5
6
7
8
9
10
11
-- Seata AT 模式需要在每个业务数据库中创建 undo_log 表
CREATE TABLE undo_log (
branch_id BIGINT NOT NULL COMMENT '分支事务ID',
xid VARCHAR(128) NOT NULL COMMENT '全局事务ID',
context VARCHAR(128) NOT NULL COMMENT '上下文',
rollback_info LONGBLOB NOT NULL COMMENT '回滚数据',
log_status INT NOT NULL COMMENT '状态 0-正常 1-已清理',
log_created DATETIME NOT NULL,
log_modified DATETIME NOT NULL,
UNIQUE KEY ux_undo_log (xid, branch_id)
) ENGINE = InnoDB;

方案对比与选型

方案 一致性 性能 复杂度 侵入性 适用场景
2PC 强一致 数据库层面,跨库事务
TCC 最终一致 资金、库存等核心场景
Saga 最终一致 长事务,多步骤业务流程
本地消息表 最终一致 异步通知场景
MQ事务消息 最终一致 基于消息的异步协作
Seata AT 最终一致 极低 快速接入,中等并发
graph TD
    Start[选择分布式事务方案] --> Q1{是否需要强一致性?}
    Q1 -->|是| TwoPC[2PC/XA]
    Q1 -->|否,最终一致即可| Q2{业务侵入可接受?}

    Q2 -->|接受高侵入| Q3{资金等关键场景?}
    Q3 -->|是| TCC[TCC模式]
    Q3 -->|否| Saga[Saga模式]

    Q2 -->|希望低侵入| Q4{是否使用MQ?}
    Q4 -->|是| MQTx[MQ事务消息]
    Q4 -->|否| Q5{想快速接入?}
    Q5 -->|是| Seata[Seata AT]
    Q5 -->|否| LocalMsg[本地消息表]

总结

没有万能的分布式事务方案,选择时需要综合考虑一致性要求、性能需求、开发复杂度和业务特点:

  • 强一致性场景(如跨库转账):2PC/XA
  • 资金核心场景(需要精确控制):TCC
  • 长流程业务(如订单履约):Saga
  • 异步解耦(不要求实时一致):本地消息表 / MQ 事务消息
  • 快速接入(低侵入、中等并发):Seata AT

在实践中,建议优先考虑最终一致性方案,只在业务确实需要时才引入强一致性方案。同时,要做好幂等设计和补偿机制,这是分布式事务可靠性的基石。


踩坑记录

订单服务拆分后,「下单 → 扣库存 → 扣积分」跨三个服务,我们最早引入了 Seata AT 模式。上线后压测发现 TPS 只有 300,远低于预期的 2000,瓶颈在库存服务大量出现「全局锁等待超时」。

根本原因:Seata AT 在 try 阶段会对修改的行加全局锁,高并发下锁竞争极其严重。我们当时用一个 inventory 表的同一行记录库存,所有订单都在抢同一把全局锁。

重构方案分两步:一是把「下单 + 扣库存」改为 TCC 模式,try 阶段只冻结库存(写入 freeze 字段),confirm 才真正扣减,cancel 释放冻结;二是「扣积分」改为本地消息表 + 异步消费,因为积分对实时性要求不高,允许最终一致。重构后 TPS 提升到 1800,全局锁等待彻底消失。

实测结果

8 核 16G,MySQL 5.7,并发下单压测

方案 TPS 全局锁超时率 数据一致性
Seata AT 模式 300 12% 强一致
TCC 模式(库存) 1,800 0% 强一致
本地消息表(积分) 不限流 不适用 最终一致,延迟 < 500ms

改造后稳定运行至今,数据不一致事件:0 起。

我的看法

大多数业务根本不需要强一致性的分布式事务。「下单扣库存」可以做成最终一致,「支付扣款」才需要强一致。

在引入 Seata 或 TCC 之前,先问自己:这个场景真的需要强一致吗? 能接受 500ms 内最终一致的话,本地消息表就够了,复杂度和性能代价比 2PC/TCC 小一个数量级。引入强一致性方案的代价不只是性能,还有代码复杂度和运维成本,团队要有能力维护才行。

作者 · authorzt
发布 · date2022-11-08
篇幅 · length4.3k 字 · 11 min
许可 · licenseCC BY-SA 4.0
$ echo "comments" · 评论