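Preface
Instant messaging (IM) is one of the more demanding classes of distributed systems: it needs low-latency real-time delivery, reliable storage for a very large message volume, fan-out of group messages, and presence tracking. Starting from requirements analysis, this article works step by step toward an IM architecture at the ten-million-user scale.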
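Requirements Analysis
Functional Requirements
One-on-one chat: real-time messages between two users
Group chat: groups of up to several hundred members
Message types: text, image, voice, file
Presence: show whether friends are online or offline
Message status: sent, delivered, read
Offline messages: pulled by the client when the user comes back online
Push notifications: delivered to users who are offline
Non-Functional Requirements
Low latency: end-to-end message latency < 200 ms
Reliability: no lost and no duplicated messages
Ordering: messages within a conversation keep their order
High concurrency: support on the order of ten million online connections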
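Capacity Estimation
    Users:                   10 million DAU
    Messages per user/day:   30
    Average message size:    200 bytes
    Daily messages:          10 million × 30 = 300 million
    Messages per second:     300 million / 86,400 ≈ 3,500 msg/s (peak × 5 ≈ 17,500 msg/s)
    Daily storage:           300 million × 200 B = 60 GB
    Concurrent connections:  10 million × 30% = 3 million (online at the same time)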
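The same arithmetic can be kept as a small function so the figures are easy to recompute when an input changes. This is only a sketch: the `Estimate` type and `EstimateCapacity` function are names invented for illustration, and the peak factor and online percentage are planning assumptions taken from the estimate above, not measurements.

```go
package main

import "fmt"

// Estimate holds the figures derived from the planning inputs.
type Estimate struct {
	DailyMessages   int64
	AvgQPS          int64
	PeakQPS         int64
	DailyStorageGB  int64
	ConcurrentConns int64
}

// EstimateCapacity derives traffic and storage numbers from raw inputs.
// peakFactor and onlinePercent are assumptions, not measured values.
func EstimateCapacity(dau, msgsPerUser, avgMsgBytes, peakFactor, onlinePercent int64) Estimate {
	daily := dau * msgsPerUser
	avg := daily / 86400 // seconds per day, integer division
	return Estimate{
		DailyMessages:   daily,
		AvgQPS:          avg,
		PeakQPS:         avg * peakFactor,
		DailyStorageGB:  daily * avgMsgBytes / 1_000_000_000,
		ConcurrentConns: dau * onlinePercent / 100,
	}
}

func main() {
	e := EstimateCapacity(10_000_000, 30, 200, 5, 30)
	fmt.Printf("%+v\n", e)
}
```

The exact average works out to 3,472 msg/s, which the estimate above rounds to 3,500 (and the peak to 17,500).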
System Architecture Overview
graph TB
subgraph Clients
iOS[iOS App]
Android[Android App]
Web[Web App]
end
iOS --> LB[Load Balancer L4]
Android --> LB
Web --> LB
LB --> GW1[Gateway 1<br/>WebSocket]
LB --> GW2[Gateway 2<br/>WebSocket]
LB --> GWN[Gateway N<br/>WebSocket]
GW1 --> MsgService[Message Service]
GW2 --> MsgService
MsgService --> Router[Message Router]
Router --> GW1
Router --> GW2
MsgService --> MQ[Kafka]
MQ --> Storage[Storage Service]
MQ --> Push[Push Service]
MQ --> Search[Search Service]
Storage --> MsgDB[(Message DB<br/>TiDB/MongoDB)]
Storage --> SeqDB[(Sequence DB<br/>Redis)]
subgraph Auxiliary Services
Presence[Presence Service]
Group[Group Service]
User[User Service]
end
GW1 <--> Presence
MsgService --> Group
WebSocket Long-Lived Connections
Connection Management
sequenceDiagram
participant Client
participant LB as Load Balancer
participant GW as Gateway
participant Auth as Auth Service
participant Registry as Connection Registry
Client->>LB: WebSocket handshake
LB->>GW: Forward (IP hash)
GW->>Auth: Verify token
Auth-->>GW: userId=123
GW->>GW: Establish WebSocket connection
GW->>Registry: Register connection<br/>userId:123 → gateway:1
loop Heartbeat keepalive
Client->>GW: PING (every 30s)
GW-->>Client: PONG
end
Note over Client,GW: On disconnect
GW->>Registry: Unregister connection
    package gateway

    import (
        "net/http"
        "sync"
        "time"

        "github.com/gorilla/websocket"
    )

    // Registry, Router and AuthService are application interfaces defined elsewhere.
    type Gateway struct {
        nodeId      string
        connections sync.Map // userId -> *Connection
        registry    Registry
        router      Router
        authService AuthService
    }

    type Connection struct {
        userId    string
        conn      *websocket.Conn
        sendChan  chan []byte
        closeChan chan struct{}
        closeOnce sync.Once
    }

    var upgrader = websocket.Upgrader{}

    // Close is safe to call more than once and from several goroutines.
    func (c *Connection) Close() {
        c.closeOnce.Do(func() {
            close(c.closeChan)
            c.conn.Close()
        })
    }

    func (gw *Gateway) HandleWebSocket(w http.ResponseWriter, r *http.Request) {
        // Verify the token before upgrading so a bad token gets a plain HTTP 401.
        userId, err := gw.authService.Verify(r.URL.Query().Get("token"))
        if err != nil {
            http.Error(w, "unauthorized", http.StatusUnauthorized)
            return
        }
        conn, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            return // Upgrade has already written the error response
        }
        c := &Connection{
            userId:    userId,
            conn:      conn,
            sendChan:  make(chan []byte, 256),
            closeChan: make(chan struct{}),
        }
        // Kick out an older connection of the same user (single-device login).
        if old, loaded := gw.connections.Swap(userId, c); loaded {
            old.(*Connection).Close()
        }
        gw.registry.Register(userId, gw.nodeId)
        go c.readPump(gw)
        go c.writePump()
    }

    func (c *Connection) readPump(gw *Gateway) {
        defer func() {
            // Clean up only if this is still the registered connection; a newer
            // connection for the same user may already have replaced it.
            if gw.connections.CompareAndDelete(c.userId, c) {
                gw.registry.Unregister(c.userId)
            }
            c.Close()
        }()
        c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
        c.conn.SetPongHandler(func(string) error {
            // Each pong extends the deadline; a silent peer times out after 60s.
            c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
            return nil
        })
        for {
            _, message, err := c.conn.ReadMessage()
            if err != nil {
                return
            }
            gw.handleMessage(c.userId, message) // dispatch logic not shown
        }
    }

    func (c *Connection) writePump() {
        ticker := time.NewTicker(30 * time.Second)
        defer ticker.Stop()
        defer c.Close()
        for {
            select {
            case message := <-c.sendChan:
                c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
                if err := c.conn.WriteMessage(websocket.TextMessage, message); err != nil {
                    return
                }
            case <-ticker.C:
                c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
                if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                    return
                }
            case <-c.closeChan:
                return
            }
        }
    }
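Connection Registry
Redis maintains the mapping from each user to the Gateway node that holds the user's connection:
    import (
        "context"
        "time"

        "github.com/redis/go-redis/v9"
    )

    var ctx = context.Background()

    // connTTL must be longer than the heartbeat interval; heartbeats call Register again to refresh it.
    const connTTL = 90 * time.Second

    type RedisRegistry struct {
        client *redis.Client
    }

    // Register stores userId -> nodeId under its own key so the TTL applies per user.
    // (EXPIRE works on whole keys, so a field in one shared hash cannot expire on its own.)
    func (r *RedisRegistry) Register(userId, nodeId string) error {
        return r.client.Set(ctx, "user_conn:"+userId, nodeId, connTTL).Err()
    }

    func (r *RedisRegistry) Unregister(userId string) error {
        return r.client.Del(ctx, "user_conn:"+userId).Err()
    }

    // Lookup returns redis.Nil as the error when the user has no live connection.
    func (r *RedisRegistry) Lookup(userId string) (string, error) {
        return r.client.Get(ctx, "user_conn:"+userId).Result()
    }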
Message Delivery Flow
One-on-One Messages
sequenceDiagram
participant A as User A<br/>(Gateway 1)
participant GW1 as Gateway 1
participant MS as Message Service
participant MQ as Kafka
participant Store as Storage Service
participant GW2 as Gateway 2
participant B as User B<br/>(Gateway 2)
A->>GW1: Send message<br/>{to: B, content: "Hello"}
GW1->>MS: Forward message
MS->>MS: 1. Generate message ID (Snowflake)
MS->>MS: 2. Generate sequence number (increasing seq)
MS->>MQ: 3. Publish to Kafka
par In parallel
MQ->>Store: Persist message
Store->>Store: Write to message table
and
MS->>MS: Look up B's Gateway
MS->>GW2: Route message to Gateway 2
GW2->>B: Push message
end
B-->>GW2: ACK
GW2-->>MS: ACK
MS-->>GW1: Delivered
GW1-->>A: Message delivered ✓✓
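    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;

    // Message, SendMessageRequest, SendResult and the injected collaborators
    // are application classes that are not shown here.
    @Service
    public class MessageService {
        @Autowired private SnowflakeIdGenerator idGen;
        @Autowired private SequenceService seqService;
        @Autowired private KafkaTemplate<String, Message> kafka;
        @Autowired private RedisRegistry registry;
        @Autowired private GatewayRouter router;
        @Autowired private PushNotificationService pushNotificationService;

        public SendResult sendMessage(String fromUserId, SendMessageRequest req) {
            // 1. Globally unique message ID
            long msgId = idGen.nextId();

            // 2. Per-conversation increasing sequence number
            String convId = generateConversationId(fromUserId, req.getToUserId());
            long seq = seqService.getNextSeq(convId);

            Message message = Message.builder()
                    .msgId(msgId)
                    .conversationId(convId)
                    .fromUserId(fromUserId)
                    .toUserId(req.getToUserId())
                    .content(req.getContent())
                    .msgType(req.getMsgType())
                    .seq(seq)
                    .timestamp(System.currentTimeMillis())
                    .build();

            // 3. Publish for asynchronous persistence. Keying by convId keeps one
            //    conversation in one partition. send() is asynchronous: inspect the
            //    returned future if the caller must know the write succeeded.
            kafka.send("chat-messages", convId, message);

            // 4. Real-time push to the recipient
            pushToUser(req.getToUserId(), message);

            return new SendResult(msgId, seq);
        }

        private void pushToUser(String userId, Message message) {
            String gatewayNode = registry.lookup(userId);
            if (gatewayNode != null) {
                // Online: route to the Gateway that holds the user's connection
                router.routeToGateway(gatewayNode, userId, message);
            } else {
                // Offline: fall back to a push notification
                pushNotificationService.send(userId, message);
            }
        }
    }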
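Step 1 of the flow takes the message ID from a Snowflake-style generator. The article does not specify the bit layout, so the sketch below assumes the commonly cited one (41 bits of milliseconds since a custom epoch, 10 bits of worker ID, 12 bits of per-millisecond sequence); the epoch constant and the `Snowflake` type are illustrative assumptions, not the generator used by the service above.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

const (
	workerBits   = 10
	sequenceBits = 12
	maxWorker    = 1<<workerBits - 1
	maxSequence  = 1<<sequenceBits - 1
	// customEpochMs is an arbitrary assumption: 2020-01-01T00:00:00Z.
	customEpochMs = 1577836800000
)

// Snowflake generates 64-bit IDs that increase with time on one worker.
type Snowflake struct {
	mu       sync.Mutex
	workerID int64
	lastMs   int64
	seq      int64
	nowMs    func() int64 // injectable clock in Unix milliseconds
}

func NewSnowflake(workerID int64, nowMs func() int64) (*Snowflake, error) {
	if workerID < 0 || workerID > maxWorker {
		return nil, fmt.Errorf("worker id %d out of range [0,%d]", workerID, maxWorker)
	}
	return &Snowflake{workerID: workerID, nowMs: nowMs, lastMs: -1}, nil
}

// NextID returns the next ID, or an error if the clock moved backwards.
func (s *Snowflake) NextID() (int64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	now := s.nowMs()
	if now < s.lastMs {
		return 0, fmt.Errorf("clock moved backwards by %dms", s.lastMs-now)
	}
	if now == s.lastMs {
		s.seq = (s.seq + 1) & maxSequence
		if s.seq == 0 {
			// 4096 IDs already issued in this millisecond: wait for the next one.
			for now <= s.lastMs {
				now = s.nowMs()
			}
		}
	} else {
		s.seq = 0
	}
	s.lastMs = now
	return (now-customEpochMs)<<(workerBits+sequenceBits) | s.workerID<<sequenceBits | s.seq, nil
}

func main() {
	gen, err := NewSnowflake(1, func() int64 { return time.Now().UnixMilli() })
	if err != nil {
		panic(err)
	}
	a, _ := gen.NextID()
	b, _ := gen.NextID()
	fmt.Println(a, b, b > a)
}
```

IDs from different workers are unique but only roughly time-ordered, which is why ordering inside a conversation is left to the separate seq rather than to the message ID.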
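Message Storage
Storage Model
graph TB
subgraph Write Diffusion
Msg[Message] --> Inbox1[User A inbox]
Msg --> Inbox2[User B inbox]
Note1[Each message is written to every recipient's inbox<br/>N-fold write amplification, simple reads]
end
subgraph Read Diffusion
Msg2[Message] --> Timeline[Conversation timeline]
User1[User A] -->|read| Timeline
User2[User B] -->|read| Timeline
Note2[Each message is stored once<br/>aggregated at read time, read amplification]
end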
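One-on-one chat uses write diffusion (each message is written to both participants' inboxes), while group chat uses read diffusion (each message is stored once, in the group timeline). For a large group (more than 500 members), write diffusion would turn every message into 500 or more inbox writes, which is too expensive.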
    -- Conversations: one row per one-on-one or group conversation
    CREATE TABLE conversation (
        id            VARCHAR(64) PRIMARY KEY,
        type          TINYINT NOT NULL,   -- conversation type (one-on-one or group)
        last_msg_id   BIGINT,
        last_msg_seq  BIGINT,
        last_msg_time BIGINT,
        updated_at    TIMESTAMP
    ) ENGINE=InnoDB;

    -- Messages: sharded by conversation_id
    CREATE TABLE message (
        msg_id          BIGINT PRIMARY KEY,
        conversation_id VARCHAR(64) NOT NULL,
        from_user_id    VARCHAR(36) NOT NULL,
        msg_type        TINYINT NOT NULL,
        content         TEXT,
        seq             BIGINT NOT NULL,
        status          TINYINT DEFAULT 1,
        created_at      BIGINT NOT NULL,
        INDEX idx_conv_seq (conversation_id, seq)   -- serves "seq > ?" incremental sync
    ) ENGINE=InnoDB;

    -- Per-user view of each conversation (read position and settings)
    CREATE TABLE user_inbox (
        user_id         VARCHAR(36) NOT NULL,
        conversation_id VARCHAR(64) NOT NULL,
        last_read_seq   BIGINT DEFAULT 0,
        muted           BOOLEAN DEFAULT FALSE,
        pinned          BOOLEAN DEFAULT FALSE,
        -- The primary key already covers lookups by user_id,
        -- so a separate index on user_id would be redundant.
        PRIMARY KEY (user_id, conversation_id)
    ) ENGINE=InnoDB;
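The cost difference between the two storage models is easiest to see by counting writes. The following in-memory sketch contrasts them for one message sent to a 500-member group; both store types are hypothetical illustrations and are not tied to the tables above.

```go
package main

import "fmt"

type Message struct {
	ConvID string
	Seq    int64
	Body   string
}

// WriteDiffusionStore keeps one inbox per user: a send copies the message N times.
type WriteDiffusionStore struct {
	inbox  map[string][]Message
	Writes int
}

func NewWriteDiffusionStore() *WriteDiffusionStore {
	return &WriteDiffusionStore{inbox: make(map[string][]Message)}
}

func (s *WriteDiffusionStore) Send(m Message, recipients []string) {
	for _, u := range recipients {
		s.inbox[u] = append(s.inbox[u], m)
		s.Writes++
	}
}

// Read is a single lookup: everything for the user is already in one place.
func (s *WriteDiffusionStore) Read(user string) []Message { return s.inbox[user] }

// ReadDiffusionStore keeps one timeline per conversation: a send is one write.
type ReadDiffusionStore struct {
	timeline map[string][]Message
	Writes   int
}

func NewReadDiffusionStore() *ReadDiffusionStore {
	return &ReadDiffusionStore{timeline: make(map[string][]Message)}
}

func (s *ReadDiffusionStore) Send(m Message) {
	s.timeline[m.ConvID] = append(s.timeline[m.ConvID], m)
	s.Writes++
}

// ReadSince returns the messages of convID with Seq greater than lastSeq.
// A reader has to call it once per conversation it belongs to.
func (s *ReadDiffusionStore) ReadSince(convID string, lastSeq int64) []Message {
	var out []Message
	for _, m := range s.timeline[convID] {
		if m.Seq > lastSeq {
			out = append(out, m)
		}
	}
	return out
}

func main() {
	members := make([]string, 500)
	for i := range members {
		members[i] = fmt.Sprintf("user%d", i)
	}
	msg := Message{ConvID: "group:42", Seq: 1, Body: "hello"}

	w := NewWriteDiffusionStore()
	w.Send(msg, members)
	r := NewReadDiffusionStore()
	r.Send(msg)

	fmt.Println("write diffusion writes:", w.Writes)
	fmt.Println("read diffusion writes:", r.Writes)
	fmt.Println("new messages for a member at seq 0:", len(r.ReadSince("group:42", 0)))
}
```

The trade-off moves to the read side: with read diffusion a client that belongs to many groups issues one query per conversation, which is why each user needs a per-conversation read cursor.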
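Message Sequence Service
The message sequence number (seq) is what orders messages within a conversation and lets clients sync incrementally.
    import (
        "context"

        "github.com/redis/go-redis/v9"
    )

    var ctx = context.Background()

    type SequenceService struct {
        redis *redis.Client
    }

    // GetNextSeq atomically increments and returns the conversation's seq.
    func (s *SequenceService) GetNextSeq(conversationId string) (int64, error) {
        key := "seq:" + conversationId
        return s.redis.Incr(ctx, key).Result()
    }

    // GetNextSeqBatch reserves count consecutive seqs and returns the first one.
    func (s *SequenceService) GetNextSeqBatch(conversationId string, count int64) (int64, error) {
        key := "seq:" + conversationId
        endSeq, err := s.redis.IncrBy(ctx, key, count).Result()
        if err != nil {
            return 0, err
        }
        return endSeq - count + 1, nil
    }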
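Batch reservation makes it possible to hand out seqs from memory and go back to the store only once per segment. The sketch below shows one way this could look; `RangeStore`, `memStore` and `SegmentAllocator` are hypothetical names, and the in-memory store only mimics the return value of an INCRBY-style call.

```go
package main

import (
	"fmt"
	"sync"
)

// RangeStore reserves count consecutive numbers for key and returns the last
// one, mirroring what an INCRBY call returns. Hypothetical interface.
type RangeStore interface {
	IncrBy(key string, count int64) (int64, error)
}

// memStore is an in-memory RangeStore used only for this demo.
type memStore struct {
	mu    sync.Mutex
	vals  map[string]int64
	Calls int
}

func (m *memStore) IncrBy(key string, count int64) (int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.Calls++
	m.vals[key] += count
	return m.vals[key], nil
}

// segment is the locally owned range; next..end (inclusive) is still unused.
type segment struct{ next, end int64 }

// SegmentAllocator serves seqs from a local segment and refills it in batches.
type SegmentAllocator struct {
	mu    sync.Mutex
	store RangeStore
	step  int64
	segs  map[string]*segment
}

func NewSegmentAllocator(store RangeStore, step int64) *SegmentAllocator {
	return &SegmentAllocator{store: store, step: step, segs: make(map[string]*segment)}
}

// Next returns the next seq for convID, reserving a new segment when needed.
func (a *SegmentAllocator) Next(convID string) (int64, error) {
	a.mu.Lock()
	defer a.mu.Unlock()
	seg := a.segs[convID]
	if seg == nil || seg.next > seg.end {
		end, err := a.store.IncrBy("seq:"+convID, a.step)
		if err != nil {
			return 0, err
		}
		seg = &segment{next: end - a.step + 1, end: end}
		a.segs[convID] = seg
	}
	seq := seg.next
	seg.next++
	return seq, nil
}

func main() {
	store := &memStore{vals: make(map[string]int64)}
	alloc := NewSegmentAllocator(store, 100)
	var last int64
	for i := 0; i < 250; i++ {
		last, _ = alloc.Next("conv-1")
	}
	fmt.Println("last seq:", last, "store round trips:", store.Calls)
}
```

Two caveats apply. A crash discards the unused remainder of a segment, so seqs stay increasing but are no longer gapless. And ordering only holds if a single allocator owns each conversation (for example by routing on conversation ID), because segments held by different instances would interleave.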
Group Messages
Group Message Fan-Out
sequenceDiagram
participant Sender as Sender
participant MS as Message Service
participant GS as Group Service
participant MQ as Kafka
participant Store as Storage Service
participant GW as Gateway(s)
participant Members as Group members (online)
Sender->>MS: Send group message<br/>{groupId, content}
MS->>GS: Get group member list
GS-->>MS: [user1, user2, ..., userN]
MS->>MS: Generate message ID and seq
MS->>MQ: Write message (stored once)
MQ->>Store: Persist to group message table
loop Each online member
MS->>GW: Push message
GW->>Members: WebSocket push
end
Note over MS: Offline members pull<br/>unread messages after coming online
    public void sendGroupMessage(String fromUserId, String groupId, SendMessageRequest req) {
        // Only members may post to the group
        GroupInfo group = groupService.getGroup(groupId);
        if (!group.isMember(fromUserId)) {
            throw new NotGroupMemberException();
        }

        // Message ID plus a seq scoped to this group's conversation
        String convId = "group:" + groupId;
        long msgId = idGen.nextId();
        long seq = seqService.getNextSeq(convId);

        Message message = Message.builder()
                .msgId(msgId)
                .conversationId(convId)
                .fromUserId(fromUserId)
                .msgType(req.getMsgType())
                .content(req.getContent())
                .seq(seq)
                .timestamp(System.currentTimeMillis())
                .build();

        // Read diffusion: the message is persisted once, keyed by group
        kafka.send("group-messages", groupId, message);

        // Push to every member except the sender; pushToUser falls back to a
        // push notification for members who are offline
        List<String> memberIds = group.getMemberIds();
        for (String memberId : memberIds) {
            if (memberId.equals(fromUserId)) continue;
            pushToUser(memberId, message);
        }
    }
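Pushing member by member costs one registry lookup and one routing call per recipient. A possible refinement, sketched here as an assumption rather than as part of the design above, is to bucket the online members by Gateway node first so that each node receives one batched request. `GroupByGateway` and its `lookup` parameter are hypothetical stand-ins for the connection registry.

```go
package main

import (
	"fmt"
	"sort"
)

// GroupByGateway partitions the recipients of a group message by the gateway
// node that holds their connection. Members without a live connection are
// returned separately so they can be handled by push notifications.
func GroupByGateway(memberIDs []string, sender string, lookup func(string) (string, bool)) (map[string][]string, []string) {
	byNode := make(map[string][]string)
	var offline []string
	for _, id := range memberIDs {
		if id == sender {
			continue // the sender already has the message
		}
		node, ok := lookup(id)
		if !ok {
			offline = append(offline, id)
			continue
		}
		byNode[node] = append(byNode[node], id)
	}
	return byNode, offline
}

func main() {
	// In-memory registry for the demo: userID -> gateway node.
	registry := map[string]string{"u2": "gw-1", "u3": "gw-2", "u4": "gw-1"}
	lookup := func(id string) (string, bool) {
		node, ok := registry[id]
		return node, ok
	}

	byNode, offline := GroupByGateway([]string{"u1", "u2", "u3", "u4", "u5"}, "u1", lookup)

	nodes := make([]string, 0, len(byNode))
	for n := range byNode {
		nodes = append(nodes, n)
	}
	sort.Strings(nodes) // map iteration order is random; sort for stable output
	for _, n := range nodes {
		fmt.Println(n, "->", byNode[n])
	}
	fmt.Println("offline:", offline)
}
```

With this grouping, the number of routing calls for one group message is bounded by the number of Gateway nodes rather than by the group size.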
Presence Service
graph TB
subgraph Presence Service
GW[Gateway] -->|connect/disconnect events| PS[Presence Service]
PS --> Redis[(Redis<br/>presence state)]
PS --> PubSub[Redis PubSub<br/>state change notifications]
end
Client[Friend's client] -->|subscribe to friends' status| PubSub
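    import (
        "context"
        "time"

        "github.com/redis/go-redis/v9"
    )

    var ctx = context.Background()

    // presenceTTL must exceed the heartbeat interval, otherwise users flap offline.
    const presenceTTL = 60 * time.Second

    type PresenceService struct {
        redis *redis.Client
    }

    // Online marks the user online (expiring unless refreshed) and notifies subscribers.
    func (ps *PresenceService) Online(userId string) error {
        pipe := ps.redis.Pipeline()
        pipe.Set(ctx, "presence:"+userId, "online", presenceTTL)
        pipe.Set(ctx, "last_active:"+userId, time.Now().Unix(), 0)
        if _, err := pipe.Exec(ctx); err != nil {
            return err
        }
        return ps.redis.Publish(ctx, "presence:change", userId+":online").Err()
    }

    // Offline clears the online flag, records the last active time and notifies subscribers.
    func (ps *PresenceService) Offline(userId string) error {
        pipe := ps.redis.Pipeline()
        pipe.Del(ctx, "presence:"+userId)
        pipe.Set(ctx, "last_active:"+userId, time.Now().Unix(), 0)
        if _, err := pipe.Exec(ctx); err != nil {
            return err
        }
        return ps.redis.Publish(ctx, "presence:change", userId+":offline").Err()
    }

    // Heartbeat renews the TTL of the online flag.
    func (ps *PresenceService) Heartbeat(userId string) error {
        return ps.redis.Expire(ctx, "presence:"+userId, presenceTTL).Err()
    }

    // GetPresence checks many users in one round trip.
    func (ps *PresenceService) GetPresence(userIds []string) map[string]bool {
        pipe := ps.redis.Pipeline()
        cmds := make([]*redis.StringCmd, len(userIds))
        for i, uid := range userIds {
            cmds[i] = pipe.Get(ctx, "presence:"+uid)
        }
        // Exec reports redis.Nil when any key is missing, so per-command
        // results are inspected instead of the aggregate error.
        _, _ = pipe.Exec(ctx)

        result := make(map[string]bool, len(userIds))
        for i, uid := range userIds {
            _, err := cmds[i].Result()
            result[uid] = err == nil // a missing key (or any error) counts as offline
        }
        return result
    }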
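Publishing every change on one shared channel means each subscriber sees every user's status. The sketch below illustrates the alternative suggested by the diagram, where a watcher only receives changes for the users it subscribed to. It is an in-process model built on Go channels, not Redis Pub/Sub, and `Hub`, `Subscribe` and `Publish` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is one presence change.
type Event struct {
	UserID string
	Online bool
}

// Hub routes presence changes to the watchers that subscribed to a user.
type Hub struct {
	mu       sync.Mutex
	watchers map[string]map[string]chan Event // target -> watcher -> channel
}

func NewHub() *Hub {
	return &Hub{watchers: make(map[string]map[string]chan Event)}
}

// Subscribe registers watcher for changes of target and returns a buffered
// channel on which the events arrive.
func (h *Hub) Subscribe(watcher, target string) <-chan Event {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.watchers[target] == nil {
		h.watchers[target] = make(map[string]chan Event)
	}
	ch := make(chan Event, 8)
	h.watchers[target][watcher] = ch
	return ch
}

// Publish delivers the change to every watcher of userID and reports how many
// received it. A full buffer drops the event instead of blocking the publisher.
func (h *Hub) Publish(userID string, online bool) int {
	h.mu.Lock()
	defer h.mu.Unlock()
	delivered := 0
	for _, ch := range h.watchers[userID] {
		select {
		case ch <- Event{UserID: userID, Online: online}:
			delivered++
		default: // slow watcher: drop rather than stall everyone else
		}
	}
	return delivered
}

func main() {
	hub := NewHub()
	aliceSeesBob := hub.Subscribe("alice", "bob")

	fmt.Println("delivered:", hub.Publish("bob", true))   // alice is watching bob
	fmt.Println("delivered:", hub.Publish("carol", true)) // nobody is watching carol

	ev := <-aliceSeesBob
	fmt.Printf("%s online=%v\n", ev.UserID, ev.Online)
}
```

Dropping on a full buffer mirrors the at-most-once nature of Redis Pub/Sub: a subscriber that misses an event has to fall back to querying the current state, which is what the TTL-backed presence keys are for.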
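Push Notifications
When a user is offline, a push notification is sent through the platform's channel, such as APNs (iOS) or FCM (Android).
graph LR
MS[Message Service] -->|user offline| PushService[Push Service]
PushService --> APNs[Apple APNs]
PushService --> FCM[Google FCM]
PushService --> HW[Huawei Push]
PushService --> XM[Xiaomi Push]
APNs --> iPhone[iOS device]
FCM --> Android[Android device]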
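    import java.util.List;
    import java.util.Map;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    // The vendor clients (apnsClient, fcmClient, huaweiClient) and the helpers
    // truncateContent and getUnreadCount are not shown here.
    @Service
    public class PushNotificationService {
        @Autowired private DeviceTokenMapper deviceTokenMapper;

        public void send(String userId, Message message) {
            // A user may be signed in on several devices
            List<DeviceToken> tokens = deviceTokenMapper.findByUserId(userId);
            if (tokens.isEmpty()) {
                return;
            }

            // The payload is the same for every device, so build it once
            PushPayload payload = PushPayload.builder()
                    .title(message.getFromUserName())
                    .body(truncateContent(message))
                    .badge(getUnreadCount(userId))
                    .data(Map.of(
                            "conversationId", message.getConversationId(),
                            "msgId", String.valueOf(message.getMsgId())))
                    .build();

            for (DeviceToken token : tokens) {
                switch (token.getPlatform()) {
                    case "ios" -> apnsClient.send(token.getToken(), payload);
                    case "android" -> fcmClient.send(token.getToken(), payload);
                    case "huawei" -> huaweiClient.send(token.getToken(), payload);
                    default -> { /* unsupported platform: skip this token */ }
                }
            }
        }
    }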
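The notification body has to be shortened before it is sent, and for Chinese or emoji text the cut must not land in the middle of a multi-byte character. The listing above leaves `truncateContent` undefined, so the following is only a guess at what such a helper might do; the function names, the 50-character limit and the message type strings are all assumptions.

```go
package main

import "fmt"

// TruncateForPush shortens s to at most maxRunes Unicode code points and
// appends an ellipsis when something was cut. It counts code points, not
// bytes, so multi-byte characters are never split.
func TruncateForPush(s string, maxRunes int) string {
	if maxRunes <= 0 {
		return ""
	}
	runes := []rune(s)
	if len(runes) <= maxRunes {
		return s
	}
	return string(runes[:maxRunes]) + "…"
}

// PreviewFor picks the notification text by message type. The type names are
// hypothetical; the article only lists text, image, voice and file.
func PreviewFor(msgType, content string) string {
	switch msgType {
	case "text":
		return TruncateForPush(content, 50)
	case "image":
		return "[Image]"
	case "voice":
		return "[Voice]"
	case "file":
		return "[File]"
	default:
		return "[New message]"
	}
}

func main() {
	fmt.Println(TruncateForPush("你好,世界", 2))
	fmt.Println(PreviewFor("text", "see you at 8"))
	fmt.Println(PreviewFor("image", "https://example.com/a.png"))
}
```

Counting code points still does not respect grapheme clusters, so an emoji built from several code points can be cut apart; a production version would need a segmentation library for that.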
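Message Ordering and Sync
Incremental Sync Based on seq
The client only has to remember its largest local seq for each conversation; after coming online it pulls the messages whose seq is greater than that value.
sequenceDiagram
participant Client
participant API as API Server
participant DB as Message DB
Note over Client: Local max seq=100
Client->>API: Sync messages<br/>{conversationId, lastSeq: 100, limit: 50}
API->>DB: SELECT * FROM message<br/>WHERE conversation_id=? AND seq > 100<br/>ORDER BY seq LIMIT 50
DB-->>API: messages (seq: 101-150)
API-->>Client: Return 50 messages
Note over Client: Update local seq=150
Note over Client: If a full page of 50 came back<br/>keep pulling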
    @GetMapping("/messages/sync")
    public SyncResponse syncMessages(
            @RequestParam String conversationId,
            @RequestParam long lastSeq,
            @RequestParam(defaultValue = "50") int limit) {
        // Clamp the page size so a client cannot request an unbounded page
        limit = Math.max(1, Math.min(limit, 200));

        // Rows come back ordered by seq ascending
        List<Message> messages = messageMapper.selectByConvIdAndSeqGreaterThan(
                conversationId, lastSeq, limit);

        // A full page means there may be more to pull
        boolean hasMore = messages.size() == limit;
        long maxSeq = messages.isEmpty()
                ? lastSeq
                : messages.get(messages.size() - 1).getSeq();

        return new SyncResponse(messages, hasMore, maxSeq);
    }
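On the client side, the "keep pulling while the page is full" rule becomes a short loop. The sketch below models it against a fake server; `FetchFunc` stands in for the HTTP call to `/messages/sync`, and all type and function names here are hypothetical.

```go
package main

import "fmt"

type Message struct {
	Seq  int64
	Body string
}

// SyncResponse mirrors the fields returned by the sync endpoint.
type SyncResponse struct {
	Messages []Message
	HasMore  bool
	MaxSeq   int64
}

// FetchFunc is a hypothetical stand-in for the HTTP call to /messages/sync.
type FetchFunc func(convID string, lastSeq int64, limit int) (SyncResponse, error)

// SyncAll pulls pages until the server reports no more. It returns the new
// messages in order together with the updated cursor.
func SyncAll(fetch FetchFunc, convID string, lastSeq int64, limit int) ([]Message, int64, error) {
	var all []Message
	for {
		resp, err := fetch(convID, lastSeq, limit)
		if err != nil {
			return all, lastSeq, err // the caller can resume from lastSeq
		}
		all = append(all, resp.Messages...)
		if resp.MaxSeq > lastSeq {
			lastSeq = resp.MaxSeq
		}
		// An empty page also ends the loop, which guards against a server
		// that keeps reporting HasMore without making progress.
		if !resp.HasMore || len(resp.Messages) == 0 {
			return all, lastSeq, nil
		}
	}
}

// newFakeServer simulates a conversation holding messages with seq 1..total.
func newFakeServer(total int64) FetchFunc {
	return func(convID string, lastSeq int64, limit int) (SyncResponse, error) {
		var page []Message
		for seq := lastSeq + 1; seq <= total && len(page) < limit; seq++ {
			page = append(page, Message{Seq: seq, Body: fmt.Sprintf("msg-%d", seq)})
		}
		maxSeq := lastSeq
		if len(page) > 0 {
			maxSeq = page[len(page)-1].Seq
		}
		return SyncResponse{Messages: page, HasMore: len(page) == limit, MaxSeq: maxSeq}, nil
	}
}

func main() {
	msgs, cursor, err := SyncAll(newFakeServer(230), "conv-1", 100, 50)
	if err != nil {
		panic(err)
	}
	fmt.Println("pulled:", len(msgs), "new cursor:", cursor)
}
```

Starting from seq 100 with 230 messages on the server, the loop fetches 101-150, 151-200 and 201-230, then stops because the last page is not full.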
Read Receipts
sequenceDiagram
participant A as User A
participant Server as Server
participant B as User B
A->>Server: Message (seq=5)
Server->>B: Push message
B->>B: User reads the message
B->>Server: Read receipt<br/>{conversationId, readSeq: 5}
Server->>Server: Update B's read position
Server->>A: Push read receipt
Note over A: Message shows ✓✓ (read)
    import java.time.Duration;

    import org.springframework.stereotype.Service;

    // userInboxMapper, seqService, redis and pushToUser are injected or inherited
    // collaborators that are not shown here.
    @Service
    public class ReadReceiptService {

        public void markAsRead(String userId, String conversationId, long readSeq) {
            // Persist the read position. The update should only ever move forward
            // (for example with GREATEST in SQL) so a late receipt cannot rewind it.
            userInboxMapper.updateLastReadSeq(userId, conversationId, readSeq);

            // Notify the other participant (one-on-one conversations only)
            String otherUserId = getOtherUserId(conversationId, userId);
            ReadReceipt receipt = new ReadReceipt(conversationId, userId, readSeq);
            pushToUser(otherUserId, receipt);

            // Invalidate the cached unread count
            redis.delete("unread:" + userId + ":" + conversationId);
        }

        public long getUnreadCount(String userId, String conversationId) {
            String cacheKey = "unread:" + userId + ":" + conversationId;
            Long cached = redis.opsForValue().get(cacheKey);
            if (cached != null) return cached;

            // unread = latest seq in the conversation - the user's read position
            long lastReadSeq = userInboxMapper.getLastReadSeq(userId, conversationId);
            long lastMsgSeq = seqService.getCurrentSeq(conversationId);
            long unread = Math.max(0, lastMsgSeq - lastReadSeq);

            redis.opsForValue().set(cacheKey, unread, Duration.ofMinutes(5));
            return unread;
        }
    }
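Read receipts can arrive late or more than once, so the read position should only move forward, and the unread count should never go negative. A minimal sketch of those two rules follows; the `ReadCursor` type is hypothetical, and the count assumes seqs are gapless within a conversation.

```go
package main

import "fmt"

// ReadCursor tracks how far one user has read in one conversation.
type ReadCursor struct {
	LastReadSeq int64
}

// Advance moves the cursor forward only. Stale or duplicate receipts are
// ignored, and the return value reports whether the cursor changed.
func (c *ReadCursor) Advance(readSeq int64) bool {
	if readSeq <= c.LastReadSeq {
		return false
	}
	c.LastReadSeq = readSeq
	return true
}

// Unread derives the unread count from the conversation's latest seq,
// clamped at zero. It assumes seqs have no gaps.
func (c *ReadCursor) Unread(lastMsgSeq int64) int64 {
	if lastMsgSeq <= c.LastReadSeq {
		return 0
	}
	return lastMsgSeq - c.LastReadSeq
}

func main() {
	cur := &ReadCursor{}
	fmt.Println(cur.Advance(5)) // first receipt moves the cursor to 5
	fmt.Println(cur.Advance(3)) // a late receipt for seq 3 is ignored
	fmt.Println(cur.Unread(8))  // seqs 6, 7 and 8 are unread
}
```

Only a receipt that actually advances the cursor needs to be forwarded to the other participant, which also removes redundant pushes.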
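Horizontal Scaling
Scaling the Gateway
A Gateway holds live connections but no durable state: the user-to-node mapping is registered in Redis, so nodes can be added freely. An L4 (connection-level) load balancer distributes new connections across them.
Scaling the Message Service
The message service routes to Kafka partitions by conversation ID, so all messages of one conversation land in the same partition and are processed in order.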
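Routing by conversation ID boils down to hashing the key and taking it modulo the partition count. The sketch below uses FNV-1a purely for illustration; Kafka's Java client applies its own murmur2-based hash to the record key, so the partition numbers here will not match a real cluster. `PartitionFor` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// PartitionFor maps a conversation ID to one of n partitions. The same ID
// always maps to the same partition. It returns -1 if n is not positive.
func PartitionFor(convID string, n int) int {
	if n <= 0 {
		return -1
	}
	h := fnv.New32a()
	h.Write([]byte(convID)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	const partitions = 12
	for _, id := range []string{"conv-a", "conv-b", "group:42"} {
		fmt.Printf("%s -> partition %d\n", id, PartitionFor(id, partitions))
	}
}
```

The mapping is only stable while the partition count stays the same: adding partitions moves some conversations to a new partition, and messages around that switch can be consumed out of order unless the change is coordinated.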
Scaling the Database
graph TB
subgraph Sharding Strategy
Router[Router] -->|conversation_id hash| S1[Shard 1<br/>message table]
Router --> S2[Shard 2<br/>message table]
Router --> S3[Shard 3<br/>message table]
Router --> S4[Shard 4<br/>message table]
end
subgraph Hot/Cold Separation
Hot[Hot data < 3 months<br/>TiDB/MySQL] --> Cold[Cold data<br/>HBase/object storage]
end
graph TB
subgraph Full Deployment Architecture
LB[L4 Load Balancer] --> GW1[Gateway ×N<br/>WebSocket]
GW1 --> MS[Message Service ×N]
MS --> Kafka[Kafka Cluster]
Kafka --> Store[Storage Service ×N]
Store --> TiDB[TiDB Cluster<br/>message storage]
Store --> Redis[Redis Cluster<br/>sequence/state/cache]
MS --> Presence[Presence Service ×N]
Presence --> Redis
Kafka --> Push[Push Service ×N]
Push --> APNs[APNs/FCM]
end
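Summary of Key Design Decisions
Decision              Choice                  Rationale
Protocol              WebSocket               Bidirectional real-time communication
Message ID            Snowflake               Roughly time-ordered, globally unique
Message ordering      Per-conversation seq    Guarantees order, supports incremental sync
One-on-one storage    Write diffusion         Simple reads, suits small fan-out
Group storage         Read diffusion          Avoids write amplification in large groups
Presence              Redis + TTL             Simple and efficient, expires automatically
Message queue         Kafka                   High throughput, ordered per partition, durable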
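Summary
The core challenges of an IM system are managing and routing long-lived WebSocket connections, delivering messages reliably and in order, fanning out group messages efficiently, and keeping presence in sync in real time. Separating the Gateway layer (connection management) from the message service layer (business logic), using Redis as the connection registry and sequence generator, and using Kafka for reliable asynchronous processing makes it possible to build an IM system at the ten-million-user scale. A real implementation also has to address security requirements such as message encryption (end-to-end encryption), message auditing, and sensitive-word filtering.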