Architecture · #architecture#cqrs#event-sourcing

CQRS与事件溯源架构实践

2023.12.06 8 min 3.1k
// 目录 · contents

前言

传统的 CRUD 模式将读写操作混合在同一个模型中,当系统复杂度增长时,读写需求的差异会导致模型变得臃肿。CQRS(Command Query Responsibility Segregation)通过分离读写模型来解决这个问题,而事件溯源(Event Sourcing)则提供了一种全新的数据持久化范式——不存储状态,而是存储导致状态变化的事件序列。两者结合,构成了处理复杂业务领域的强大架构模式。

CQRS 模式

传统 CRUD 的问题

在传统架构中,同一个模型(通常是 ORM Entity)既负责处理写操作,又负责响应查询请求。

graph LR
    Client[客户端] --> API[API Layer]
    API --> Service[Service Layer]
    Service --> Repo[Repository]
    Repo --> DB[(数据库)]

    style DB fill:#f96

这种方式的问题在于:

  1. 读写冲突:复杂查询需要关联多个表,而写操作需要保持数据一致性,两者的优化方向相反
  2. 模型臃肿:领域模型为了满足查询需求而添加了大量计算属性
  3. 扩展受限:读写无法独立扩缩容

CQRS 架构

CQRS 的核心思想是将一个系统的操作分为两类:改变状态的命令(Command)和返回数据的查询(Query),使用不同的模型来处理它们。

graph TB
    Client[客户端]

    Client -->|写操作| CommandAPI[Command API]
    Client -->|读操作| QueryAPI[Query API]

    CommandAPI --> CommandHandler[Command Handler]
    CommandHandler --> WriteModel[写模型/领域模型]
    WriteModel --> WriteDB[(写数据库<br/>MySQL)]

    WriteDB -->|数据同步| ReadDB[(读数据库<br/>Elasticsearch)]

    QueryAPI --> QueryHandler[Query Handler]
    QueryHandler --> ReadModel[读模型/视图模型]
    ReadModel --> ReadDB

    style WriteDB fill:#f96
    style ReadDB fill:#6f9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// Command 定义
public record CreateOrderCommand(
String customerId,
List<OrderItemDTO> items,
String shippingAddress
) {}

public record CancelOrderCommand(String orderId, String reason) {}

// Command Handler
@Service
public class OrderCommandHandler {
private final OrderRepository orderRepository;
private final EventPublisher eventPublisher;

@CommandHandler
@Transactional
public String handle(CreateOrderCommand cmd) {
Order order = Order.create(
cmd.customerId(),
cmd.items(),
cmd.shippingAddress()
);
orderRepository.save(order);
eventPublisher.publish(order.getDomainEvents());
return order.getId();
}

@CommandHandler
@Transactional
public void handle(CancelOrderCommand cmd) {
Order order = orderRepository.findById(cmd.orderId())
.orElseThrow(() -> new OrderNotFoundException(cmd.orderId()));
order.cancel(cmd.reason());
orderRepository.save(order);
eventPublisher.publish(order.getDomainEvents());
}
}

// Query 定义与处理
public record GetOrderDetailQuery(String orderId) {}

@Service
public class OrderQueryHandler {
private final OrderReadRepository readRepository;

@QueryHandler
public OrderDetailView handle(GetOrderDetailQuery query) {
return readRepository.findById(query.orderId())
.orElseThrow(() -> new OrderNotFoundException(query.orderId()));
}
}

// 读模型 - 针对查询优化的扁平化视图
@Document(indexName = "orders")
public class OrderDetailView {
private String orderId;
private String customerName;
private String customerEmail;
private BigDecimal totalAmount;
private String status;
private List<OrderItemView> items;
private String shippingAddress;
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
}

事件溯源(Event Sourcing)

核心理念

传统的数据存储方式是保存实体的最新状态。事件溯源反其道而行——不保存状态,只保存导致状态变化的事件。实体的当前状态通过重放所有历史事件来计算得出。

graph LR
    subgraph 传统方式
        A[Account] -->|UPDATE balance=800| DB1[(DB)]
    end

    subgraph 事件溯源
        E1[AccountCreated<br/>balance=1000] --> E2[MoneyWithdrawn<br/>amount=200]
        E2 --> E3[MoneyDeposited<br/>amount=500]
        E3 --> E4[MoneyWithdrawn<br/>amount=300]
        E4 --> Store[(Event Store)]
        Store -->|重放事件| State[当前余额: 1000]
    end

