Architecture · #system-design#im#websocket

系统设计实战:即时通讯系统架构

2024.04.17 10 min 3.9k
// 目录 · contents

前言

即时通讯(IM)系统是最具挑战性的分布式系统之一,它要求低延迟的实时消息投递、海量消息的可靠存储、复杂的群聊消息扩散,以及在线状态管理等能力。本文将从需求分析出发,逐步设计一个支持千万级用户的 IM 系统架构。

需求分析

功能需求

  1. 单聊:一对一实时消息
  2. 群聊:支持数百人的群组聊天
  3. 消息类型:文本、图片、语音、文件
  4. 在线状态:显示好友在线/离线
  5. 消息状态:已发送、已送达、已读
  6. 离线消息:用户上线后拉取离线消息
  7. 推送通知:离线用户收到推送

非功能需求

  • 低延迟:消息端到端延迟 < 200ms
  • 可靠性:消息不丢失、不重复
  • 有序性:保证消息顺序
  • 高并发:支持千万在线连接

容量估算

1
2
3
4
5
6
7
8
用户数: 1000万 DAU
平均每人每天发送: 30 条消息
消息平均大小: 200 bytes

日消息量: 1000万 × 30 = 3亿条
每秒消息数: 3亿 / 86400 ≈ 3,500 msg/s (峰值 × 5 ≈ 17,500 msg/s)
日存储量: 3亿 × 200B = 60GB
并发连接: 1000万 × 30% = 300万 (同时在线)

系统架构总览

graph TB
    subgraph 客户端
        iOS[iOS App]
        Android[Android App]
        Web[Web App]
    end

    iOS --> LB[负载均衡 L4]
    Android --> LB
    Web --> LB

    LB --> GW1[Gateway 1<br/>WebSocket]
    LB --> GW2[Gateway 2<br/>WebSocket]
    LB --> GWN[Gateway N<br/>WebSocket]

    GW1 --> MsgService[消息服务]
    GW2 --> MsgService

    MsgService --> Router[消息路由]
    Router --> GW1
    Router --> GW2

    MsgService --> MQ[Kafka]
    MQ --> Storage[存储服务]
    MQ --> Push[推送服务]
    MQ --> Search[搜索服务]

    Storage --> MsgDB[(消息DB<br/>TiDB/MongoDB)]
    Storage --> SeqDB[(序号DB<br/>Redis)]

    subgraph 辅助服务
        Presence[在线状态服务]
        Group[群组服务]
        User[用户服务]
    end

    GW1 <--> Presence
    MsgService --> Group

WebSocket 长连接

连接管理

sequenceDiagram
    participant Client
    participant LB as 负载均衡
    participant GW as Gateway
    participant Auth as 认证服务
    participant Registry as 连接注册表

    Client->>LB: WebSocket 握手
    LB->>GW: 转发(基于IP Hash)
    GW->>Auth: 验证Token
    Auth-->>GW: userId=123

    GW->>GW: 建立WebSocket连接
    GW->>Registry: 注册连接<br/>userId:123 → gateway:1

    loop 心跳保活
        Client->>GW: PING (每30s)
        GW-->>Client: PONG
    end

    Note over Client,GW: 连接断开时
    GW->>Registry: 注销连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// WebSocket Gateway 核心逻辑
type Gateway struct {
connections sync.Map // userId -> *Connection
registry Registry // 分布式连接注册表 (Redis)
router Router // 消息路由
}

type Connection struct {
userId string
conn *websocket.Conn
sendChan chan []byte
closeChan chan struct{}
}

func (gw *Gateway) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
// 1. 升级为 WebSocket
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}

// 2. 认证
token := r.URL.Query().Get("token")
userId, err := gw.authService.Verify(token)
if err != nil {
conn.Close()
return
}

// 3. 创建连接对象
c := &Connection{
userId: userId,
conn: conn,
sendChan: make(chan []byte, 256),
closeChan: make(chan struct{}),
}

// 4. 踢掉旧连接(同一用户只允许一个设备在线,或支持多端)
if old, ok := gw.connections.Load(userId); ok {
old.(*Connection).Close()
}
gw.connections.Store(userId, c)

// 5. 注册到分布式注册表
gw.registry.Register(userId, gw.nodeId)

