新增Netty解读

pull/4/head
595208882@qq.com 3 years ago
parent a205522e22
commit 276ef00160

@ -2581,23 +2581,6 @@ Netty内部逻辑的流转
提供了基本的API用于网络I/O操作如register、bind、connect、read、write、flush 等。Netty的Channel是以JDK NIO Channel为基础的相比较于JDK NIONetty的Channel提供了更高层次的抽象同时屏蔽了底层Socket的复杂性赋予了Channel更加强大的功能在使用Netty时基本不需要再与Java Socket类直接打交道。
Channel常用的实现类有
- NioServerSocketChannel异步TCP服务端
- NioSocketChannel异步TCP客户端
- OioServerSocketChannel同步TCP服务端
- OioSocketChannel同步TCP客户端
- NioDatagramChannel异步UDP连接
- OioDatagramChannel同步UDP连接
Channel常见的状态事件回调有
- channelRegisteredChannel创建后被注册到EventLoop上
- channelUnregisteredChannel创建后未注册或者从EventLoop取消注册
- channelActiveChannel处于就绪状态可以被读写
- channelInactiveChannel处于非就绪状态
- channelReadChannel可以从远端读取到数据
- channelReadCompleteChannel读取数据完成
@ -2653,224 +2636,357 @@ Netty内部逻辑的流转
## 线程模型
### 单Reactor单线程
## Reactor线程模型
![Reactor线程模型运行机制](images/Middleware/Reactor线程模型运行机制.png)
Reactor线程模型运行机制的四个步骤分别为**连接注册、事件轮询、事件分发、任务处理**。
### 单Reactor多线程
- 连接注册Channel建立后注册至Reactor线程中的Selector选择器
- 事件轮询轮询Selector选择器中已注册的所有Channel的I/O事件
- 事件分发为准备就绪的I/O事件分配相应的处理线程
- 任务处理Reactor线程还负责任务队列中的非I/O任务每个Worker线程从各自维护的任务队列中取出任务异步执行
### 主从Reactor多线程
### 单Reactor单线程
![单Reactor单线程](images/Middleware/单Reactor单线程.png)
上图描述了 Reactor 的单线程模型结构,在 Reactor 单线程模型中,所有 I/O 操作(包括连接建立、数据读写、事件分发等),都是由一个线程完成的。单线程模型逻辑简单,缺陷也十分明显:
- 一个线程支持处理的连接数非常有限CPU 很容易打满,性能方面有明显瓶颈
- 当多个事件被同时触发时,只要有一个事件没有处理完,其他后面的事件就无法执行,这就会造成消息积压及请求超时
- 线程在处理 I/O 事件时Select 无法同时处理连接建立、事件分发等操作
- 如果 I/O 线程一直处于满负荷状态,很可能造成服务端节点不可用
## 核心设计
### 定时器TimerTask
### 单Reactor多线程
![单Reactor多线程](images/Middleware/单Reactor多线程.png)
由于单线程模型有性能方面的瓶颈多线程模型作为解决方案就应运而生了。Reactor 多线程模型将业务逻辑交给多个线程进行处理。除此之外多线程模型其他的操作与单线程模型是类似的例如读取数据依然保留了串行化的设计。当客户端有数据发送至服务端时Select 会监听到可读事件,数据读取完毕后提交到业务线程池中并发处理。
### 时间轮HashedWheelTimer
### 主从Reactor多线程
### 无锁队列mpsc queue
![主从Reactor多线程](images/Middleware/主从Reactor多线程.png)
主从多线程模型由多个 Reactor 线程组成,每个 Reactor 线程都有独立的 Selector 对象。MainReactor 仅负责处理客户端连接的 Accept 事件,连接建立成功后将新创建的连接对象注册至 SubReactor。再由 SubReactor 分配线程池中的 I/O 线程与其连接绑定,它将负责连接生命周期内所有的 I/O 事件。
Netty 推荐使用主从多线程模型,这样就可以轻松达到成千上万规模的客户端连接。在海量客户端并发请求的场景下,主从多线程模式甚至可以适当增加 SubReactor 线程的数量,从而利用多核能力提升系统的吞吐量。
### FastThreadLocal
## 核心设计
### ByteBuf
### Netty EventLoop原理
EventLoop 这个概念其实并不是 Netty 独有的,它是一种事件等待和处理的程序模型,可以解决多线程资源消耗高的问题。例如 Node.js 就采用了 EventLoop 的运行机制,不仅占用资源低,而且能够支撑了大规模的流量访问。
EventLoop 可以说是 Netty 的调度中心负责监听多种事件类型I/O 事件、信号事件、定时事件等。
### 编解码协议
#### EventLoop运行模式
netty-codec模块主要负责编解码工作通过编解码实现原始字节数据与业务实体对象之间的相互转化。Netty支持大多数业界主流协议的编解码器如**HTTP、HTTP2、Redis、XML**等,为开发者节省了大量的精力。此外该模块提供了抽象的编解码类**ByteToMessageDecoder**和**MessageToByteEncoder**,通过继承这两个类可以轻松实现自定义的编解码逻辑。
![EventLoop通用的运行模式](images/Middleware/EventLoop通用的运行模式.png)
![Netty协议](images/Middleware/Netty协议.png)
上图展示了 EventLoop 通用的运行模式。每当事件发生时,应用程序都会将产生的事件放入事件队列当中,然后 EventLoop 会轮询从队列中取出事件执行或者将事件分发给相应的事件监听者执行。事件执行的方式通常分为**立即执行、延后执行、定期执行**几种。
### 拆包粘包
#### NioEventLoop原理
在 Netty 中 EventLoop 可以理解为 Reactor 线程模型的事件处理引擎,每个 EventLoop 线程都维护一个 Selector 选择器和任务队列 taskQueue。它主要负责处理 I/O 事件、普通任务和定时任务。Netty 中推荐使用 NioEventLoop 作为实现类,那么 Netty 是如何实现 NioEventLoop 的呢?首先我们来看 NioEventLoop 最核心的 run() 方法源码:
- 拆包/粘包的解决方案
```java
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
// 轮询 I/O 事件
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
- **消息长度固定**:每个数据报文都需要一个固定的长度。当接收方累计读取到固定长度的报文后,就认为已经获得一个完整的消息。当发送方的数据小于固定长度时,则需要空位补齐。消息定长法使用非常简单,但是缺点也非常明显,无法很好设定固定长度的值,如果长度太大会造成字节浪费,长度太小又会影响消息传输,所以在一般情况下消息定长法不会被采用。
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// 处理 I/O 事件
processSelectedKeys();
} finally {
// 处理所有任务
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
// 处理 I/O 事件
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
// 处理完 I/O 事件,再处理异步任务队列
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
```
- **特定分隔符**
上述源码的结构比较清晰NioEventLoop 每次循环的处理流程都包含事件轮询 select、事件处理 processSelectedKeys、任务处理 runAllTasks 几个步骤,是典型的 Reactor 线程模型的运行机制。而且 Netty 提供了一个参数 ioRatio可以调整 I/O 事件处理和任务处理的时间比例。下面我们将着重从**事件处理**和**任务处理**两个核心部分出发,详细介绍 Netty EventLoop 的实现原理。
既然接收方无法区分消息的边界,那么我们可以在每次发送报文的尾部加上特定分隔符,接收方就可以根据特殊分隔符进行消息拆分。
- **消息长度 + 消息内容**
消息长度 + 消息内容是项目开发中最常用的一种协议,如上展示了该协议的基本格式。消息头中存放消息的总长度,例如使用 4 字节的 int 值记录消息的长度,消息体实际的二进制的字节数据。接收方在解析数据时,首先读取消息头的长度字段 Len然后紧接着读取长度为 Len 的字节数据,该数据即判定为一个完整的数据报文。
#### 事件处理机制
![事件处理机制](images/Middleware/事件处理机制.png)
结合Netty的整体架构看上述EventLoop的事件流转图以便更好地理解 Netty EventLoop 的设计原理。NioEventLoop 的事件处理机制采用的是**无锁串行化的设计思路**
## Netty流程
- **BossEventLoopGroup****WorkerEventLoopGroup** 包含一个或者多个 NioEventLoop
从功能上,流程可以分为服务启动、建立连接、读取数据、业务处理、发送数据、关闭连接以及关闭服务。整体流程如下所示(图中没有包含关闭的部分)
BossEventLoopGroup 负责监听客户端的 Accept 事件,当事件触发时,将事件注册至 WorkerEventLoopGroup 中的一个 NioEventLoop 上。每新建一个 Channel 只选择一个 NioEventLoop 与其绑定。所以说 Channel 生命周期的所有事件处理都是**线程独立**的,不同的 NioEventLoop 线程之间不会发生任何交集。
![Netty整体流程](images/Middleware/Netty整体流程.png)
- NioEventLoop 完成数据读取后,会调用绑定的 ChannelPipeline 进行事件传播
![Netty线程模型](images/Middleware/Netty线程模型.png)
ChannelPipeline 也是**线程安全**的,数据会被传递到 ChannelPipeline 的第一个 ChannelHandler 中。数据处理完成后,将加工完成的数据再传递给下一个 ChannelHandler整个过程是**串行化**执行,不会发生线程上下文切换的问题。
## 处理事件
NioEventLoop 无锁串行化的设计不仅使系统吞吐量达到最大化,而且降低了用户开发业务逻辑的难度,不需要花太多精力关心线程安全问题。虽然单线程执行避免了线程切换,但是它的缺陷就是不能执行时间过长的 I/O 操作,一旦某个 I/O 事件发生阻塞,那么后续的所有 I/O 事件都无法执行,甚至造成事件积压。在使用 Netty 进行程序开发时,我们一定要对 ChannelHandler 的实现逻辑有充分的风险意识。
Netty中Reactor线程和worker线程所处理的事件
NioEventLoop 线程的可靠性至关重要,一旦 NioEventLoop 发生阻塞或者陷入空轮询,就会导致整个系统不可用。在 JDK 中, Epoll 的实现是存在漏洞的,即使 Selector 轮询的事件列表为空NIO 线程一样可以被唤醒,导致 CPU 100% 占用。这就是臭名昭著的 JDK epoll 空轮询的 Bug。Netty 作为一个高性能、高可靠的网络框架,需要保证 I/O 线程的安全性。那么它是如何解决 JDK epoll 空轮询的 Bug 呢?实际上 Netty 并没有从根源上解决该问题,而是巧妙地规避了这个问题。
1、Server端NioEventLoop处理的事件
![Server端NioEventLoop处理的事件](images/Middleware/Server端NioEventLoop处理的事件.png)
2、Client端NioEventLoop处理的事件
抛开其它细枝末节直接定位到事件轮询select()方法中的最后一部分代码一起看下Netty是如何解决epoll空轮询的Bug
![Client端NioEventLoop处理的事件](images/Middleware/Client端NioEventLoop处理的事件.png)
```java
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
```
Netty提供了一种检测机制判断线程是否可能陷入空轮询具体的实现方式如下
- 每次执行 select 操作之前记录当前时间 currentTimeNanos
- **time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos**,如果事件轮询的持续时间大于等于 timeoutMillis那么说明是正常的否则表明阻塞时间并未达到预期可能触发了空轮询的 Bug
- Netty 引入了计数变量 selectCnt。在正常情况下selectCnt 会重置,否则会对 selectCnt 自增计数。当 selectCnt 达到 SELECTOR_AUTO_REBUILD_THRESHOLD默认512 阈值时,会触发重建 Selector 对象
### 服务启动
Netty 采用这种方法巧妙地规避了 JDK Bug。异常的 Selector 中所有的 SelectionKey 会重新注册到新建的 Selector 上,重建完成之后异常的 Selector 就可以废弃了。
服务启动时,以 example 代码中的 EchoServer 为例,启动的过程以及相应的源码类如下:
1. `EchoServer#new NioEventLoopGroup(1)->NioEventLoop#provider.openSelector()` : 创建 selector
2. `EchoServer#b.bind(PORT).sync->AbstractBootStrap#doBind()->initAndRegister()-> channelFactory.newChannel() / init(channel)` : 创建 serverSocketChannel 以及初始化
3. `EchoServer#b.bind(PORT).sync->AbstractBootStrap#doBind()->initAndRegister()-> config().group().register(channel)` :从 boss group 中选择一个 NioEventLoop 开始注册 serverSocketChannel
4. `EchoServer#b.bind(PORT).sync->AbstractBootStrap#doBind()->initAndRegister()->config().group().register(channel)->AbstractChannel#register0(promise)->AbstractNioChannel#javaChannel().register(eventLoop().unwrappedSelector(), 0, this)` : 将 server socket channel 注册到选择的 NioEventLoop 的 selector
5. `EchoServer#b.bind(PORT).sync()->AbstractBootStrap#doBind()->doBind0()->AbstractChannel#doBind(localAddress)->NioServerSocketChannel#javaChannel().bind(localAddress, config.getBacklog())` : 绑定地址端口开始启动
6. `EchoServer#b.bind(PORT).sync()->AbstractBootStrap#doBind()->doBind0()->AbstractChannel#pipeline.fireChannelActive()->AbstractNioChannel#selectionKey.interestOps(interestOps|readInterestOp)`: 注册 OP_READ 事件
上述启动流程中1、2、3 由我们自己的线程执行即mainThread4、5、6 是由Boss Thread执行。相应时序图如下
![Netty流程-服务启动](images/Middleware/Netty流程-服务启动.jpg)
#### 任务处理机制
NioEventLoop 不仅负责处理 I/O 事件,还要兼顾执行任务队列中的任务。任务队列遵循 FIFO 规则可以保证任务执行的公平性。NioEventLoop 处理的任务类型基本可以分为三类:
- **普通任务**:通过 NioEventLoop 的 execute() 方法向任务队列 taskQueue 中添加任务。例如 Netty 在写数据时会封装 WriteAndFlushTask 提交给 taskQueue。taskQueue 的实现类是多生产者单消费者队列 MpscChunkedArrayQueue在多线程并发添加任务时可以保证线程安全
- **定时任务**:通过调用 NioEventLoop 的 schedule() 方法向定时任务队列 scheduledTaskQueue 添加一个定时任务,用于周期性执行该任务。例如,心跳消息发送等。定时任务队列 scheduledTaskQueue 采用优先队列 PriorityQueue 实现
- **尾部队列**tailTasks 相比于普通任务队列优先级较低,在每次执行完 taskQueue 中任务后会去获取尾部队列中任务执行。尾部任务并不常用,主要用于做一些收尾工作,例如统计事件循环的执行时间、监控信息上报等
### 建立连接
下面结合任务处理 runAllTasks 的源码结构,分析下 NioEventLoop 处理任务的逻辑,源码实现如下:
服务启动后便是建立连接的过程了,相应过程及源码类如下:
```java
protected boolean runAllTasks(long timeoutNanos) {
// 1. 合并定时任务到普通任务队列
fetchFromScheduledTaskQueue();
// 2. 从普通任务队列中取出任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
// 3. 计算任务处理的超时时间
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// 4. 安全执行任务
safeExecute(task);
runTasks ++;
// 5. 每执行 64 个任务检查一下是否超时
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
// 6. 收尾工作
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
```
1. `NioEventLoop#run()->processSelectedKey()` NioEventLoop 中的 selector 轮询创建连接事件OP_ACCEPT
2. `NioEventLoop#run()->processSelectedKey()->AbstractNioMessageChannel#read->NioServerSocketChannel#doReadMessages()->SocketUtil#accept(serverSocketChannel)` 创建 socket channel
3. `NioEventLoop#run()->processSelectedKey()->AbstractNioMessageChannel#fireChannelRead->ServerBootstrap#ServerBootstrapAcceptor#channelRead-> childGroup.register(child)` 从worker group 中选择一个 NioEventLoop 开始注册 socket channel
4. `NioEventLoop#run()->processSelectedKey()->AbstractNioMessageChannel#fireChannelRead->ServerBootstrap#ServerBootstrapAcceptor#channelRead-> childGroup.register(child)->AbstractChannel#register0(promise)-> AbstractNioChannel#javaChannel().register(eventLoop().unwrappedSelector(), 0, this)` 将 socket channel 注册到选择的 NioEventLoop 的 selector
5. `NioEventLoop#run()->processSelectedKey()->AbstractNioMessageChannel#fireChannelRead->ServerBootstrap#ServerBootstrapAcceptor#channelRead-> childGroup.register(child)->AbstractChannel#pipeline.fireChannelActive()-> AbstractNioChannel#selectionKey.interestOps(interestOps | readInterestOp)` 注册 OP_ACCEPT 事件
在代码中以注释的方式标注了具体的实现步骤,可以分为 6 个步骤:
同样,上述流程中 1、2、3 的执行仍由 Boss Thread 执行,直到 4、5 由具体的 Work Thread 执行。
![Netty流程-建立连接](images/Middleware/Netty流程-建立连接.jpg)
- **fetchFromScheduledTaskQueue函数**:将定时任务从 scheduledTaskQueue 中取出,聚合放入普通任务队列 taskQueue 中,只有定时任务的截止时间小于当前时间才可以被合并
- **从普通任务队列taskQueue中取出任务**
- **计算任务执行的最大超时时间**
- **safeExecute函数**:安全执行任务,实际直接调用的 Runnable 的 run() 方法
- **每执行 64 个任务进行超时时间的检查**:如果执行时间大于最大超时时间,则立即停止执行任务,避免影响下一轮的 I/O 事件的处理
- **最后获取尾部队列中的任务执行**
### 读写与业务处理
#### EventLoop最佳实践
连接建立完毕后是具体的读写,以及业务处理逻辑。以 EchoServerHandler 为例,读取数据后会将数据传播出去供业务逻辑处理,此时的 EchoServerHandler 代表我们的业务逻辑,而它的实现也非常简单,就是直接将数据写回去。我们将这块看成一个整条,流程如下:
在日常开发中用好 EventLoop 至关重要,这里结合实际工作中的经验给出一些 EventLoop 的最佳实践方案
1. `NioEventLoop#run()->processSelectedKey() NioEventLoop 中的 selector` 轮询创建读取事件OP_READ
2. `NioEventLoop#run()->processSelectedKey()->AbstractNioByteChannel#read()`nioSocketChannel 开始读取数据
3. `NioEventLoop#run()->processSelectedKey()->AbstractNioByteChannel#read()->pipeline.fireChannelRead(byteBuf)`把读取到的数据传播出去供业务处理
4. `AbstractNioByteChannel#pipeline.fireChannelRead->EchoServerHandler#channelRead`在这个例子中即 EchoServerHandler 的执行
5. `EchoServerHandler#write->ChannelOutboundBuffer#addMessage` 调用 write 方法
6. `EchoServerHandler#flush->ChannelOutboundBuffer#addFlush` 调用 flush 准备数据
7. `EchoServerHandler#flush->NioSocketChannel#doWrite` 调用 flush 发送数据
- 网络连接建立过程中三次握手、安全认证的过程会消耗不少时间。这里建议采用 Boss 和 Worker 两个 EventLoopGroup有助于分担 Reactor 线程的压力
- 由于 Reactor 线程模式适合处理耗时短的任务场景,对于耗时较长的 ChannelHandler 可以考虑维护一个业务线程池,将编解码后的数据封装成 Task 进行异步处理,避免 ChannelHandler 阻塞而造成 EventLoop 不可用
- 如果业务逻辑执行时间较短,建议直接在 ChannelHandler 中执行。例如编解码操作,这样可以避免过度设计而造成架构的复杂性
- 不宜设计过多的 ChannelHandler。对于系统性能和可维护性都会存在问题在设计业务架构的时候需要明确业务分层和 Netty 分层之间的界限。不要一味地将业务逻辑都添加到 ChannelHandler 中
在这个过程中读写数据都是由 Work Thread 执行的,但是业务处理可以由我们自定义的线程池来处理,并且一般我们也是这么做的,默认没有指定线程的情况下仍然由 Work Thread 代为处理。
![Netty流程-读写与业务处理](images/Middleware/Netty流程-读写与业务处理.jpg)
### ChannelPipeline
### 关闭连接
Pipeline 的字面意思是管道、流水线。它在 Netty 中起到的作用,和一个工厂的流水线类似。原始的网络字节流经过 Pipeline被一步步加工包装最后得到加工后的成品。是Netty的核心处理链用以实现网络事件的动态编排和有序传播。
服务处理完毕后,单个连接的关闭是什么样的呢?
#### ChannelPipeline内部结构
1. `NioEventLoop#run()->processSelectedKey()` NioEventLoop 中的 selector 轮询创建读取事件OP_READ),这里关闭连接仍然是读取事件
2. `NioEventLoop#run()->processSelectedKey()->AbstractNioByteChannel#read()->closeOnRead(pipeline)`当字节<0 时开始执行关闭 nioSocketChannel
3. `NioEventLoop#run()->processSelectedKey()->AbstractNioByteChannel#read()->closeOnRead(pipeline)->AbstractChannel#close->AbstractNioChannel#doClose()` 关闭 socketChannel
4. `NioEventLoop#run()->processSelectedKey()->AbstractNioByteChannel#read()->closeOnRead(pipeline)->AbstractChannel#close->outboundBuffer.failFlushed/close` 清理消息不接受新信息fail 掉所有 queue 中消息
5. `NioEventLoop#run()->processSelectedKey()->AbstractNioByteChannel#read()->closeOnRead(pipeline)->AbstractChannel#close->fireChannelInactiveAndDeregister->AbstractNioChannel#doDeregister eventLoop().cancel(selectionKey())` 关闭多路复用器的 key
ChannelPipeline 可以看作是 ChannelHandler 的容器载体,它是由一组 ChannelHandler 实例组成的,内部通过双向链表将不同的 ChannelHandler 链接在一起,如下图所示。当有 I/O 读写事件触发时ChannelPipeline 会依次调用 ChannelHandler 列表对 Channel 的数据进行拦截和处理。
时序图如下:
![Netty流程-关闭连接.jpg](images/Middleware/Netty流程-关闭连接.jpg)
![ChannelPipeline内部结构](images/Middleware/ChannelPipeline内部结构.png)
根据网络数据的流向ChannelPipeline 分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器。在客户端与服务端通信的过程中,数据从客户端发向服务端的过程叫出站,反之称为入站。
![ChannelPipeline入站和出站](images/Middleware/ChannelPipeline入站和出站.png)
### 关闭服务
最后是关闭整个 Netty 服务:
1. `NioEventLoop#run->closeAll()->selectionKey.cancel/channel.close` 关闭 channel取消 selectionKey
2. `NioEventLoop#run->confirmShutdown->cancelScheduledTasks` 取消定时任务
3. `NioEventLoop#cleanup->selector.close()` 关闭 selector
#### ChannelHandler接口设计
时序图如下,为了好画将 NioEventLoop 拆成了 2 块:
![Netty流程-关闭服务.jpg](images/Middleware/Netty流程-关闭服务.jpg)
整个ChannelHandler是围绕I/O事件的生命周期所设计的如建立连接、读数据、写数据、连接销毁等。ChannelHandler 有两个重要的**子接口****ChannelInboundHandler**和**ChannelOutboundHandler**,分别拦截**入站和出站的各种 I/O 事件**。
**① ChannelInboundHandler的事件回调方法与触发时机**
| 事件回调方法 | 触发时机 |
| ------------------------- | -------------------------------------------------- |
| channelRegistered | Channel 被注册到 EventLoop |
| channelUnregistered | Channel 从 EventLoop 中取消注册 |
| channelActive | Channel 处于就绪状态,可以被读写 |
| channelInactive | Channel 处于非就绪状态Channel 可以从远端读取到数据 |
| channelRead | Channel 可以从远端读取到数据 |
| channelReadComplete | Channel 读取数据完成 |
| userEventTriggered | 用户事件触发时 |
| channelWritabilityChanged | Channel 的写状态发生变化 |
## 长连接优化
### 更多连接
### 更高QPS
**② ChannelOutboundHandler的事件回调方法与触发时机**
ChannelOutboundHandler 的事件回调方法非常清晰,直接通过 ChannelOutboundHandler 的接口列表可以看到每种操作所对应的回调方法,如下图所示。这里每个回调方法都是在相应操作执行之前触发,在此就不多做赘述了。此外 ChannelOutboundHandler 中绝大部分接口都包含ChannelPromise 参数,以便于在操作完成时能够及时获得通知。
![ChannelOutboundHandler](images/Middleware/ChannelOutboundHandler.png)
## 线程模型
Netty通过Reactor模型基于多路复用器接收并处理用户请求内部实现了两个线程池boss线程池和work线程池其中boss线程池的线程负责处理请求的accept事件当接收到accept事件的请求时把对应的socket封装到一个NioSocketChannel中并交给work线程池其中work线程池负责请求的read和write事件由对应的Handler处理。
### 单线程Reactor线程模型
#### ChannelPipeline事件传播机制
下图演示了单线程reactor线程模型之所以称之为单线程还是因为只有一个accpet Thread接受任务之后转发到reactor线程中进行处理。两个黄色框表示的是Reactor Thread Group里面有多个Reactor Thread。一个Reactor Thread Group中的Reactor Thread功能都是相同的例如第一个黄色框中的Reactor Thread都是处理拆分后的任务的第一阶段第二个黄色框中的Reactor Thread都是处理拆分后的任务的第二步骤。任务具体要怎么拆分要结合具体场景下图只是演示作用。**一般来说,都是以比较耗时的操作(例如IO)为切分点**。
上述ChannelPipeline可分为入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 两种处理器,与此对应传输的事件类型可以分为**Inbound 事件**和**Outbound 事件**。
![单线程reactor线程模型](images/Middleware/单线程reactor线程模型.png)
- **Inbound事件**传播方向为Head->Tail即按照添加的顺序进行正向传播A→B→C
特别的如果我们在任务处理的过程中不划分为多个阶段进行处理的话那么单线程reactor线程模型就退化成了并行工作和模型。**事实上可以认为并行工作者模型就是单线程reactor线程模型的最简化版本。**
- **Outbound事件**传播方向为Tail->Head即按照添加的顺序进行反向传播C→B→A
代码示例体验 ChannelPipeline 的事件传播机制:
```java
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new SampleInBoundHandler("SampleInBoundHandlerA", false))
.addLast(new SampleInBoundHandler("SampleInBoundHandlerB", false))
.addLast(new SampleInBoundHandler("SampleInBoundHandlerC", true));
ch.pipeline()
.addLast(new SampleOutBoundHandler("SampleOutBoundHandlerA"))
.addLast(new SampleOutBoundHandler("SampleOutBoundHandlerB"))
.addLast(new SampleOutBoundHandler("SampleOutBoundHandlerC"));
### 多线程Reactor线程模型
}
}
```
所谓多线程reactor线程模型无非就是有多个accpet线程如下图中的虚线框中的部分。
执行结果:
![多线程reactor线程模型](images/Middleware/多线程reactor线程模型.png)
![SampleOutBoundHandler执行结果](images/Middleware/SampleOutBoundHandler执行结果.png)
### 混合型Reactor线程模型
#### ChannelPipeline异常传播机制
混合型reactor线程模型实际上最能体现reactor线程模型的本质
ChannelPipeline 事件传播的实现采用了经典的责任链模式调用链路环环相扣。那么如果有一个节点处理逻辑异常会出现什么现象呢ctx.fireExceptionCaugh 会将异常按顺序从 Head 节点传播到 Tail 节点。如果用户没有对异常进行拦截处理,最后将由 Tail 节点统一处理。
- 将任务处理切分成多个阶段进行,每个阶段处理完自己的部分之后,转发到下一个阶段进行处理。不同的阶段之间的执行是异步的,可以认为每个阶段都有一个独立的线程池。
- 不同的类型的任务,有着不同的处理流程,划分时需要划分成不同的阶段。如下图蓝色是一种任务、绿色是另一种任务,两种任务有着不同的执行流程
![Netty异常处理的最佳实践](images/Middleware/Netty异常处理的最佳实践.png)
![混合型reactor线程模型](images/Middleware/混合型reactor线程模型.png)
建议用户自定义的异常处理器代码示例如下:
```java
public class ExceptionHandler extends ChannelDuplexHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (cause instanceof RuntimeException) {
System.out.println("Handle Business Exception Success.");
}
}
}
```
加入统一的异常处理器后,可以看到异常已经被优雅地拦截并处理掉了。这也是 Netty 推荐的最佳异常处理实践。
### Netty-Reactor线程模型
![Netty统一异常处理验证](images/Middleware/Netty统一异常处理验证.png)
![Netty-Reactor](images/Middleware/Netty-Reactor.png)
图中大致包含了5个步骤而我们编写的服务端代码中可能并不能完全体现这样的步骤因为Netty将其中一些步骤的细节隐藏了笔者将会通过图形分析与源码分析相结合的方式帮助读者理解这五个步骤。这个五个步骤可以按照以下方式简要概括
- 设置服务端ServerBootStrap启动参数
- 通过ServerBootStrap的bind方法启动服务端bind方法会在parentGroup中注册NioServerScoketChannel监听客户端的连接请求
- Client发起连接CONNECT请求parentGroup中的NioEventLoop不断轮循是否有新的客户端请求如果有ACCEPT事件触发
- ACCEPT事件触发后parentGroup中NioEventLoop会通过NioServerSocketChannel获取到对应的代表客户端的NioSocketChannel并将其注册到childGroup中
- childGroup中的NioEventLoop不断检测自己管理的NioSocketChannel是否有读写事件准备好如果有的话调用对应的ChannelHandler进行处理
### 定时器TimerTask
## HashedWheelTimer
### 时间轮HashedWheelTimer
时间轮其实就是一种环形的数据结构,可以想象成时钟,分成很多格子,一个格子代表一段时间。并用一个链表保存在该格子上的计划任务,同时一个指针随着时间一格一格转动,并执行相应格子中的所有到期任务。任务通过时间取模决定放入那个格子。
@ -2892,7 +3008,7 @@ Netty 的时间轮 `HashedWheelTimer` 给出了一个**粗略的定时器实现*
### 源码解读
**源码解读**
```java
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit,
@ -2910,7 +3026,7 @@ public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit
### 优缺点
**优缺点**
- **优点**
- 可以添加、删除、取消定时任务
@ -2921,7 +3037,7 @@ public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit
### 定时任务方案
**定时任务方案**
目前主流的一些定时任务方案:
@ -2932,9 +3048,7 @@ public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit
- Netty的HashedWheelTimer时间轮
- Kafka的TimingWheel层级时间轮
### 使用案例
使用案例:
```java
// 构造一个 Timer 实例
@ -2968,6 +3082,219 @@ timer.newTimeout(timeout1.task(), 3, TimeUnit.SECONDS);
### 无锁队列mpsc queue
### FastThreadLocal
### ByteBuf
### 编解码协议
netty-codec模块主要负责编解码工作通过编解码实现原始字节数据与业务实体对象之间的相互转化。Netty支持大多数业界主流协议的编解码器如**HTTP、HTTP2、Redis、XML**等,为开发者节省了大量的精力。此外该模块提供了抽象的编解码类**ByteToMessageDecoder**和**MessageToByteEncoder**,通过继承这两个类可以轻松实现自定义的编解码逻辑。
![Netty协议](images/Middleware/Netty协议.png)
### 拆包粘包
- 拆包/粘包的解决方案
- **消息长度固定**:每个数据报文都需要一个固定的长度。当接收方累计读取到固定长度的报文后,就认为已经获得一个完整的消息。当发送方的数据小于固定长度时,则需要空位补齐。消息定长法使用非常简单,但是缺点也非常明显,无法很好设定固定长度的值,如果长度太大会造成字节浪费,长度太小又会影响消息传输,所以在一般情况下消息定长法不会被采用。
- **特定分隔符**
既然接收方无法区分消息的边界,那么我们可以在每次发送报文的尾部加上特定分隔符,接收方就可以根据特殊分隔符进行消息拆分。
- **消息长度 + 消息内容**
消息长度 + 消息内容是项目开发中最常用的一种协议,如上展示了该协议的基本格式。消息头中存放消息的总长度,例如使用 4 字节的 int 值记录消息的长度,消息体实际的二进制的字节数据。接收方在解析数据时,首先读取消息头的长度字段 Len然后紧接着读取长度为 Len 的字节数据,该数据即判定为一个完整的数据报文。
## 核心流程
### 服务端启动流程
Netty 服务端的启动过程大致分为三个步骤:
- 配置线程池
- Channel 初始化
- 端口绑定
#### 配置线程池
**单线程模式**
Reactor 单线程模型所有 I/O 操作都由一个线程完成,所以只需要启动一个 EventLoopGroup 即可。
```java
EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(group);
```
**多线程模式**
Reactor 单线程模型有非常严重的性能瓶颈,因此 Reactor 多线程模型出现了。在 Netty 中使用 Reactor 多线程模型与单线程模型非常相似,区别是 NioEventLoopGroup 可以不需要任何参数,它默认会启动 2 倍 CPU 核数的线程。当然,你也可以自己手动设置固定的线程数。
```java
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(group);
```
**主从多线程模式**
在大多数场景下,我们采用的都是主从多线程 Reactor 模型。Boss 是主 ReactorWorker 是从 Reactor。它们分别使用不同的 NioEventLoopGroup主 Reactor 负责处理 Accept然后把 Channel 注册到从 Reactor 上,从 Reactor 主要负责 Channel 生命周期内的所有 I/O 事件。
```java
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
```
#### Channel 初始化
**设置Channel类型**
```java
// 客户端Channel
b.channel(NioSocketChannel.class);
b.channel(OioSocketChannel.class);
// 服务端Channel
b.channel(NioServerSocketChannel.class);
b.channel(OioServerSocketChannel.class);
b.channel(EpollServerSocketChannel.class);
// UDP
b.channel(NioDatagramChannel.class);
b.channel(OioDatagramChannel.class);
```
**注册ChannelHandler**
ServerBootstrap 的 childHandler() 方法需要注册一个 ChannelHandler。ChannelInitializer是实现了 ChannelHandler接口的匿名类通过实例化 ChannelInitializer 作为 ServerBootstrap 的参数。
```java
b.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline()
// HTTP 编解码处理器
                .addLast("codec", new HttpServerCodec())
// HTTPContent 压缩处理器
                .addLast("compressor", new HttpContentCompressor())
// HTTP 消息聚合处理器
                .addLast("aggregator", new HttpObjectAggregator(65536)) 
// 自定义业务逻辑处理器
                .addLast("handler", new HttpServerHandler());
    }
});
```
**设置Channel参数**
ServerBootstrap 设置 Channel 属性有option和childOption两个方法option 主要负责设置 Boss 线程组,而 childOption 对应的是 Worker 线程组。
```java
b.option(ChannelOption.SO_KEEPALIVE, true);
```
常用参数如下:
| 参数名 | 描述信息 |
| ---------------------- | ------------------------------------------------------------ |
| SO_KEEPALIVE | 设置为 true 代表启用了 TCP SO_KEEPALIVE 属性TCP 会主动探测连接状态,即连接保活 |
| SO_BACKLOG | 已完成三次握手的请求队列最大长度,同一时刻服务端可能会处理多个连接,在高并发海量连接的场景下,该参数应适当调大 |
| TCP_NODELAY | Netty 默认是 true表示立即发送数据。如果设置为 false 表示启用 Nagle 算法,该算法会将 TCP 网络数据包累积到一定量才会发送虽然可以减少报文发送的数量但是会造成一定的数据延迟。Netty 为了最小化数据传输的延迟,默认禁用了 Nagle 算法 |
| SO_SNDBUF | TCP 数据发送缓冲区大小 |
| SO_RCVBUF | TCP数据接收缓冲区大小TCP数据接收缓冲区大小 |
| SO_LINGER | 设置延迟关闭的时间,等待缓冲区中的数据发送完成 |
| CONNECT_TIMEOUT_MILLIS | 建立连接的超时时间 |
#### 端口绑定
bind() 方法会真正触发启动sync() 方法则会阻塞,直至整个启动过程完成,具体使用方式如下:
```java
ChannelFuture f = b.bind(8080).sync();
```
## Netty流程
从功能上,流程可以分为服务启动、建立连接、读取数据、业务处理、发送数据、关闭连接以及关闭服务。整体流程如下所示(图中没有包含关闭的部分)
![Netty整体流程](images/Middleware/Netty整体流程.png)
![Netty线程模型](images/Middleware/Netty线程模型.png)
## 线程模型
Netty通过Reactor模型基于多路复用器接收并处理用户请求内部实现了两个线程池boss线程池和work线程池其中boss线程池的线程负责处理请求的accept事件当接收到accept事件的请求时把对应的socket封装到一个NioSocketChannel中并交给work线程池其中work线程池负责请求的read和write事件由对应的Handler处理。
### 单线程Reactor线程模型
下图演示了单线程reactor线程模型之所以称之为单线程还是因为只有一个accpet Thread接受任务之后转发到reactor线程中进行处理。两个黄色框表示的是Reactor Thread Group里面有多个Reactor Thread。一个Reactor Thread Group中的Reactor Thread功能都是相同的例如第一个黄色框中的Reactor Thread都是处理拆分后的任务的第一阶段第二个黄色框中的Reactor Thread都是处理拆分后的任务的第二步骤。任务具体要怎么拆分要结合具体场景下图只是演示作用。**一般来说,都是以比较耗时的操作(例如IO)为切分点**。
![单线程reactor线程模型](images/Middleware/单线程reactor线程模型.png)
特别的如果我们在任务处理的过程中不划分为多个阶段进行处理的话那么单线程reactor线程模型就退化成了并行工作和模型。**事实上可以认为并行工作者模型就是单线程reactor线程模型的最简化版本。**
### 多线程Reactor线程模型
所谓多线程reactor线程模型无非就是有多个accpet线程如下图中的虚线框中的部分。
![多线程reactor线程模型](images/Middleware/多线程reactor线程模型.png)
### 混合型Reactor线程模型
混合型reactor线程模型实际上最能体现reactor线程模型的本质
- 将任务处理切分成多个阶段进行,每个阶段处理完自己的部分之后,转发到下一个阶段进行处理。不同的阶段之间的执行是异步的,可以认为每个阶段都有一个独立的线程池。
- 不同的类型的任务,有着不同的处理流程,划分时需要划分成不同的阶段。如下图蓝色是一种任务、绿色是另一种任务,两种任务有着不同的执行流程
![混合型reactor线程模型](images/Middleware/混合型reactor线程模型.png)
### Netty-Reactor线程模型
![Netty-Reactor](images/Middleware/Netty-Reactor.png)
图中大致包含了5个步骤而我们编写的服务端代码中可能并不能完全体现这样的步骤因为Netty将其中一些步骤的细节隐藏了笔者将会通过图形分析与源码分析相结合的方式帮助读者理解这五个步骤。这个五个步骤可以按照以下方式简要概括
- 设置服务端ServerBootStrap启动参数
- 通过ServerBootStrap的bind方法启动服务端bind方法会在parentGroup中注册NioServerScoketChannel监听客户端的连接请求
- Client发起连接CONNECT请求parentGroup中的NioEventLoop不断轮循是否有新的客户端请求如果有ACCEPT事件触发
- ACCEPT事件触发后parentGroup中NioEventLoop会通过NioServerSocketChannel获取到对应的代表客户端的NioSocketChannel并将其注册到childGroup中
- childGroup中的NioEventLoop不断检测自己管理的NioSocketChannel是否有读写事件准备好如果有的话调用对应的ChannelHandler进行处理
## ByteBuf

Binary file not shown.

After

Width:  |  Height:  |  Size: 348 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 244 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 386 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 220 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 188 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 358 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 75 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 357 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 124 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 311 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 110 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 128 KiB

Loading…
Cancel
Save