事件存储设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
-- Event Store 表结构
CREATE TABLE event_store (
id BIGSERIAL PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL,
aggregate_id VARCHAR(36) NOT NULL,
sequence_number BIGINT NOT NULL,
event_type VARCHAR(255) NOT NULL,
event_data JSONB NOT NULL,
metadata JSONB,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),

UNIQUE (aggregate_id, sequence_number)
);

CREATE INDEX idx_event_store_aggregate
ON event_store (aggregate_id, sequence_number);

-- 快照表
CREATE TABLE snapshots (
aggregate_id VARCHAR(36) PRIMARY KEY,
aggregate_type VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
snapshot_data JSONB NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// 事件溯源聚合根
public abstract class EventSourcedAggregate {
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
private long version = -1;

// 从事件流中重建聚合
public static <T extends EventSourcedAggregate> T reconstitute(
Class<T> type, List<DomainEvent> events) {
T aggregate = instantiate(type);
for (DomainEvent event : events) {
aggregate.apply(event, false);
}
return aggregate;
}

// 应用新事件
protected void raiseEvent(DomainEvent event) {
apply(event, true);
}

private void apply(DomainEvent event, boolean isNew) {
// 通过反射调用对应的 when 方法
invokeWhenMethod(event);
version++;
if (isNew) {
uncommittedEvents.add(event);
}
}

public List<DomainEvent> getUncommittedEvents() {
return Collections.unmodifiableList(uncommittedEvents);
}

public void markEventsAsCommitted() {
uncommittedEvents.clear();
}
}

// 订单聚合根 - 事件溯源版本
public class Order extends EventSourcedAggregate {
private String orderId;
private String customerId;
private List<OrderItem> items;
private OrderStatus status;
private Money totalAmount;

public static Order create(String customerId, List<OrderItem> items) {
Order order = new Order();
order.raiseEvent(new OrderCreatedEvent(
UUID.randomUUID().toString(),
customerId,
items,
calculateTotal(items),
Instant.now()
));
return order;
}

public void cancel(String reason) {
if (status != OrderStatus.PLACED) {
throw new IllegalStateException("Cannot cancel order in status: " + status);
}
raiseEvent(new OrderCancelledEvent(orderId, reason, Instant.now()));
}

// Event handlers - 纯状态变更,无副作用
@SuppressWarnings("unused")
private void when(OrderCreatedEvent event) {
this.orderId = event.orderId();
this.customerId = event.customerId();
this.items = new ArrayList<>(event.items());
this.totalAmount = event.totalAmount();
this.status = OrderStatus.PLACED;
}

@SuppressWarnings("unused")
private void when(OrderCancelledEvent event) {
this.status = OrderStatus.CANCELLED;
}
}

Event Store 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Repository
public class JdbcEventStore implements EventStore {
private final JdbcTemplate jdbc;
private final ObjectMapper objectMapper;

@Override
@Transactional
public void saveEvents(String aggregateId, List<DomainEvent> events,
long expectedVersion) {
// 乐观并发控制
long currentVersion = getCurrentVersion(aggregateId);
if (currentVersion != expectedVersion) {
throw new ConcurrencyException(
"Expected version " + expectedVersion +
" but found " + currentVersion);
}

long seq = expectedVersion + 1;
for (DomainEvent event : events) {
jdbc.update("""
INSERT INTO event_store
(aggregate_type, aggregate_id, sequence_number,
event_type, event_data, metadata, created_at)
VALUES (?, ?, ?, ?, ?::jsonb, ?::jsonb, ?)
""",
event.getAggregateType(),
aggregateId,
seq++,
event.getClass().getSimpleName(),
objectMapper.writeValueAsString(event),
objectMapper.writeValueAsString(event.getMetadata()),
Instant.now()
);
}
}

@Override
public List<DomainEvent> getEvents(String aggregateId) {
return jdbc.query("""
SELECT event_type, event_data FROM event_store
WHERE aggregate_id = ?
ORDER BY sequence_number ASC
""",
(rs, rowNum) -> deserializeEvent(
rs.getString("event_type"),
rs.getString("event_data")
),
aggregateId
);
}
}

投影(Projection)

投影是将事件流转换为查询优化的读模型的过程。每个投影关注特定的事件子集,维护特定的视图。

graph LR
    ES[(Event Store)] --> P1[订单列表投影]
    ES --> P2[客户订单统计投影]
    ES --> P3[商品销量投影]

    P1 --> V1[(orders_list_view<br/>MySQL)]
    P2 --> V2[(customer_stats<br/>Redis)]
    P3 --> V3[(product_sales<br/>Elasticsearch)]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 投影处理器
@Component
public class OrderListProjection {
private final OrderReadRepository readRepo;

@EventHandler
public void on(OrderCreatedEvent event) {
OrderListView view = new OrderListView();
view.setOrderId(event.orderId());
view.setCustomerId(event.customerId());
view.setTotalAmount(event.totalAmount().value());
view.setStatus("PLACED");
view.setItemCount(event.items().size());
view.setCreatedAt(event.occurredAt());
readRepo.save(view);
}

@EventHandler
public void on(OrderPaidEvent event) {
readRepo.updateStatus(event.orderId(), "PAID");
}

@EventHandler
public void on(OrderCancelledEvent event) {
readRepo.updateStatus(event.orderId(), "CANCELLED");
}

@EventHandler
public void on(OrderShippedEvent event) {
readRepo.updateStatusAndTracking(
event.orderId(), "SHIPPED", event.trackingNumber());
}
}

// 客户统计投影
@Component
public class CustomerStatsProjection {
private final RedisTemplate<String, Object> redis;

@EventHandler
public void on(OrderCreatedEvent event) {
String key = "customer:stats:" + event.customerId();
redis.opsForHash().increment(key, "totalOrders", 1);
redis.opsForHash().increment(key, "totalAmount",
event.totalAmount().value().longValue());
}

@EventHandler
public void on(OrderCancelledEvent event) {
String key = "customer:stats:" + event.customerId();
redis.opsForHash().increment(key, "cancelledOrders", 1);
}
}

快照机制

当聚合的事件数量很多时,每次重放所有事件会很慢。快照机制定期保存聚合的状态快照,重建时只需要从最近的快照开始重放后续事件。

sequenceDiagram
    participant Client
    participant Repo as Repository
    participant SS as Snapshot Store
    participant ES as Event Store

    Client->>Repo: 加载聚合(id=123)
    Repo->>SS: 获取最新快照
    SS-->>Repo: 快照(version=100, state)

    Repo->>ES: 获取version>100的事件
    ES-->>Repo: events[101..105]

    Repo->>Repo: 从快照恢复状态<br/>重放5个事件
    Repo-->>Client: 聚合(version=105)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class SnapshotRepository<T extends EventSourcedAggregate> {
private static final int SNAPSHOT_INTERVAL = 100;
private final EventStore eventStore;
private final SnapshotStore snapshotStore;

public T load(String aggregateId, Class<T> type) {
// 1. 尝试加载快照
Optional<Snapshot> snapshot = snapshotStore.getLatest(aggregateId);

List<DomainEvent> events;
T aggregate;

if (snapshot.isPresent()) {
// 2. 从快照恢复
aggregate = deserialize(snapshot.get().getData(), type);
// 3. 只加载快照之后的事件
events = eventStore.getEventsAfter(
aggregateId, snapshot.get().getVersion());
} else {
aggregate = instantiate(type);
events = eventStore.getEvents(aggregateId);
}

// 4. 重放事件
for (DomainEvent event : events) {
aggregate.apply(event, false);
}

return aggregate;
}

public void save(T aggregate) {
eventStore.saveEvents(
aggregate.getId(),
aggregate.getUncommittedEvents(),
aggregate.getVersion()
);

// 检查是否需要创建新快照
if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0) {
snapshotStore.save(new Snapshot(
aggregate.getId(),
aggregate.getVersion(),
serialize(aggregate)
));
}

aggregate.markEventsAsCommitted();
}
}