// 6. 启动读写协程
go c.readPump(gw)
go c.writePump()
}

func (c *Connection) readPump(gw *Gateway) {
defer func() {
gw.connections.Delete(c.userId)
gw.registry.Unregister(c.userId)
c.conn.Close()
}()

c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})

for {
_, message, err := c.conn.ReadMessage()
if err != nil {
break
}
gw.handleMessage(c.userId, message)
}
}

func (c *Connection) writePump() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for {
select {
case message := <-c.sendChan:
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
c.conn.WriteMessage(websocket.TextMessage, message)
case <-ticker.C:
c.conn.WriteMessage(websocket.PingMessage, nil)
case <-c.closeChan:
return
}
}
}

连接注册表

使用 Redis 维护用户到 Gateway 节点的映射:

1
2
3
4
5
6
7
8
9
10
11
12
13
type RedisRegistry struct {
client *redis.Client
}

func (r *RedisRegistry) Register(userId, nodeId string) {
// HSET user_connections userId nodeId
r.client.HSet(ctx, "user_conn", userId, nodeId)
r.client.Expire(ctx, "user_conn:"+userId, 90*time.Second)
}

func (r *RedisRegistry) Lookup(userId string) (string, error) {
return r.client.HGet(ctx, "user_conn", userId).Result()
}

消息投递流程

单聊消息

sequenceDiagram
    participant A as 用户A<br/>(Gateway 1)
    participant GW1 as Gateway 1
    participant MS as 消息服务
    participant MQ as Kafka
    participant Store as 存储服务
    participant GW2 as Gateway 2
    participant B as 用户B<br/>(Gateway 2)

    A->>GW1: 发送消息<br/>{to: B, content: "Hello"}
    GW1->>MS: 转发消息

    MS->>MS: 1. 生成消息ID(Snowflake)
    MS->>MS: 2. 生成序号(递增seq)
    MS->>MQ: 3. 投递到Kafka

    par 并行处理
        MQ->>Store: 持久化消息
        Store->>Store: 写入消息表
    and
        MS->>MS: 查找B的Gateway
        MS->>GW2: 路由消息到Gateway 2
        GW2->>B: 推送消息
    end

    B-->>GW2: ACK
    GW2-->>MS: ACK
    MS-->>GW1: 已送达
    GW1-->>A: 消息已送达 ✓✓
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Service
public class MessageService {
@Autowired
private SnowflakeIdGenerator idGen;
@Autowired
private SequenceService seqService;
@Autowired
private KafkaTemplate<String, Message> kafka;
@Autowired
private RedisRegistry registry;
@Autowired
private GatewayRouter router;

public SendResult sendMessage(String fromUserId, SendMessageRequest req) {
// 1. 生成消息ID
long msgId = idGen.nextId();

// 2. 获取会话序号(保证消息有序)
String convId = generateConversationId(fromUserId, req.getToUserId());
long seq = seqService.getNextSeq(convId);

// 3. 构建消息对象
Message message = Message.builder()
.msgId(msgId)
.conversationId(convId)
.fromUserId(fromUserId)
.toUserId(req.getToUserId())
.content(req.getContent())
.msgType(req.getMsgType())
.seq(seq)
.timestamp(System.currentTimeMillis())
.build();

// 4. 投递到 Kafka 持久化
kafka.send("chat-messages", convId, message);

// 5. 实时推送给接收方
pushToUser(req.getToUserId(), message);

return new SendResult(msgId, seq);
}

private void pushToUser(String userId, Message message) {
String gatewayNode = registry.lookup(userId);
if (gatewayNode != null) {
// 在线:通过 Gateway 推送
router.routeToGateway(gatewayNode, userId, message);
} else {
// 离线:发送推送通知
pushNotificationService.send(userId, message);
}
}
}

消息存储

存储模型

graph TB
    subgraph 写扩散 Write Diffusion
        Msg[消息] --> Inbox1[用户A收件箱]
        Msg --> Inbox2[用户B收件箱]
        Note1[每条消息写入每个接收者的收件箱<br/>写放大N倍,读简单]
    end

    subgraph 读扩散 Read Diffusion
        Msg2[消息] --> Timeline[会话时间线]
        User1[用户A] -->|读取| Timeline
        User2[用户B] -->|读取| Timeline
        Note2[消息只存一份<br/>读时聚合,读放大]
    end

