
Netty Core Architecture and Source Code Analysis

2025.05.14 Java 9 min 3.7k

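Preface

Netty is the most popular asynchronous, event-driven network framework in the Java ecosystem and underpins many high-performance network applications: Dubbo, gRPC-Java, and Elasticsearch all build their network layers on it. Understanding Netty's core architecture and design philosophy not only helps you use Netty well, it also deepens your overall grasp of high-performance network programming. This article examines Netty from three angles: architectural design, core components, and source code analysis.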

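The Reactor Pattern

Netty's threading model is based on the classic Reactor pattern, so understanding the Reactor pattern is a prerequisite for understanding Netty.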

graph TB
    subgraph MainSubReactor["Main/Sub Reactor Model"]
        CLIENT1[Client 1] --> BOSS
        CLIENT2[Client 2] --> BOSS
        CLIENT3[Client 3] --> BOSS

        BOSS[BossGroup<br/>MainReactor<br/>handles Accept] -->|assigns connection| WORKER1[WorkerGroup-1<br/>SubReactor]
        BOSS -->|assigns connection| WORKER2[WorkerGroup-2<br/>SubReactor]
        BOSS -->|assigns connection| WORKER3[WorkerGroup-N<br/>SubReactor]

        WORKER1 -->|read and write events| H1[Handler<br/>codec + business logic]
        WORKER2 -->|read and write events| H2[Handler<br/>codec + business logic]
        WORKER3 -->|read and write events| H3[Handler<br/>codec + business logic]
    end

    style BOSS fill:#f96,stroke:#333,stroke-width:2px
    style WORKER1 fill:#9cf,stroke:#333
    style WORKER2 fill:#9cf,stroke:#333
    style WORKER3 fill:#9cf,stroke:#333
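  • BossGroup (MainReactor): accepts client connections (Accept events); one thread is usually enough
  • WorkerGroup (SubReactor): handles read and write I/O events on established connections; usually sized at CPU cores * 2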
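
The "assigns connection" step in the diagram can be pictured as a round-robin hand-off. The following is a minimal plain-JDK sketch, not Netty's code: `RoundRobinChooser` and the worker names are hypothetical stand-ins, and it only assumes that each newly accepted connection goes to the next worker in turn.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-JDK sketch: hands each accepted connection to the next worker in turn. */
final class RoundRobinChooser {
    private final String[] workers;                        // stand-ins for worker event loops
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinChooser(String... workers) {
        if (workers.length == 0) {
            throw new IllegalArgumentException("at least one worker is required");
        }
        this.workers = workers.clone();
    }

    /** Returns the worker that should own the next accepted connection. */
    String next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return workers[Math.floorMod(next.getAndIncrement(), workers.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooser chooser = new RoundRobinChooser("worker-1", "worker-2", "worker-3");
        for (int i = 0; i < 5; i++) {
            System.out.println("connection " + i + " -> " + chooser.next());
        }
    }
}
```

Once a connection is assigned it stays with that worker for its whole lifetime, which is what lets the worker handle it without locks.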

Netty Core Architecture

graph TB
    subgraph Layers["Netty Architecture Layers"]
        TRANSPORT[Transport layer<br/>Channel / EventLoop / ChannelFuture]
        PIPELINE[Pipeline layer<br/>ChannelPipeline / ChannelHandler]
        CODEC[Codec layer<br/>ByteToMessageDecoder / MessageToByteEncoder]
        PROTOCOL[Protocol layer<br/>HTTP / WebSocket / custom protocols]
        BIZ[Business layer<br/>SimpleChannelInboundHandler]
    end

    TRANSPORT --> PIPELINE --> CODEC --> PROTOCOL --> BIZ

    subgraph Support["Supporting Components"]
        BYTEBUF[ByteBuf<br/>memory management]
        BOOTSTRAP[Bootstrap<br/>startup and configuration]
        FUTURE[Promise/Future<br/>asynchronous programming]
    end

    TRANSPORT ---|depends on| BYTEBUF
    TRANSPORT ---|depends on| BOOTSTRAP
    TRANSPORT ---|depends on| FUTURE

    style TRANSPORT fill:#f96,stroke:#333
    style PIPELINE fill:#ff6,stroke:#333
    style CODEC fill:#9f6,stroke:#333

Core Components: Source Code Analysis

1. EventLoopGroup and EventLoop

EventLoop is Netty's core scheduling engine. Each EventLoop is bound to one thread and handles the I/O events of many Channels.

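import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;

/**
 * Netty server bootstrap code.
 */
public class NettyServer {

    public static void main(String[] args) throws Exception {
        // BossGroup: accepts incoming connections; one thread is enough
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // WorkerGroup: handles read/write I/O; defaults to CPU cores * 2 threads
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024) // pending-connection queue size
                    .option(ChannelOption.SO_REUSEADDR, true)
                    .childOption(ChannelOption.TCP_NODELAY, true) // disable Nagle's algorithm
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ChannelPipeline pipeline = ch.pipeline();
                            // Codecs: 4-byte length-prefixed frames carrying UTF-8 strings
                            pipeline.addLast(new LengthFieldBasedFrameDecoder(
                                    65535, 0, 4, 0, 4));
                            pipeline.addLast(new LengthFieldPrepender(4));
                            pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
                            pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
                            // Heartbeat detection: reader idle after 60 s, writer idle after 30 s
                            pipeline.addLast(new IdleStateHandler(60, 30, 0));
                            // Business handler
                            pipeline.addLast(new ServerBusinessHandler());
                        }
                    });

            // Bind the port and start the server
            ChannelFuture future = bootstrap.bind(8080).sync();
            System.out.println("Server started on port 8080");

            // Block until the server channel is closed
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}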
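
To make the "one EventLoop, one thread" idea concrete, here is a minimal plain-JDK sketch of a loop that runs every submitted task on a single dedicated thread. `MiniEventLoop` and its methods are hypothetical names chosen for illustration; a real NioEventLoop also polls a Selector, which this sketch leaves out.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Plain-JDK sketch of "one loop, one thread": every task runs on the same thread. */
final class MiniEventLoop implements AutoCloseable {
    private static final Runnable POISON = () -> { };   // marker task that stops the loop
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final Thread thread;

    MiniEventLoop(String name) {
        thread = new Thread(this::run, name);
        thread.start();
    }

    private void run() {
        try {
            for (;;) {
                Runnable task = tasks.take();   // a real loop would also poll a Selector here
                if (task == POISON) {
                    return;
                }
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** True when the caller is already running on the loop's own thread. */
    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    /** Queues a task from any thread; it will run later on the loop thread. */
    void execute(Runnable task) {
        tasks.add(task);
    }

    /** Lets queued tasks finish, then stops the loop thread. */
    @Override
    public void close() {
        tasks.add(POISON);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because all work for the loop is funneled through one queue and one thread, state touched only by those tasks needs no synchronization.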

The EventLoop Core Run Loop

The heart of NioEventLoop is an infinite loop that repeatedly does three things:

flowchart TB
    START[NioEventLoop.run] --> SELECT[1. select<br/>poll for I/O events]
    SELECT --> PROCESS[2. processSelectedKeys<br/>handle I/O events]
    PROCESS --> RUNTASKS[3. runAllTasks<br/>run queued tasks]
    RUNTASKS --> SELECT

    SELECT -.->|JDK epoll bug<br/>empty polling| REBUILD[Rebuild Selector<br/>to work around the bug]
    REBUILD --> SELECT

    style SELECT fill:#f96,stroke:#333
    style PROCESS fill:#9cf,stroke:#333
    style RUNTASKS fill:#9f6,stroke:#333
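/**
 * Simplified view of the NioEventLoop core logic
 * (source: io.netty.channel.nio.NioEventLoop)
 *
 * Key design points:
 * 1. I/O events and queued tasks share one thread, which avoids context switches
 * 2. ioRatio controls how time is split between I/O and tasks (default 50:50)
 * 3. Built-in detection of, and recovery from, the JDK epoll empty-polling bug
 */
// The following is a simplified outline of the core flow.
// The real code lives in NioEventLoop#run().
//
// for (;;) {
//     // Step 1: call selector.select() and wait for I/O events
//     int selectedKeys = select(curDeadlineNanos);
//
//     // Step 2: handle every ready I/O event
//     if (selectedKeys != 0) {
//         processSelectedKeys();
//     }
//
//     // Step 3: run the tasks waiting in taskQueue
//     runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
//
//     // Detect the epoll empty-polling bug
//     if (unexpectedSelectorWakeup(selectCnt)) {
//         rebuildSelector(); // rebuild the Selector
//     }
// }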
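
Two pieces of the loop above are plain arithmetic and can be sketched in isolation: the task time budget derived from ioRatio, and the counter that decides when a Selector has spun too often. This is a hedged illustration with hypothetical names (`EventLoopMath`, `SpinDetector`), not the Netty source. As far as I know, Netty handles ioRatio = 100 as a special case and rebuilds after 512 consecutive premature wakeups by default; the sketch takes the threshold as a parameter instead.

```java
/** Sketch of two calculations from the loop above; names are illustrative only. */
final class EventLoopMath {

    /** Time allowed for queued tasks, given the time just spent on I/O. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            // 100 would mean "no time limit" and is assumed to be handled elsewhere.
            throw new IllegalArgumentException("ioRatio must be in 1..99: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    /** Counts consecutive select() returns that did no work and signals when to rebuild. */
    static final class SpinDetector {
        private final int threshold;
        private int selectCnt;

        SpinDetector(int threshold) {
            this.threshold = threshold;
        }

        /** Returns true when the Selector should be rebuilt. */
        boolean onSelectReturned(int readyKeys, boolean ranTasks) {
            if (readyKeys > 0 || ranTasks) {
                selectCnt = 0;          // the wakeup did real work
                return false;
            }
            if (++selectCnt >= threshold) {
                selectCnt = 0;          // a fresh Selector starts counting from zero
                return true;
            }
            return false;
        }
    }
}
```

With ioRatio = 50 the task budget equals the I/O time; with ioRatio = 75, tasks get one third of the I/O time.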

2. Channel Pipeline

ChannelPipeline is Netty's event-processing chain and follows the Chain of Responsibility pattern. Every Channel has its own pipeline, and each pipeline holds multiple ChannelHandlers.

graph LR
    subgraph ChannelPipeline
        HEAD[HeadContext<br/>head] --> IN1[Decoder<br/>Inbound]
        IN1 --> IN2[BusinessHandler<br/>Inbound]
        IN2 --> TAIL[TailContext<br/>tail]

        TAIL --> OUT1[Encoder<br/>Outbound]
        OUT1 --> HEAD
    end

    SOCKET[Socket] -->|data read| HEAD
    HEAD -->|data written| SOCKET

    IN_FLOW[Inbound event flow] -.->|from head to tail| TAIL
    OUT_FLOW[Outbound event flow] -.->|from tail to head| HEAD

    style HEAD fill:#f96,stroke:#333
    style TAIL fill:#f96,stroke:#333
    style IN1 fill:#9cf,stroke:#333
    style IN2 fill:#9cf,stroke:#333
    style OUT1 fill:#fc9,stroke:#333
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Inbound handler: processes received data.
 * Uses pattern matching and switch expressions, so it needs Java 16 or later.
 */
public class ServerBusinessHandler extends SimpleChannelInboundHandler<String> {

    private static final Logger log = LoggerFactory.getLogger(ServerBusinessHandler.class);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) {
        log.info("Received from {}: {}", ctx.channel().remoteAddress(), msg);

        // Business processing
        String response = processMessage(msg);

        // Write the response back and log any failure
        ctx.writeAndFlush(response).addListener(future -> {
            if (!future.isSuccess()) {
                log.error("Failed to send response", future.cause());
            }
        });
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("Client connected: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("Client disconnected: {}", ctx.channel().remoteAddress());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Exception in channel {}", ctx.channel().remoteAddress(), cause);
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof IdleStateEvent idleEvent) {
            switch (idleEvent.state()) {
                case READER_IDLE -> {
                    log.warn("Reader idle, closing: {}", ctx.channel().remoteAddress());
                    ctx.close();
                }
                case WRITER_IDLE -> {
                    // Send a heartbeat
                    ctx.writeAndFlush("HEARTBEAT");
                }
                default -> {}
            }
        } else {
            // Pass unrelated user events on to the next handler
            ctx.fireUserEventTriggered(evt);
        }
    }

    private String processMessage(String msg) {
        return "Echo: " + msg;
    }
}
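
The direction rule (inbound from head to tail, outbound from tail to head) is easy to get backwards, so here is a small plain-JDK sketch of it. `MiniPipeline` is a hypothetical stand-in that models handlers as string transformations; it is not how Netty links its handler contexts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

/** Plain-JDK sketch of a pipeline: inbound runs head to tail, outbound tail to head. */
final class MiniPipeline {
    private final List<UnaryOperator<String>> inbound = new ArrayList<>();
    private final List<UnaryOperator<String>> outbound = new ArrayList<>();

    MiniPipeline addInbound(UnaryOperator<String> handler) {
        inbound.add(handler);
        return this;
    }

    MiniPipeline addOutbound(UnaryOperator<String> handler) {
        outbound.add(handler);
        return this;
    }

    /** Data read from the socket visits inbound handlers in the order they were added. */
    String fireRead(String msg) {
        for (UnaryOperator<String> handler : inbound) {
            msg = handler.apply(msg);
        }
        return msg;
    }

    /** Data written by the application visits outbound handlers in reverse order. */
    String write(String msg) {
        for (int i = outbound.size() - 1; i >= 0; i--) {
            msg = outbound.get(i).apply(msg);
        }
        return msg;
    }
}
```

This is why the server example adds LengthFieldPrepender before StringEncoder: on the way out, the string is encoded first and the length prefix is added last.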

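3. ByteBuf: a High-Performance Buffer

ByteBuf is Netty's enhanced replacement for NIO's ByteBuffer. It removes many of ByteBuffer's inconveniences, such as the single shared position that forces a flip() between writing and reading, and the fixed capacity.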

graph TB
    subgraph Layout["ByteBuf Memory Layout"]
        direction LR
        DISCARD[Discardable bytes<br/>0..readerIndex]
        READABLE[Readable bytes<br/>readerIndex..writerIndex]
        WRITABLE[Writable bytes<br/>writerIndex..capacity]
        RESERVE[Reserved for expansion<br/>capacity..maxCapacity]
    end

    RI[readerIndex] --> READABLE
    WI[writerIndex] --> WRITABLE
    CAP[capacity] --> RESERVE

    style DISCARD fill:#ccc,stroke:#333
    style READABLE fill:#9f6,stroke:#333
    style WRITABLE fill:#9cf,stroke:#333
    style RESERVE fill:#ffa,stroke:#333
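
The layout above boils down to two independent indices over one array. The following plain-JDK sketch shows the bookkeeping under that assumption; `TwoIndexBuffer` is a hypothetical name, it has a fixed capacity, and it omits everything else a real ByteBuf does.

```java
/** Plain-JDK sketch of the two-index layout over a fixed byte array. */
final class TwoIndexBuffer {
    private final byte[] data;
    private int readerIndex;   // next byte to read
    private int writerIndex;   // next byte to write

    TwoIndexBuffer(int capacity) {
        data = new byte[capacity];
    }

    int readableBytes() {
        return writerIndex - readerIndex;
    }

    int writableBytes() {
        return data.length - writerIndex;
    }

    void writeByte(int b) {
        if (writableBytes() == 0) {
            throw new IndexOutOfBoundsException("buffer is full");
        }
        data[writerIndex++] = (byte) b;
    }

    byte readByte() {
        if (readableBytes() == 0) {
            throw new IndexOutOfBoundsException("nothing to read");
        }
        return data[readerIndex++];
    }

    /** Reclaims the discardable region by moving unread bytes to the front. */
    void discardReadBytes() {
        System.arraycopy(data, readerIndex, data, 0, readableBytes());
        writerIndex -= readerIndex;
        readerIndex = 0;
    }
}
```

Reads and writes can be interleaved freely because neither index is reused for the other purpose, so no flip() step is needed.
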
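import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;

/**
 * Demonstrates core ByteBuf features.
 */
public class ByteBufExamples {

    /**
     * Basic read and write operations.
     */
    public void basicOperations() {
        // Heap buffer
        ByteBuf heapBuf = Unpooled.buffer(256);
        // Direct (off-heap) buffer: avoids an extra heap-to-native copy during socket I/O
        ByteBuf directBuf = Unpooled.directBuffer(256);
        // Pooled buffer (recommended for production)
        ByteBuf pooledBuf = PooledByteBufAllocator.DEFAULT.buffer(256);

        try {
            // Each write advances writerIndex automatically
            heapBuf.writeInt(42);
            heapBuf.writeLong(System.currentTimeMillis());
            heapBuf.writeBytes("Hello Netty".getBytes(CharsetUtil.UTF_8));

            // Each read advances readerIndex automatically
            int value = heapBuf.readInt();
            long timestamp = heapBuf.readLong();

            // Read the remaining bytes into an array
            byte[] bytes = new byte[heapBuf.readableBytes()];
            heapBuf.readBytes(bytes);
            String message = new String(bytes, CharsetUtil.UTF_8);
        } finally {
            // Drop the reference counts so the memory can be reclaimed
            heapBuf.release();
            directBuf.release();
            pooledBuf.release();
        }
    }

    /**
     * Zero-copy: CompositeByteBuf.
     * Combines several ByteBufs into one logical buffer without copying memory.
     */
    public void zeroCopyComposite() {
        ByteBuf header = Unpooled.wrappedBuffer("HEADER|".getBytes(CharsetUtil.UTF_8));
        ByteBuf body = Unpooled.wrappedBuffer("message body".getBytes(CharsetUtil.UTF_8));

        // Logical combination; no memory is copied
        CompositeByteBuf composite = Unpooled.compositeBuffer();
        composite.addComponents(true, header, body);

        // Use it like a single ByteBuf
        byte[] all = new byte[composite.readableBytes()];
        composite.readBytes(all);
        System.out.println(new String(all, CharsetUtil.UTF_8)); // HEADER|message body

        // Releasing the composite also releases its components
        composite.release();
    }

    /**
     * Zero-copy: slice.
     * Slices a ByteBuf into views that share the underlying memory.
     */
    public void zeroCopySlice() {
        ByteBuf original = Unpooled.wrappedBuffer("Hello, World!".getBytes(CharsetUtil.UTF_8));

        // slice() shares the underlying memory; nothing is copied
        ByteBuf hello = original.slice(0, 5);
        ByteBuf world = original.slice(7, 6);

        // Note: the slices and the original share memory,
        // so modifying one is visible through the other.
        // slice() does not increase the reference count: once original is
        // released, hello and world must no longer be used
        // (use retainedSlice() if a slice has to outlive the original).

        original.release();
    }
}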
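
The "modifying one affects the other" warning for slices can be demonstrated without Netty, because the JDK's own ByteBuffer has the same view semantics. The sketch below uses `java.nio.ByteBuffer` rather than ByteBuf; `SliceSharingDemo` and `mutateThroughSlice` are illustrative names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Uses the JDK's ByteBuffer to show that a slice is a view, not a copy. */
final class SliceSharingDemo {

    /** Overwrites the first byte through a slice and returns the parent's content afterwards. */
    static String mutateThroughSlice(String text) {
        ByteBuffer parent = ByteBuffer.wrap(text.getBytes(StandardCharsets.US_ASCII));
        ByteBuffer view = parent.slice();   // shares the parent's backing array
        view.put(0, (byte) 'J');            // write through the view
        return new String(parent.array(), StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println(mutateThroughSlice("Hello"));
    }
}
```

The write made through the view shows up in the parent, which is the price of avoiding the copy.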

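4. Memory Pooling

Netty relies on memory pooling to substantially reduce GC pressure and allocation overhead. The numbers in the diagram below (16 MB chunks and a separate Tiny class) match the allocator as it worked in earlier 4.1.x releases; later releases reworked the size classes and default to smaller chunks, so check the version you run.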

graph TB
    subgraph PooledByteBufAllocator
        ARENA[PoolArena]
        ARENA --> TC[PoolThreadCache<br/>thread-local cache]
        ARENA --> CHUNK[PoolChunk 16MB]
        CHUNK --> PAGE[PoolSubpage 8KB]

        TC -->|allocate from the thread cache first| ALLOC[Allocate ByteBuf]
        CHUNK -->|on thread cache miss| ALLOC
    end

    subgraph SizeClasses["Size Classes"]
        TINY[Tiny: 16B ~ 512B]
        SMALL[Small: 512B ~ 8KB]
        NORMAL[Normal: 8KB ~ 16MB]
        HUGE[Huge: > 16MB<br/>not pooled]
    end

    style TC fill:#ffa,stroke:#333
    style CHUNK fill:#aff,stroke:#333
    style ARENA fill:#f96,stroke:#333
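
The size classes in the diagram amount to a threshold lookup. The sketch below encodes exactly the ranges drawn above; `SizeClassSketch` is a hypothetical name, and the treatment of the boundary values (512 B counts as Small, 8 KB and 16 MB count as Normal) is an assumption, since the diagram does not say which side each edge belongs to.

```java
/** Classifies a request size using the thresholds shown in the diagram. */
final class SizeClassSketch {

    enum SizeClass { TINY, SMALL, NORMAL, HUGE }

    static final int PAGE_SIZE = 8 * 1024;            // 8 KB, as in the diagram
    static final int CHUNK_SIZE = 16 * 1024 * 1024;   // 16 MB, as in the diagram

    static SizeClass classify(int bytes) {
        if (bytes < 0) {
            throw new IllegalArgumentException("negative size: " + bytes);
        }
        if (bytes < 512) {
            return SizeClass.TINY;
        }
        if (bytes < PAGE_SIZE) {
            return SizeClass.SMALL;
        }
        if (bytes <= CHUNK_SIZE) {
            return SizeClass.NORMAL;
        }
        return SizeClass.HUGE;   // too big for a chunk: allocated directly, not pooled
    }
}
```

Small requests are the ones that benefit most from the thread-local cache, because they are frequent and cheap to keep around.
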
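import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;

/**
 * Preventing memory leaks.
 * Netty manages the ByteBuf life cycle with reference counting.
 */
public class MemoryLeakPrevention {

    /**
     * Releases the ByteBuf correctly.
     */
    public static class SafeHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf buf = (ByteBuf) msg;
            try {
                // Process the data
                processData(buf);
            } finally {
                // Must be released here, otherwise the buffer leaks
                ReferenceCountUtil.release(msg);
            }
        }
    }

    /**
     * SimpleChannelInboundHandler releases the message automatically.
     */
    public static class AutoReleaseHandler extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
            // Process the data;
            // msg is released automatically after this method returns
            processData(msg);
        }
    }

    // Placeholder for application logic (not part of Netty)
    private static void processData(ByteBuf buf) {
        // e.g. parse or log buf.readableBytes() bytes
    }
}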
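
What "release" actually does can be sketched with a plain counter. This is a simplified illustration of reference counting in general, with hypothetical names (`RefCountedResource`, `isDeallocated`); Netty's real implementation is more involved.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Plain-JDK sketch of reference counting; not Netty's actual implementation. */
final class RefCountedResource {
    private final AtomicInteger refCnt = new AtomicInteger(1);   // the creator holds one reference
    private volatile boolean deallocated;

    int refCnt() {
        return refCnt.get();
    }

    boolean isDeallocated() {
        return deallocated;
    }

    /** Adds one reference; fails if the resource was already freed. */
    RefCountedResource retain() {
        for (;;) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("already released");
            }
            if (refCnt.compareAndSet(current, current + 1)) {
                return this;
            }
        }
    }

    /** Drops one reference; returns true if this call freed the resource. */
    boolean release() {
        for (;;) {
            int current = refCnt.get();
            if (current == 0) {
                throw new IllegalStateException("released too many times");
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                if (current == 1) {
                    deallocated = true;   // a pooled buffer would return to its pool here
                    return true;
                }
                return false;
            }
        }
    }
}
```

A leak is simply a resource whose count never reaches zero, and a double release is the opposite mistake; both handlers above exist to keep retains and releases balanced.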

Enabling memory leak detection:

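# Development: advanced detection (sampled like SIMPLE, but also reports where the leaked buffer was recently accessed)
java -Dio.netty.leakDetection.level=ADVANCED -jar app.jar

# Testing: paranoid detection (tracks every ByteBuf; high performance overhead)
java -Dio.netty.leakDetection.level=PARANOID -jar app.jar

# Production: simple detection (the default; samples about 1% of buffers)
java -Dio.netty.leakDetection.level=SIMPLE -jar app.jar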

Codec Framework

Custom Protocol Codec

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.MessageToByteEncoder;

import java.util.List;

/**
 * Custom protocol format:
 * +---------+---------+---------+---------+
 * |  Magic  | Version | MsgType | Length  |
 * | 2 bytes | 1 byte  | 1 byte  | 4 bytes |
 * +---------+---------+---------+---------+
 * |                 Body                  |
 * |            (Length bytes)             |
 * +---------------------------------------+
 */
public class CustomProtocol {

    public static final short MAGIC = (short) 0xCAFE;
    public static final byte VERSION = 1;

    // Message types
    public static final byte MSG_REQUEST = 1;
    public static final byte MSG_RESPONSE = 2;
    public static final byte MSG_HEARTBEAT = 3;
}

/**
 * Decoder. Message is a plain object with version, msgType and body
 * properties, assumed to be defined elsewhere.
 */
public class CustomDecoder extends ByteToMessageDecoder {

    private static final int HEADER_LENGTH = 8; // 2+1+1+4

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                          List<Object> out) {

        // Not enough bytes for a header yet; wait for more data
        if (in.readableBytes() < HEADER_LENGTH) {
            return;
        }

        in.markReaderIndex();

        // Read the magic number
        short magic = in.readShort();
        if (magic != CustomProtocol.MAGIC) {
            in.resetReaderIndex();
            throw new DecoderException("Invalid magic number: " + magic);
        }

        byte version = in.readByte();
        byte msgType = in.readByte();
        int bodyLength = in.readInt();

        // Validate the body length (at most 1 MB)
        if (bodyLength < 0 || bodyLength > 1024 * 1024) {
            throw new DecoderException("Invalid body length: " + bodyLength);
        }

        // The body has not fully arrived; rewind and wait
        if (in.readableBytes() < bodyLength) {
            in.resetReaderIndex();
            return;
        }

        // Read the body
        byte[] body = new byte[bodyLength];
        in.readBytes(body);

        // Build the message object
        Message message = new Message();
        message.setVersion(version);
        message.setMsgType(msgType);
        message.setBody(body);

        out.add(message);
    }
}

/**
 * Encoder.
 */
public class CustomEncoder extends MessageToByteEncoder<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg,
                          ByteBuf out) {
        out.writeShort(CustomProtocol.MAGIC);
        out.writeByte(msg.getVersion());
        out.writeByte(msg.getMsgType());
        out.writeInt(msg.getBody().length);
        out.writeBytes(msg.getBody());
    }
}
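
The mark/reset handling of half-received frames is the subtle part of the decoder, so here is the same header format exercised with the JDK's ByteBuffer, where it can be run without a Netty pipeline. `FrameCodecSketch` is a hypothetical helper that returns only the body and ignores version and type; like ByteBuf, ByteBuffer is big-endian by default.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

/** Plain-JDK sketch of the 8-byte header framing described above. */
final class FrameCodecSketch {
    static final short MAGIC = (short) 0xCAFE;
    static final int HEADER_LENGTH = 8;   // magic(2) + version(1) + type(1) + length(4)

    static byte[] encode(byte version, byte msgType, byte[] body) {
        return ByteBuffer.allocate(HEADER_LENGTH + body.length)
                .putShort(MAGIC)
                .put(version)
                .put(msgType)
                .putInt(body.length)
                .put(body)
                .array();
    }

    /**
     * Tries to decode one frame body from a buffer that is ready for reading.
     * Returns empty, with the position unchanged, if the frame is incomplete.
     */
    static Optional<byte[]> decode(ByteBuffer in) {
        if (in.remaining() < HEADER_LENGTH) {
            return Optional.empty();
        }
        in.mark();
        if (in.getShort() != MAGIC) {
            throw new IllegalArgumentException("bad magic number");
        }
        in.get();                        // version (ignored in this sketch)
        in.get();                        // message type (ignored in this sketch)
        int length = in.getInt();
        if (length < 0) {
            throw new IllegalArgumentException("bad body length: " + length);
        }
        if (in.remaining() < length) {
            in.reset();                  // half frame: rewind and wait for more bytes
            return Optional.empty();
        }
        byte[] body = new byte[length];
        in.get(body);
        return Optional.of(body);
    }
}
```

Rewinding on an incomplete frame is what allows the same bytes to be parsed again once the rest of the frame arrives.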

Heartbeat Mechanism

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A complete heartbeat handling scheme.
 * Keeps per-connection state, so create one instance per Channel.
 */
public class HeartbeatHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = LoggerFactory.getLogger(HeartbeatHandler.class);
    private int missedHeartbeats = 0;
    private static final int MAX_MISSED = 3;

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent idleEvent) {
            switch (idleEvent.state()) {
                case READER_IDLE -> {
                    missedHeartbeats++;
                    if (missedHeartbeats >= MAX_MISSED) {
                        log.warn("Missed {} heartbeats, closing connection: {}",
                                missedHeartbeats, ctx.channel().remoteAddress());
                        ctx.close();
                    } else {
                        log.debug("Reader idle (missed: {}), peer: {}",
                                missedHeartbeats, ctx.channel().remoteAddress());
                    }
                }
                case WRITER_IDLE -> {
                    log.debug("Sending heartbeat to: {}",
                            ctx.channel().remoteAddress());
                    Message heartbeat = new Message();
                    heartbeat.setVersion(CustomProtocol.VERSION);
                    heartbeat.setMsgType(CustomProtocol.MSG_HEARTBEAT);
                    heartbeat.setBody(new byte[0]);
                    ctx.writeAndFlush(heartbeat);
                }
                default -> {}
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // Any incoming data resets the missed-heartbeat counter
        missedHeartbeats = 0;

        if (msg instanceof Message message
                && message.getMsgType() == CustomProtocol.MSG_HEARTBEAT) {
            // Heartbeat messages are not passed to later handlers
            return;
        }

        // Pass non-heartbeat messages to the next handler
        ctx.fireChannelRead(msg);
    }
}
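
The counting logic in the handler can be separated from the I/O and checked on its own. The sketch below does that with a hypothetical `HeartbeatTracker` class. If it were paired with the `IdleStateHandler(60, 30, 0)` from the server example, a silent peer would be closed at the third reader-idle event, roughly 180 seconds after its last message.

```java
/** Sketch of the missed-heartbeat bookkeeping, separated from any I/O. */
final class HeartbeatTracker {
    private final int maxMissed;
    private int missed;

    HeartbeatTracker(int maxMissed) {
        if (maxMissed <= 0) {
            throw new IllegalArgumentException("maxMissed must be positive");
        }
        this.maxMissed = maxMissed;
    }

    /** Call on every reader-idle event; returns true when the connection should be closed. */
    boolean onReaderIdle() {
        return ++missed >= maxMissed;
    }

    /** Call whenever any data arrives from the peer. */
    void onRead() {
        missed = 0;
    }

    int missed() {
        return missed;
    }
}
```

Tolerating a few missed intervals rather than closing on the first one avoids dropping connections over a brief network stall.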

Performance Tuning Essentials

Netty Tuning Parameters

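bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)

        // Server-side TCP parameters
        .option(ChannelOption.SO_BACKLOG, 1024)          // backlog of connections waiting to be accepted
        .option(ChannelOption.SO_REUSEADDR, true)        // allow address reuse

        // Parameters for accepted (child) connections
        .childOption(ChannelOption.TCP_NODELAY, true)    // disable Nagle's algorithm
        .childOption(ChannelOption.SO_KEEPALIVE, true)   // TCP keep-alive
        .childOption(ChannelOption.SO_SNDBUF, 32 * 1024) // send buffer
        .childOption(ChannelOption.SO_RCVBUF, 32 * 1024) // receive buffer

        // Use the pooled allocator (already the default in Netty 4.1)
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)

        // Write buffer water marks (low, high): above the high mark
        // Channel.isWritable() returns false, and it returns true again
        // once pending bytes drop below the low mark. Netty does not block
        // writes by itself; the application should check isWritable().
        .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                new WriteBufferWaterMark(32 * 1024, 64 * 1024));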
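
The two water marks form a hysteresis band: writability switches off above the high mark and only switches back on below the low mark. A minimal plain-JDK sketch of that rule follows; `WritabilityTracker` is a hypothetical name, and it tracks only a byte count rather than real buffers.

```java
/** Sketch of high/low water mark hysteresis for an outbound buffer. */
final class WritabilityTracker {
    private final long low;
    private final long high;
    private long pendingBytes;
    private boolean writable = true;

    WritabilityTracker(long low, long high) {
        if (low < 0 || high < low) {
            throw new IllegalArgumentException("need 0 <= low <= high");
        }
        this.low = low;
        this.high = high;
    }

    /** Bytes were queued for writing. */
    void onQueued(long bytes) {
        pendingBytes += bytes;
        if (pendingBytes > high) {
            writable = false;   // producers should pause
        }
    }

    /** Bytes were flushed to the socket. */
    void onFlushed(long bytes) {
        pendingBytes = Math.max(0, pendingBytes - bytes);
        if (pendingBytes < low) {
            writable = true;    // producers may resume
        }
    }

    boolean isWritable() {
        return writable;
    }
}
```

The gap between the two marks keeps the flag from flapping when the pending size hovers around a single threshold.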

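Key Optimization Strategies

| Strategy | Description |
| --- | --- |
| Use pooled ByteBuf | Reduces GC pressure; enabled by default |
| Use direct buffers | Fewer copies between heap and off-heap memory |
| CompositeByteBuf | Combines multiple buffers without copying |
| No slow work on I/O threads | Move time-consuming business logic to a separate thread pool |
| Set water marks sensibly | Prevents unbounded write-buffer growth and the resulting OOM |
| Graceful shutdown | shutdownGracefully ensures resources are released |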
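import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

/**
 * Moves time-consuming business logic to a separate thread pool.
 */
public class BusinessThreadPoolHandler extends ChannelInboundHandlerAdapter {

    // Business thread pool; does not occupy the EventLoop
    private static final EventExecutorGroup BUSINESS_GROUP =
            new DefaultEventExecutorGroup(16);

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Submit the slow operation to the business thread pool
        BUSINESS_GROUP.submit(() -> {
            String result = heavyBusinessLogic((Message) msg);
            // Safe from another thread: Netty hands the write to the Channel's EventLoop
            ctx.writeAndFlush(result);
        });
    }

    private String heavyBusinessLogic(Message msg) {
        // Database queries, heavy computation, and other slow operations
        return "processed";
    }
}

// Alternatively, assign an executor group to the handler when adding it to the pipeline
// (businessGroup is an EventExecutorGroup, BusinessHandler is your own handler)
pipeline.addLast(businessGroup, "businessHandler", new BusinessHandler());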
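
The same hand-off (slow work on a business pool, result back on the I/O thread) can be sketched with only the JDK's executors. `OffloadSketch`, `slowLookup`, and the executors passed in are hypothetical stand-ins, with a single-threaded executor playing the role of the EventLoop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

/** Plain-JDK sketch: slow work runs on a business pool, the "write" runs on the I/O thread. */
final class OffloadSketch {

    static CompletableFuture<String> handle(String request,
                                            ExecutorService businessPool,
                                            ExecutorService ioLoop) {
        return CompletableFuture
                // Step 1: the blocking call runs on the business pool.
                .supplyAsync(() -> slowLookup(request), businessPool)
                // Step 2: the result is handed back to the single I/O thread.
                .thenApplyAsync(
                        result -> Thread.currentThread().getName() + " wrote " + result,
                        ioLoop);
    }

    private static String slowLookup(String request) {
        return "processed:" + request;   // stands in for a database call
    }
}
```

The I/O thread is free to serve other connections while the lookup runs, and it only touches the result once the work is done.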

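Summary

Netty's high performance comes from careful design at several levels:

  1. Threading model: an event-driven main/sub Reactor model in which a small number of threads serve a large number of connections
  2. Zero-copy: CompositeByteBuf, slice, FileRegion, and similar mechanisms reduce memory copying
  3. Memory pooling: PooledByteBufAllocator uses memory pools and thread-local caches to cut GC and allocation overhead
  4. Pipeline mechanism: the Chain of Responsibility pattern yields a flexible event-processing pipeline
  5. Lock-free design: each EventLoop is bound to a single thread, and all operations on a given Channel run on that thread, which avoids synchronization overhead

Understanding these design ideas helps us use Netty better and can also inspire similar high-performance strategies in other systems. In practice, manage ByteBuf reference counts carefully to prevent memory leaks, avoid blocking operations on EventLoop threads, and configure heartbeats and water marks sensibly to keep the system stable.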

Author · zt
Published · 2025-05-14
Length · 3.7k characters · 9 min
License · CC BY-SA 4.0