前言
传统的 CRUD
模式将读写操作混合在同一个模型中,当系统复杂度增长时,读写需求的差异会导致模型变得臃肿。CQRS(Command
Query Responsibility
Segregation)通过分离读写模型来解决这个问题,而事件溯源(Event
Sourcing)则提供了一种全新的数据持久化范式——不存储状态,而是存储导致状态变化的事件序列。两者结合,构成了处理复杂业务领域的强大架构模式。
CQRS 模式
传统 CRUD 的问题
在传统架构中,同一个模型(通常是 ORM
Entity)既负责处理写操作,又负责响应查询请求。
graph LR
Client[客户端] --> API[API Layer]
API --> Service[Service Layer]
Service --> Repo[Repository]
Repo --> DB[(数据库)]
style DB fill:#f96
这种方式的问题在于:
读写冲突 :复杂查询需要关联多个表,而写操作需要保持数据一致性,两者的优化方向相反
模型臃肿 :领域模型为了满足查询需求而添加了大量计算属性
扩展受限 :读写无法独立扩缩容
CQRS 架构
CQRS
的核心思想是将一个系统的操作分为两类:改变状态的命令(Command) 和返回数据的查询(Query) ,使用不同的模型来处理它们。
graph TB
Client[客户端]
Client -->|写操作| CommandAPI[Command API]
Client -->|读操作| QueryAPI[Query API]
CommandAPI --> CommandHandler[Command Handler]
CommandHandler --> WriteModel[写模型/领域模型]
WriteModel --> WriteDB[(写数据库<br/>MySQL)]
WriteDB -->|数据同步| ReadDB[(读数据库<br/>Elasticsearch)]
QueryAPI --> QueryHandler[Query Handler]
QueryHandler --> ReadModel[读模型/视图模型]
ReadModel --> ReadDB
style WriteDB fill:#f96
style ReadDB fill:#6f9
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public record CreateOrderCommand ( String customerId, List<OrderItemDTO> items, String shippingAddress ) {}public record CancelOrderCommand (String orderId, String reason) {}@Service public class OrderCommandHandler { private final OrderRepository orderRepository; private final EventPublisher eventPublisher; @CommandHandler @Transactional public String handle (CreateOrderCommand cmd) { Order order = Order.create( cmd.customerId(), cmd.items(), cmd.shippingAddress() ); orderRepository.save(order); eventPublisher.publish(order.getDomainEvents()); return order.getId(); } @CommandHandler @Transactional public void handle (CancelOrderCommand cmd) { Order order = orderRepository.findById(cmd.orderId()) .orElseThrow(() -> new OrderNotFoundException (cmd.orderId())); order.cancel(cmd.reason()); orderRepository.save(order); eventPublisher.publish(order.getDomainEvents()); } }public record GetOrderDetailQuery (String orderId) {}@Service public class OrderQueryHandler { private final OrderReadRepository readRepository; @QueryHandler public OrderDetailView handle (GetOrderDetailQuery query) { return readRepository.findById(query.orderId()) .orElseThrow(() -> new OrderNotFoundException (query.orderId())); } }@Document(indexName = "orders") public class OrderDetailView { private String orderId; private String customerName; private String customerEmail; private BigDecimal totalAmount; private String status; private List<OrderItemView> items; private String shippingAddress; private LocalDateTime createdAt; private LocalDateTime updatedAt; }
事件溯源(Event Sourcing)
核心理念
传统的数据存储方式是保存实体的最新状态。事件溯源反其道而行——不保存状态,只保存导致状态变化的事件。实体的当前状态通过重放所有历史事件来计算得出。
graph LR
subgraph 传统方式
A[Account] -->|UPDATE balance=800| DB1[(DB)]
end
subgraph 事件溯源
E1[AccountCreated<br/>balance=1000] --> E2[MoneyWithdrawn<br/>amount=200]
E2 --> E3[MoneyDeposited<br/>amount=500]
E3 --> E4[MoneyWithdrawn<br/>amount=300]
E4 --> Store[(Event Store)]
Store -->|重放事件| State[当前余额: 1000]
end
事件存储设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 CREATE TABLE event_store ( id BIGSERIAL PRIMARY KEY, aggregate_type VARCHAR (255 ) NOT NULL , aggregate_id VARCHAR (36 ) NOT NULL , sequence_number BIGINT NOT NULL , event_type VARCHAR (255 ) NOT NULL , event_data JSONB NOT NULL , metadata JSONB, created_at TIMESTAMP NOT NULL DEFAULT NOW(), UNIQUE (aggregate_id, sequence_number) );CREATE INDEX idx_event_store_aggregate ON event_store (aggregate_id, sequence_number);CREATE TABLE snapshots ( aggregate_id VARCHAR (36 ) PRIMARY KEY, aggregate_type VARCHAR (255 ) NOT NULL , sequence_number BIGINT NOT NULL , snapshot_data JSONB NOT NULL , created_at TIMESTAMP NOT NULL DEFAULT NOW() );
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public abstract class EventSourcedAggregate { private final List<DomainEvent> uncommittedEvents = new ArrayList <>(); private long version = -1 ; public static <T extends EventSourcedAggregate > T reconstitute ( Class<T> type, List<DomainEvent> events) { T aggregate = instantiate(type); for (DomainEvent event : events) { aggregate.apply(event, false ); } return aggregate; } protected void raiseEvent (DomainEvent event) { apply(event, true ); } private void apply (DomainEvent event, boolean isNew) { invokeWhenMethod(event); version++; if (isNew) { uncommittedEvents.add(event); } } public List<DomainEvent> getUncommittedEvents () { return Collections.unmodifiableList(uncommittedEvents); } public void markEventsAsCommitted () { uncommittedEvents.clear(); } }public class Order extends EventSourcedAggregate { private String orderId; private String customerId; private List<OrderItem> items; private OrderStatus status; private Money totalAmount; public static Order create (String customerId, List<OrderItem> items) { Order order = new Order (); order.raiseEvent(new OrderCreatedEvent ( UUID.randomUUID().toString(), customerId, items, calculateTotal(items), Instant.now() )); return order; } public void cancel (String reason) { if (status != OrderStatus.PLACED) { throw new IllegalStateException ("Cannot cancel order in status: " + status); } raiseEvent(new OrderCancelledEvent (orderId, reason, Instant.now())); } @SuppressWarnings("unused") private void when (OrderCreatedEvent event) { this .orderId = event.orderId(); this .customerId = event.customerId(); this .items = new ArrayList <>(event.items()); this .totalAmount = event.totalAmount(); this .status = OrderStatus.PLACED; } @SuppressWarnings("unused") private void when (OrderCancelledEvent event) { this .status = OrderStatus.CANCELLED; } }
Event Store 实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 @Repository public class JdbcEventStore implements EventStore { private final JdbcTemplate jdbc; private final ObjectMapper objectMapper; @Override @Transactional public void saveEvents (String aggregateId, List<DomainEvent> events, long expectedVersion) { long currentVersion = getCurrentVersion(aggregateId); if (currentVersion != expectedVersion) { throw new ConcurrencyException ( "Expected version " + expectedVersion + " but found " + currentVersion); } long seq = expectedVersion + 1 ; for (DomainEvent event : events) { jdbc.update(""" INSERT INTO event_store (aggregate_type, aggregate_id, sequence_number, event_type, event_data, metadata, created_at) VALUES (?, ?, ?, ?, ?::jsonb, ?::jsonb, ?) """ , event.getAggregateType(), aggregateId, seq++, event.getClass().getSimpleName(), objectMapper.writeValueAsString(event), objectMapper.writeValueAsString(event.getMetadata()), Instant.now() ); } } @Override public List<DomainEvent> getEvents (String aggregateId) { return jdbc.query(""" SELECT event_type, event_data FROM event_store WHERE aggregate_id = ? ORDER BY sequence_number ASC """ , (rs, rowNum) -> deserializeEvent( rs.getString("event_type" ), rs.getString("event_data" ) ), aggregateId ); } }
投影(Projection)
投影是将事件流转换为查询优化的读模型的过程。每个投影关注特定的事件子集,维护特定的视图。
graph LR
ES[(Event Store)] --> P1[订单列表投影]
ES --> P2[客户订单统计投影]
ES --> P3[商品销量投影]
P1 --> V1[(orders_list_view<br/>MySQL)]
P2 --> V2[(customer_stats<br/>Redis)]
P3 --> V3[(product_sales<br/>Elasticsearch)]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 @Component public class OrderListProjection { private final OrderReadRepository readRepo; @EventHandler public void on (OrderCreatedEvent event) { OrderListView view = new OrderListView (); view.setOrderId(event.orderId()); view.setCustomerId(event.customerId()); view.setTotalAmount(event.totalAmount().value()); view.setStatus("PLACED" ); view.setItemCount(event.items().size()); view.setCreatedAt(event.occurredAt()); readRepo.save(view); } @EventHandler public void on (OrderPaidEvent event) { readRepo.updateStatus(event.orderId(), "PAID" ); } @EventHandler public void on (OrderCancelledEvent event) { readRepo.updateStatus(event.orderId(), "CANCELLED" ); } @EventHandler public void on (OrderShippedEvent event) { readRepo.updateStatusAndTracking( event.orderId(), "SHIPPED" , event.trackingNumber()); } }@Component public class CustomerStatsProjection { private final RedisTemplate<String, Object> redis; @EventHandler public void on (OrderCreatedEvent event) { String key = "customer:stats:" + event.customerId(); redis.opsForHash().increment(key, "totalOrders" , 1 ); redis.opsForHash().increment(key, "totalAmount" , event.totalAmount().value().longValue()); } @EventHandler public void on (OrderCancelledEvent event) { String key = "customer:stats:" + event.customerId(); redis.opsForHash().increment(key, "cancelledOrders" , 1 ); } }
快照机制
当聚合的事件数量很多时,每次重放所有事件会很慢。快照机制定期保存聚合的状态快照,重建时只需要从最近的快照开始重放后续事件。
sequenceDiagram
participant Client
participant Repo as Repository
participant SS as Snapshot Store
participant ES as Event Store
Client->>Repo: 加载聚合(id=123)
Repo->>SS: 获取最新快照
SS-->>Repo: 快照(version=100, state)
Repo->>ES: 获取version>100的事件
ES-->>Repo: events[101..105]
Repo->>Repo: 从快照恢复状态<br/>重放5个事件
Repo-->>Client: 聚合(version=105)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class SnapshotRepository <T extends EventSourcedAggregate > { private static final int SNAPSHOT_INTERVAL = 100 ; private final EventStore eventStore; private final SnapshotStore snapshotStore; public T load (String aggregateId, Class<T> type) { Optional<Snapshot> snapshot = snapshotStore.getLatest(aggregateId); List<DomainEvent> events; T aggregate; if (snapshot.isPresent()) { aggregate = deserialize(snapshot.get().getData(), type); events = eventStore.getEventsAfter( aggregateId, snapshot.get().getVersion()); } else { aggregate = instantiate(type); events = eventStore.getEvents(aggregateId); } for (DomainEvent event : events) { aggregate.apply(event, false ); } return aggregate; } public void save (T aggregate) { eventStore.saveEvents( aggregate.getId(), aggregate.getUncommittedEvents(), aggregate.getVersion() ); if (aggregate.getVersion() % SNAPSHOT_INTERVAL == 0 ) { snapshotStore.save(new Snapshot ( aggregate.getId(), aggregate.getVersion(), serialize(aggregate) )); } aggregate.markEventsAsCommitted(); } }
Axon Framework 实战
Axon Framework 是 Java 生态中最成熟的 CQRS + Event Sourcing
框架。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 @Aggregate public class OrderAggregate { @AggregateIdentifier private String orderId; private OrderStatus status; private Money totalAmount; @CommandHandler public OrderAggregate (CreateOrderCommand cmd) { if (cmd.items().isEmpty()) { throw new IllegalArgumentException ("Order must have items" ); } AggregateLifecycle.apply(new OrderCreatedEvent ( cmd.getOrderId(), cmd.getCustomerId(), cmd.getItems(), calculateTotal(cmd.getItems()) )); } @CommandHandler public void handle (ConfirmOrderCommand cmd) { if (status != OrderStatus.PLACED) { throw new IllegalStateException ("Order not in PLACED status" ); } AggregateLifecycle.apply(new OrderConfirmedEvent (orderId)); } @EventSourcingHandler public void on (OrderCreatedEvent event) { this .orderId = event.getOrderId(); this .status = OrderStatus.PLACED; this .totalAmount = event.getTotalAmount(); } @EventSourcingHandler public void on (OrderConfirmedEvent event) { this .status = OrderStatus.CONFIRMED; } protected OrderAggregate () {} }@Component @ProcessingGroup("order-projection") public class OrderProjection { private final OrderViewRepository repository; @EventHandler public void on (OrderCreatedEvent event) { OrderView view = new OrderView ( event.getOrderId(), event.getCustomerId(), event.getTotalAmount(), "PLACED" ); repository.save(view); } @EventHandler public void on (OrderConfirmedEvent event) { repository.findById(event.getOrderId()) .ifPresent(view -> { view.setStatus("CONFIRMED" ); repository.save(view); }); } @QueryHandler public OrderView handle (FindOrderQuery query) { return repository.findById(query.getOrderId()) .orElseThrow(); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 axon: axonserver: servers: localhost:8124 serializer: general: jackson messages: jackson events: jackson eventhandling: processors: order-projection: mode: tracking initial-segment-count: 2 batch-size: 100
CQRS + Event Sourcing
的数据流
graph TB
Client[客户端] -->|Command| CmdBus[Command Bus]
CmdBus --> CmdHandler[Command Handler]
CmdHandler --> Aggregate[聚合根]
Aggregate -->|产生事件| EventStore[(Event Store)]
EventStore -->|发布事件| EventBus[Event Bus]
EventBus --> Projection1[投影 1]
EventBus --> Projection2[投影 2]
EventBus --> Saga[Saga]
Projection1 --> ReadDB1[(读数据库)]
Projection2 --> ReadDB2[(搜索引擎)]
Client -->|Query| QueryBus[Query Bus]
QueryBus --> QueryHandler[Query Handler]
QueryHandler --> ReadDB1
QueryHandler --> ReadDB2
优缺点分析
优势
完整审计日志 :事件流天然是审计日志,满足合规要求
时间旅行 :可以重建任意时间点的状态
读写独立扩展 :读模型和写模型可以独立优化和扩缩容
灵活的读模型 :可以根据新的查询需求随时创建新的投影
事件驱动集成 :事件流可以驱动其他系统的集成
挑战
最终一致性 :读模型和写模型之间存在延迟,需要接受最终一致性
事件版本演进 :事件 Schema
变更需要谨慎处理(upcasting)
学习曲线 :团队需要转变思维模式
查询复杂性 :需要精心设计投影来满足查询需求
事件存储增长 :事件不可删除,存储会持续增长
适用场景
复杂业务领域
简单 CRUD
需要审计日志
数据量小
读写比例差异大
强一致性要求
需要事件驱动集成
小团队 MVP
金融交易系统
实时查询要求极高
总结
CQRS 和事件溯源是应对复杂业务领域的有力工具。CQRS
通过分离读写关注点来简化模型设计,事件溯源通过存储事件而非状态来获得完整的变更历史。两者结合使用时威力最大,但也引入了显著的复杂性。建议从业务最复杂的核心域开始采用,而非在整个系统中全面铺开。在实施前,确保团队理解最终一致性的含义,并具备处理事件版本演进的能力。