Axon Framework 实战

Axon Framework 是 Java 生态中最成熟的 CQRS + Event Sourcing 框架。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// 使用 Axon 的聚合根定义
@Aggregate
public class OrderAggregate {
@AggregateIdentifier
private String orderId;
private OrderStatus status;
private Money totalAmount;

@CommandHandler
public OrderAggregate(CreateOrderCommand cmd) {
// 验证业务规则
if (cmd.items().isEmpty()) {
throw new IllegalArgumentException("Order must have items");
}
// 发布事件
AggregateLifecycle.apply(new OrderCreatedEvent(
cmd.getOrderId(),
cmd.getCustomerId(),
cmd.getItems(),
calculateTotal(cmd.getItems())
));
}

@CommandHandler
public void handle(ConfirmOrderCommand cmd) {
if (status != OrderStatus.PLACED) {
throw new IllegalStateException("Order not in PLACED status");
}
AggregateLifecycle.apply(new OrderConfirmedEvent(orderId));
}

@EventSourcingHandler
public void on(OrderCreatedEvent event) {
this.orderId = event.getOrderId();
this.status = OrderStatus.PLACED;
this.totalAmount = event.getTotalAmount();
}

@EventSourcingHandler
public void on(OrderConfirmedEvent event) {
this.status = OrderStatus.CONFIRMED;
}

protected OrderAggregate() {} // Axon 需要无参构造器
}