单聊采用写扩散(消息写入双方的收件箱),群聊采用读扩散(消息只存在群时间线中)。对于大群(> 500 人),如果用写扩散,一条消息要写 500 次,开销太大。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
-- 会话表
CREATE TABLE conversation (
id VARCHAR(64) PRIMARY KEY,
type TINYINT NOT NULL, -- 1:单聊 2:群聊
last_msg_id BIGINT,
last_msg_seq BIGINT,
last_msg_time BIGINT,
updated_at TIMESTAMP
) ENGINE=InnoDB;

-- 消息表 (按会话ID分片)
CREATE TABLE message (
msg_id BIGINT PRIMARY KEY,
conversation_id VARCHAR(64) NOT NULL,
from_user_id VARCHAR(36) NOT NULL,
msg_type TINYINT NOT NULL, -- 1:text 2:image 3:voice
content TEXT,
seq BIGINT NOT NULL,
status TINYINT DEFAULT 1, -- 1:正常 2:撤回 3:删除
created_at BIGINT NOT NULL,

INDEX idx_conv_seq (conversation_id, seq)
) ENGINE=InnoDB;

-- 用户收件箱 (写扩散-单聊)
CREATE TABLE user_inbox (
user_id VARCHAR(36) NOT NULL,
conversation_id VARCHAR(64) NOT NULL,
last_read_seq BIGINT DEFAULT 0,
muted BOOLEAN DEFAULT FALSE,
pinned BOOLEAN DEFAULT FALSE,

PRIMARY KEY (user_id, conversation_id),
INDEX idx_user (user_id)
) ENGINE=InnoDB;

消息序号服务

消息序号(seq)保证了消息的顺序性和客户端的增量同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 基于 Redis 的序号生成器
type SequenceService struct {
redis *redis.Client
}

func (s *SequenceService) GetNextSeq(conversationId string) (int64, error) {
key := "seq:" + conversationId
// INCR 原子递增
return s.redis.Incr(ctx, key).Result()
}

// 批量获取序号(减少 Redis 调用)
func (s *SequenceService) GetNextSeqBatch(conversationId string, count int64) (int64, error) {
key := "seq:" + conversationId
endSeq, err := s.redis.IncrBy(ctx, key, count).Result()
if err != nil {
return 0, err
}
return endSeq - count + 1, nil // 返回起始序号
}

群聊消息

群消息扩散

sequenceDiagram
    participant Sender as 发送者
    participant MS as 消息服务
    participant GS as 群组服务
    participant MQ as Kafka
    participant Store as 存储服务
    participant GW as Gateway(s)
    participant Members as 群成员(在线)

    Sender->>MS: 发送群消息<br/>{groupId, content}
    MS->>GS: 获取群成员列表
    GS-->>MS: [user1, user2, ..., userN]

    MS->>MS: 生成消息ID和seq
    MS->>MQ: 写入消息(只存一份)

    MQ->>Store: 持久化到群消息表

    loop 每个在线成员
        MS->>GW: 推送消息
        GW->>Members: WebSocket推送
    end

    Note over MS: 离线成员上线后<br/>拉取未读消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void sendGroupMessage(String fromUserId, String groupId, SendMessageRequest req) {
// 1. 检查群成员资格
GroupInfo group = groupService.getGroup(groupId);
if (!group.isMember(fromUserId)) {
throw new NotGroupMemberException();
}

// 2. 生成消息
long msgId = idGen.nextId();
long seq = seqService.getNextSeq("group:" + groupId);

Message message = Message.builder()
.msgId(msgId)
.conversationId("group:" + groupId)
.fromUserId(fromUserId)
.msgType(req.getMsgType())
.content(req.getContent())
.seq(seq)
.timestamp(System.currentTimeMillis())
.build();

// 3. 持久化(只写一份到群消息表)
kafka.send("group-messages", groupId, message);

// 4. 推送给在线成员(读扩散)
List<String> memberIds = group.getMemberIds();
for (String memberId : memberIds) {
if (memberId.equals(fromUserId)) continue;
pushToUser(memberId, message);
}
}

在线状态服务(Presence)

graph TB
    subgraph Presence Service
        GW[Gateway] -->|连接/断开事件| PS[Presence Service]
        PS --> Redis[(Redis<br/>在线状态)]
        PS --> PubSub[Redis PubSub<br/>状态变更通知]
    end

    Client[好友客户端] -->|订阅好友状态| PubSub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
type PresenceService struct {
redis *redis.Client
}

// 用户上线
func (ps *PresenceService) Online(userId string) {
pipe := ps.redis.Pipeline()
// 设置在线状态,60s过期(需要心跳续期)
pipe.Set(ctx, "presence:"+userId, "online", 60*time.Second)
// 更新最后活跃时间
pipe.Set(ctx, "last_active:"+userId, time.Now().Unix(), 0)
pipe.Exec(ctx)

// 通知好友
ps.redis.Publish(ctx, "presence:change", userId+":online")
}

// 用户下线
func (ps *PresenceService) Offline(userId string) {
ps.redis.Del(ctx, "presence:"+userId)
ps.redis.Set(ctx, "last_active:"+userId, time.Now().Unix(), 0)
ps.redis.Publish(ctx, "presence:change", userId+":offline")
}

// 心跳续期
func (ps *PresenceService) Heartbeat(userId string) {
ps.redis.Expire(ctx, "presence:"+userId, 60*time.Second)
}

// 批量查询在线状态
func (ps *PresenceService) GetPresence(userIds []string) map[string]bool {
pipe := ps.redis.Pipeline()
cmds := make([]*redis.StringCmd, len(userIds))
for i, uid := range userIds {
cmds[i] = pipe.Get(ctx, "presence:"+uid)
}
pipe.Exec(ctx)

result := make(map[string]bool)
for i, uid := range userIds {
_, err := cmds[i].Result()
result[uid] = err == nil
}
return result
}

推送通知

当用户离线时,通过 APNs(iOS)或 FCM(Android)发送推送通知。

graph LR
    MS[消息服务] -->|用户离线| PushService[推送服务]
    PushService --> APNs[Apple APNs]
    PushService --> FCM[Google FCM]
    PushService --> HW[华为推送]
    PushService --> XM[小米推送]

    APNs --> iPhone[iOS设备]
    FCM --> Android[Android设备]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Service
public class PushNotificationService {
@Autowired
private DeviceTokenMapper deviceTokenMapper;

public void send(String userId, Message message) {
List<DeviceToken> tokens = deviceTokenMapper.findByUserId(userId);

for (DeviceToken token : tokens) {
PushPayload payload = PushPayload.builder()
.title(message.getFromUserName())
.body(truncateContent(message))
.badge(getUnreadCount(userId))
.data(Map.of(
"conversationId", message.getConversationId(),
"msgId", String.valueOf(message.getMsgId())
))
.build();

switch (token.getPlatform()) {
case "ios" -> apnsClient.send(token.getToken(), payload);
case "android" -> fcmClient.send(token.getToken(), payload);
case "huawei" -> huaweiClient.send(token.getToken(), payload);
}
}
}
}

消息排序与同步

基于 seq 的增量同步

客户端只需要记住本地最大 seq,上线后拉取大于该 seq 的消息即可。

sequenceDiagram
    participant Client
    participant API as API Server
    participant DB as 消息DB

    Note over Client: 本地最大seq=100

    Client->>API: 同步消息<br/>{conversationId, lastSeq: 100, limit: 50}
    API->>DB: SELECT * FROM message<br/>WHERE conv_id=? AND seq > 100<br/>ORDER BY seq LIMIT 50
    DB-->>API: messages (seq: 101-150)
    API-->>Client: 返回50条消息

    Note over Client: 更新本地seq=150
    Note over Client: 如果返回满50条<br/>继续拉取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@GetMapping("/messages/sync")
public SyncResponse syncMessages(
@RequestParam String conversationId,
@RequestParam long lastSeq,
@RequestParam(defaultValue = "50") int limit) {

List<Message> messages = messageMapper.selectByConvIdAndSeqGreaterThan(
conversationId, lastSeq, limit);

boolean hasMore = messages.size() == limit;
long maxSeq = messages.isEmpty() ? lastSeq :
messages.get(messages.size() - 1).getSeq();

return new SyncResponse(messages, hasMore, maxSeq);
}