// Axon 查询端投影
@Component
@ProcessingGroup("order-projection")
public class OrderProjection {
private final OrderViewRepository repository;

@EventHandler
public void on(OrderCreatedEvent event) {
OrderView view = new OrderView(
event.getOrderId(),
event.getCustomerId(),
event.getTotalAmount(),
"PLACED"
);
repository.save(view);
}

@EventHandler
public void on(OrderConfirmedEvent event) {
repository.findById(event.getOrderId())
.ifPresent(view -> {
view.setStatus("CONFIRMED");
repository.save(view);
});
}

@QueryHandler
public OrderView handle(FindOrderQuery query) {
return repository.findById(query.getOrderId())
.orElseThrow();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Axon 配置 (application.yml)
axon:
axonserver:
servers: localhost:8124
serializer:
general: jackson
messages: jackson
events: jackson
eventhandling:
processors:
order-projection:
mode: tracking
initial-segment-count: 2
batch-size: 100

CQRS + Event Sourcing 的数据流

graph TB
    Client[客户端] -->|Command| CmdBus[Command Bus]
    CmdBus --> CmdHandler[Command Handler]
    CmdHandler --> Aggregate[聚合根]
    Aggregate -->|产生事件| EventStore[(Event Store)]

    EventStore -->|发布事件| EventBus[Event Bus]
    EventBus --> Projection1[投影 1]
    EventBus --> Projection2[投影 2]
    EventBus --> Saga[Saga]

    Projection1 --> ReadDB1[(读数据库)]
    Projection2 --> ReadDB2[(搜索引擎)]

    Client -->|Query| QueryBus[Query Bus]
    QueryBus --> QueryHandler[Query Handler]
    QueryHandler --> ReadDB1
    QueryHandler --> ReadDB2

优缺点分析

优势

  1. 完整审计日志:事件流天然是审计日志,满足合规要求
  2. 时间旅行:可以重建任意时间点的状态
  3. 读写独立扩展:读模型和写模型可以独立优化和扩缩容
  4. 灵活的读模型:可以根据新的查询需求随时创建新的投影
  5. 事件驱动集成:事件流可以驱动其他系统的集成

挑战

  1. 最终一致性:读模型和写模型之间存在延迟,需要接受最终一致性
  2. 事件版本演进:事件 Schema 变更需要谨慎处理(upcasting)
  3. 学习曲线:团队需要转变思维模式
  4. 查询复杂性:需要精心设计投影来满足查询需求
  5. 事件存储增长:事件不可删除,存储会持续增长

适用场景

适合 不适合
复杂业务领域 简单 CRUD
需要审计日志 数据量小
读写比例差异大 强一致性要求
需要事件驱动集成 小团队 MVP
金融交易系统 实时查询要求极高

总结

CQRS 和事件溯源是应对复杂业务领域的有力工具。CQRS 通过分离读写关注点来简化模型设计,事件溯源通过存储事件而非状态来获得完整的变更历史。两者结合使用时威力最大,但也引入了显著的复杂性。建议从业务最复杂的核心域开始采用,而非在整个系统中全面铺开。在实施前,确保团队理解最终一致性的含义,并具备处理事件版本演进的能力。

作者 · authorzt
发布 · date2023-12-06
篇幅 · length3.1k 字 · 8 min
许可 · licenseCC BY-SA 4.0
$ echo "comments" · 评论