已读回执

sequenceDiagram
    participant A as 用户A
    participant Server as 服务器
    participant B as 用户B

    A->>Server: 消息(seq=5)
    Server->>B: 推送消息
    B->>B: 用户阅读消息
    B->>Server: 已读回执<br/>{conversationId, readSeq: 5}
    Server->>Server: 更新B的已读位置
    Server->>A: 推送已读回执
    Note over A: 消息显示 ✓✓(已读)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 已读回执处理
@Service
public class ReadReceiptService {

public void markAsRead(String userId, String conversationId, long readSeq) {
// 1. 更新用户收件箱的已读位置
userInboxMapper.updateLastReadSeq(userId, conversationId, readSeq);

// 2. 通知消息发送者
String otherUserId = getOtherUserId(conversationId, userId);
ReadReceipt receipt = new ReadReceipt(conversationId, userId, readSeq);
pushToUser(otherUserId, receipt);

// 3. 更新未读计数
redis.delete("unread:" + userId + ":" + conversationId);
}

// 获取未读消息数
public long getUnreadCount(String userId, String conversationId) {
String cacheKey = "unread:" + userId + ":" + conversationId;
Long cached = redis.opsForValue().get(cacheKey);
if (cached != null) return cached;

long lastReadSeq = userInboxMapper.getLastReadSeq(userId, conversationId);
long lastMsgSeq = seqService.getCurrentSeq(conversationId);
long unread = lastMsgSeq - lastReadSeq;

redis.opsForValue().set(cacheKey, unread, Duration.ofMinutes(5));
return unread;
}
}

水平扩展

Gateway 扩展

Gateway 是无状态的(连接信息注册在 Redis 中),可以直接水平扩展。使用 L4 负载均衡(基于连接的负载均衡)分配新连接。

消息服务扩展

消息服务按会话 ID 进行 Kafka 分区路由,保证同一会话的消息有序处理。

数据库扩展

graph TB
    subgraph 分片策略
        Router[路由] -->|conversation_id hash| S1[Shard 1<br/>消息表]
        Router --> S2[Shard 2<br/>消息表]
        Router --> S3[Shard 3<br/>消息表]
        Router --> S4[Shard 4<br/>消息表]
    end

    subgraph 冷热分离
        Hot[热数据 < 3个月<br/>TiDB/MySQL] --> Cold[冷数据<br/>HBase/对象存储]
    end
graph TB
    subgraph 完整部署架构
        LB[L4 负载均衡] --> GW1[Gateway ×N<br/>WebSocket]
        GW1 --> MS[消息服务 ×N]
        MS --> Kafka[Kafka Cluster]
        Kafka --> Store[存储服务 ×N]

        Store --> TiDB[TiDB Cluster<br/>消息存储]
        Store --> Redis[Redis Cluster<br/>序号/状态/缓存]

        MS --> Presence[Presence Service ×N]
        Presence --> Redis

        Kafka --> Push[推送服务 ×N]
        Push --> APNs[APNs/FCM]
    end

关键设计决策总结

决策点 选择 原因
通信协议 WebSocket 双向实时通信
消息ID Snowflake 趋势递增、全局唯一
消息排序 会话级递增seq 保证有序、支持增量同步
单聊存储 写扩散 读取简单,适合小规模扩散
群聊存储 读扩散 避免大群的写放大
在线状态 Redis + TTL 简单高效,自动过期
消息队列 Kafka 高吞吐、有序、持久化

总结

IM 系统的核心挑战在于:WebSocket 长连接的管理和路由、消息的可靠有序投递、群聊消息的高效扩散,以及在线状态的实时同步。通过将 Gateway 层(连接管理)与消息服务层(业务逻辑)分离,使用 Redis 作为连接注册表和序号生成器,使用 Kafka 实现消息的可靠异步处理,可以构建一个支持千万级用户的 IM 系统。在实际实现中,还需要关注消息加密(端到端加密)、消息审计、敏感词过滤等安全需求。

作者 · authorzt
发布 · date2024-04-17
篇幅 · length3.9k 字 · 10 min
许可 · licenseCC BY-SA 4.0
$ echo "comments" · 评论