diff --git a/README.md b/README.md index 1268b53..abb721e 100644 --- a/README.md +++ b/README.md @@ -88,26 +88,33 @@ - [IO模型](docs/Netty/IOTechnologyBase/IO模型.md) - [四种IO编程及对比](docs/Netty/IOTechnologyBase/四种IO编程及对比.md) -### TCP粘包/拆包 +### Netty 粘拆包解决方案 - [TCP粘拆包问题及Netty中的解决方案](docs/Netty/TCP粘拆包/TCP粘拆包问题及Netty中的解决方案.md) -### Netty 编解码开发 -- [编解码技术]() -- [Java常用的序列化框架]() - -### Netty多协议开发 -- [HTTP协议开发]() -- [WebSocket协议开发]() -- [自定义协议开发]() - -### Netty源码分析 -- [Channel和Unsafe组件]() -- [ChannelPipeline和ChannelHandler组件]() -- [EventLoop和EventLoopGroup组件]() - -### Netty高级特性 -- [Java多线程编程在Netty中的应用]() -- [Netty的高性能之道]() +### Netty 编解码 +- [Java序列化缺点与主流编解码框架](docs/Netty/Netty编解码/Java序列化缺点与主流编解码框架.md) + +### Netty 多协议开发 +- [基于HTTP协议的Netty开发](docs/Netty/Netty多协议开发/基于HTTP协议的Netty开发.md) +- [基于WebSocket协议的Netty开发](docs/Netty/Netty多协议开发/基于WebSocket协议的Netty开发.md) +- [基于自定义协议的Netty开发](docs/Netty/Netty多协议开发/基于自定义协议的Netty开发.md) + +### 基于Netty开发服务端及客户端 +- [基于Netty的服务端开发](docs/Netty/基于Netty开发服务端及客户端/基于Netty的服务端开发.md) +- [基于Netty的客户端开发](docs/Netty/基于Netty开发服务端及客户端/基于Netty的客户端开发.md) + +### Netty 主要组件的源码分析 +- [ByteBuffer组件]() +- [Channel组件 和 Unsafe组件]() +- [ChannelPipeline 和 ChannelHandler组件]() +- [EventLoop 和 EventLoopGroup组件]() +- [Future 和 Promise组件]() + +### Netty 高级特性 +- [Netty 架构设计](docs/Netty/AdvancedFeaturesOfNetty/Netty架构设计.md) +- [Java 多线程编程在 Netty中的应用](docs/Netty/AdvancedFeaturesOfNetty/Java多线程编程再Netty中的应用.md) +- [Netty 高性能之道](docs/Netty/AdvancedFeaturesOfNetty/Netty高性能之道.md) +- [Netty 高可靠性设计](docs/Netty/AdvancedFeaturesOfNetty/Netty高可靠性设计.md) ## Redis - 努力编写中... diff --git a/docs/LearningExperience/DesignPattern/从Spring及Mybatis框架源码中学习设计模式(创建型).md b/docs/LearningExperience/DesignPattern/从Spring及Mybatis框架源码中学习设计模式(创建型).md index 1c435f1..9c62d35 100644 --- a/docs/LearningExperience/DesignPattern/从Spring及Mybatis框架源码中学习设计模式(创建型).md +++ b/docs/LearningExperience/DesignPattern/从Spring及Mybatis框架源码中学习设计模式(创建型).md @@ -582,7 +582,7 @@ public final class StringBuffer extends AbstractStringBuilder } ``` ### Mybatis中的范例 -MyBatis 的初始化过程使用了建造者模式,抽象类 BaseBuilder 扮演了“建造者接口”的角色,对一些公用方法进行了实现,并定义了公共属性。XMLConfigBuilder、XMLMapperBuilder、XMLStatementBuilder 等实现类扮演了“具体建造者”的角色,分别用于解析mybatis-config.xml配置文件、映射配置文件 以及 SQL节点。Configuration 和 SqlSessionFactoryBuilder 则分别扮演了“产品” 和 “导演”的角色。 +MyBatis 的初始化过程使用了建造者模式,抽象类 BaseBuilder 扮演了“建造者接口”的角色,对一些公用方法进行了实现,并定义了公共属性。XMLConfigBuilder、XMLMapperBuilder、XMLStatementBuilder 等实现类扮演了“具体建造者”的角色,分别用于解析mybatis-config.xml配置文件、映射配置文件 以及 SQL节点。Configuration 和 SqlSessionFactoryBuilder 则分别扮演了“产品” 和 “导演”的角色。**即,SqlSessionFactoryBuilder 使用了 BaseBuilder建造者组件 对复杂对象 Configuration 进行了构建。** BaseBuilder组件的设计与上面标准的建造者模式是有很大不同的,BaseBuilder的建造者模式主要是为了将复杂对象Configuration的构建过程分解的层次更清晰,将整个构建过程分解到多个“具体构造者”类中,需要这些“具体构造者”共同配合才能完成Configuration的构造,单个“具体构造者”不具有单独构造产品的能力,这与StringBuilder及StringBuffer是不同的。 diff --git a/docs/Netty/AdvancedFeaturesOfNetty/Java多线程编程再Netty中的应用.md b/docs/Netty/AdvancedFeaturesOfNetty/Java多线程编程再Netty中的应用.md new file mode 100644 index 0000000..6ded9ff --- /dev/null +++ b/docs/Netty/AdvancedFeaturesOfNetty/Java多线程编程再Netty中的应用.md @@ -0,0 +1 @@ +努力编写中...... \ No newline at end of file diff --git a/docs/Netty/AdvancedFeaturesOfNetty/Netty架构设计.md b/docs/Netty/AdvancedFeaturesOfNetty/Netty架构设计.md new file mode 100644 index 0000000..8b816b7 --- /dev/null +++ b/docs/Netty/AdvancedFeaturesOfNetty/Netty架构设计.md @@ -0,0 +1,73 @@ +本博文用于重点分析 Netty 的逻辑架构及关键的架构质量属性,希望有助于大家从 Netty 的架构设计中汲取营养,设计出高性能、高可靠 +性和可扩展的程序。 + +## Netty的三层架构设计 +Netty 采用了典型的三层网络架构进行设计和开发,其逻辑架构图如下所示。 + +![avatar](/images/Netty/Netty逻辑架构图.png) + +### 通信调度层 Reactor +它由一系列辅助类完成,包括 Reactor线程 NioEventLoop 及其父类,NioSocketChannel / NioServerSocketChannel 及其父类,Buffer组件,Unsafe组件 等。该层的主要职责就是**监听网络的读写和连接操作**,负责**将网络层的数据读取到内存缓冲区**,然后触发各种网络事件,例如连接创建、连接激活、读事件、写事件等,将这些事件触发到 PipeLine 中,由 PipeLine 管理的责任链来进行后续的处理。 + +### 责任链层 Pipeline +它负责上述的各种网络事件 在责任链中的有序传播,同时负责动态地编排责任链。责任链可以选择监听和处理自己关心的事件,它可以拦截处理事件,以及向前向后传播事件。不同应用的 Handler节点 的功能也不同,通常情况下,往往会开发 编解码Hanlder 用于消息的编解码,可以将外部的协议消息转换成 内部的POJO对象,这样上层业务则只需要关心处理业务逻辑即可,不需要感知底层的协议差异和线程模型差异,实现了架构层面的分层隔离。 + +### 业务逻辑编排层 Service ChannelHandler +业务逻辑编排层通常有两类:一类是纯粹的业务逻辑编排,还有一类是其他的应用层协议插件,用于特定协议相关的会话和链路管理。例如,CMPP协议,用于管理和中国移动短信系统的对接。 + +架构的不同层面,需要关心和处理的对象都不同,通常情况下,对于业务开发者,只需要关心责任链的拦截和 业务Handler 的编排。因为应用层协议栈往往是开发一次,到处运行,所以实际上对于业务开发者来说,只需要关心服务层的业务逻辑开发即可。各种应用协议以插件的形式提供,只有协议开发人员需要关注协议插件,对于其他业务开发人员来说,只需关心业务逻辑定制。这种分层的架构设计理念实现了 NIO框架 各层之间的解耦,便于上层业务协议栈的开发和业务逻辑的定制。 + +正是由于 Netty 的分层架构设计非常合理,基于 Netty 的各种应用服务器和协议栈开发才能够如雨后春笋般得到快速发展。 + +## 关键的架构质量属性 +### 性能 +影响最终产品的性能因素非常多,其中软件因素如下。 +- 架构不合理导致的性能问题; +- 编码实现不合理导致的性能问题,例如,锁没用好导致的性能瓶颈。 + +硬件因素如下。 +- 服务器硬件配置太低导致的性能问题; +- 带宽、磁盘的 IOPS 等限制导致的 IO操作 性能差; +- 测试环境被共用导致被测试的软件产品受到影响。 + +尽管影响产品性能的因素非常多,但是架构的性能模型合理与否对性能的影响非常大。如果一个产品的架构设计得不好,无论开发如何努力,都很难开发出一个高性能、高可用的软件产品。 + +“性能是设计出来的,而不是测试出来的”。下面我们看看 Netty 的架构设计是如何实现高性能的。 +1. 采用非阻塞的 NIO类库,基于 Reactor 模式实现,解决了传统 同步阻塞IO模式 下一个服务端无法平滑地处理线性增长的客户端的问题。 +2. TCP 接收和发送缓冲区**使用直接内存代替堆内存,避免了内存复制**,提升了 IO 读取和写入的性能。 +3. 支持通过内存池的方式循环利用 ByteBuffer,避免了频繁创建和销毁 ByteBuffer 带来的性能损耗。 +4. 可配置的 IO线程数、TCP参数 等,为不同的用户场景提供定制化的调优参数,满足不同的性能场景。 +5. 采用环形数组缓冲区实现无锁化并发编程,代替传统的线程安全容器或者锁。 +6. 合理地使用线程安全容器、原子类等,提升系统的并发处理能力。 +7. 关键资源的处理使用单线程串行化的方式,避免多线程并发访问带来的锁竞争和额外的 CPU资源消耗问题。 +8. 通过引用计数器及时地申请释放不再被引用的对象,细粒度的内存管理降低了 GC 的频率,减少了频繁 GC 带来的延时和 CPU损耗。 + +### 可靠性 +作为一个高性能的异步通信框架,架构的可靠性是大家选择的另一个重要依据。下面我们看一下 Netty架构 的可靠性设计。 + +**1、链路有效性检测** +由于长连接不需要每次发送消息都创建链路,也不需要在消息交互完成时关闭链路,因此相对于短连接性能更高。对于长连接,一旦链路建立成功便一直维系双方之间的链路,直到系统退出。 + +为了保证长连接的链路有效性,往往需要通过心跳机制周期性地进行链路检测。使用周期性心跳的原因是:在系统空闲时,例如凌晨,往往没有业务消息。如果此时链路被防火墙 Hang住,或者遭遇网络闪断、网络单通等,通信双方无法识别出这类链路异常。等到第二天业务高峰期到来时,瞬间的海量业务冲击会导致消息积压无法发送给对方,由于链路的重建需要时间,这期间业务会大量失败 (集群或者分布式组网情况会好一些)。为了解决这个问题,需要周期性的 “心跳检测” 对链路进行有效性检查,一旦发生问题,可以及时关闭链路,重建 TCP连接。 + +当有业务消息时,无须心跳检测,可以由业务消息进行链路可用性检测。所以心跳消息往往是在链路空闲时发送的。为了支持心跳机制,Netty 提供了如下两种链路空闲检测机制。 +- 读空闲超时机制:当经过 连续的周期 T 没有消息可读时,触发 超时Handler,用户可以基于 该读空闲超时Handler 发送心跳消息,进行链路检测,如果连续 N个周期 仍然没有读取到心跳消息,可以主动关闭这条链路。 +- 写空闲超时机制:当经过 连续的周期 T 没有消息要发送时,触发 超时Handler,用户可以基于 该写空闲超时Handler 发送心跳消息,进行链路检测,如果连续 N 个周期 仍然没有接收到对方的心跳消息,可以主动关闭这条链路。 + +为了满足不同用户场景的心跳定制,Netty 提供了空闲状态检测事件通知机制,用户可以订阅:空闲超时事件、读空闲超时机制、写空闲超时事件,在接收到对应的空闲事件之后,灵活地进行定制。 + +**2、内存保护机制** +Netty 提供多种机制对内存进行保护,包括以下几个方面。 +- 通过对象引用计数器对 Netty 的 ByteBuffer 等内置对象进行细粒度的内存申请和释放,对非法的对象引用进行检测和保护。 +- 通过内存池来重用 ByteBuffer,节省内存。 +- 可设置的内存容量上限,包括 ByteBuffer、线程池线程数等。 + +### 可定制性 +Netty 的可定制性主要体现在以下几点。 +- 责任链模式:ChannelPipeline 基于责任链模式开发,便于业务逻辑的拦截、定制和扩展。 +- 基于接口的开发:关键的类库都提供了接口或者抽象类,如果 Netty 自身的实现无法满足用户的需求,可以由用户自定义实现相关接口。 +- 提供了大量工厂类,通过重载这些工厂类可以按需创建出用户实现的对象。 +- 提供了大量的系统参数供用户按需设置,增强系统的场景定制性。 + +### 可扩展性 +基于 Netty 的 基本NIO框架,可以方便地进行应用层协议定制,例如,HTTP协议栈、Thrift协议栈、FTP协议栈 等。这些扩展不需要修改 Netty 的源码,直接基于 Netty 的二进制类库即可实现协议的扩展和定制。目前,业界存在大量的基于 Netty框架 开发的协议,例如基于 Netty 的 HTTP协议、Dubbo协议、RocketMQ内部私有协议 等。 \ No newline at end of file diff --git a/docs/Netty/AdvancedFeaturesOfNetty/Netty高可靠性设计.md b/docs/Netty/AdvancedFeaturesOfNetty/Netty高可靠性设计.md new file mode 100644 index 0000000..6ded9ff --- /dev/null +++ b/docs/Netty/AdvancedFeaturesOfNetty/Netty高可靠性设计.md @@ -0,0 +1 @@ +努力编写中...... \ No newline at end of file diff --git a/docs/Netty/AdvancedFeaturesOfNetty/Netty高性能之道.md b/docs/Netty/AdvancedFeaturesOfNetty/Netty高性能之道.md new file mode 100644 index 0000000..6ded9ff --- /dev/null +++ b/docs/Netty/AdvancedFeaturesOfNetty/Netty高性能之道.md @@ -0,0 +1 @@ +努力编写中...... \ No newline at end of file diff --git a/docs/Netty/Netty多协议开发/基于HTTP协议的Netty开发.md b/docs/Netty/Netty多协议开发/基于HTTP协议的Netty开发.md new file mode 100644 index 0000000..6ded9ff --- /dev/null +++ b/docs/Netty/Netty多协议开发/基于HTTP协议的Netty开发.md @@ -0,0 +1 @@ +努力编写中...... \ No newline at end of file diff --git a/docs/Netty/Netty多协议开发/基于WebSocket协议的Netty开发.md b/docs/Netty/Netty多协议开发/基于WebSocket协议的Netty开发.md new file mode 100644 index 0000000..6ded9ff --- /dev/null +++ b/docs/Netty/Netty多协议开发/基于WebSocket协议的Netty开发.md @@ -0,0 +1 @@ +努力编写中...... \ No newline at end of file diff --git a/docs/Netty/Netty多协议开发/基于自定义协议的Netty开发.md b/docs/Netty/Netty多协议开发/基于自定义协议的Netty开发.md new file mode 100644 index 0000000..6ded9ff --- /dev/null +++ b/docs/Netty/Netty多协议开发/基于自定义协议的Netty开发.md @@ -0,0 +1 @@ +努力编写中...... \ No newline at end of file diff --git a/docs/Netty/Netty编解码/Java序列化缺点与主流编解码框架.md b/docs/Netty/Netty编解码/Java序列化缺点与主流编解码框架.md new file mode 100644 index 0000000..5f28270 --- /dev/null +++ b/docs/Netty/Netty编解码/Java序列化缺点与主流编解码框架.md @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/docs/Netty/基于Netty开发服务端及客户端/基于Netty的客户端开发.md b/docs/Netty/基于Netty开发服务端及客户端/基于Netty的客户端开发.md new file mode 100644 index 0000000..6ded9ff --- /dev/null +++ b/docs/Netty/基于Netty开发服务端及客户端/基于Netty的客户端开发.md @@ -0,0 +1 @@ +努力编写中...... \ No newline at end of file diff --git a/docs/Netty/基于Netty开发服务端及客户端/基于Netty的服务端开发.md b/docs/Netty/基于Netty开发服务端及客户端/基于Netty的服务端开发.md new file mode 100644 index 0000000..a2b82d2 --- /dev/null +++ b/docs/Netty/基于Netty开发服务端及客户端/基于Netty的服务端开发.md @@ -0,0 +1,606 @@ +## Netty 服务端创建源码分析 +当我们直接使用 JDK 的 NIO类库 开发基于 NIO 的异步服务端时,需要用到 多路复用器Selector、ServerSocketChannel、SocketChannel、ByteBuffer、SelectionKey 等,相比于传统的 BIO开发,NIO 的开发要复杂很多,开发出稳定、高性能的异步通信框架,一直是个难题。Netty 为了向使用者屏蔽 NIO通信 的底层细节,在和用户交互的边界做了封装,目的就是为了减少用户开发工作量,降低开发难度。ServerBootstrap 是 Socket服务端 的启动辅助类,用户通过 ServerBootstrap 可以方便地创建 Netty 的服务端。 + +### Netty 服务端创建时序图 + +![avatar](/images/Netty/Netty服务端创建时序图.png) + +下面我们对 Netty服务端创建 的关键步骤和原理进行详细解析。 + +1、**创建 ServerBootstrap实例**。ServerBootstrap 是 Netty服务端 的 启动辅助类,它提供了一系列的方法用于设置服务端启动相关的参数。底层对各种 原生NIO 的 API 进行了封装,减少了用户与 底层API 的接触,降低了开发难度。ServerBootstrap 中只有一个 public 的无参的构造函数可以给用户直接使用,ServerBootstrap 只开放一个无参的构造函数 的根本原因是 它的参数太多了,而且未来也可能会发生变化,为了解决这个问题,就需要引入 Builder建造者模式。 + +2、**设置并绑定 Reactor线程池**。Netty 的 Reactor线程池 是 EventLoopGroup,它实际上是一个 EventLoop数组。EventLoop 的职责是处理所有注册到本线程多路复用器 Selector 上的 Channel,Selector 的轮询操作由绑定的 EventLoop线程 的 run()方法 驱动,在一个循环体内循环执行。值得说明的是,EventLoop 的职责不仅仅是处理 网络IO事件,用户自定义的Task 和 定时任务Task 也统一由 EventLoop 负责处理,这样线程模型就实现了统一。从调度层面看,也不存在从 EventLoop线程 中再启动其他类型的线程用于异步执行另外的任务,这样就避免了多线程并发操作和锁竞争,提升了 IO线程 的处理和调度性能。 + +3、**设置并绑定 服务端Channel**。作为 NIO服务端,需要创建 ServerSocketChannel,Netty 对 原生NIO类库 进行了封装,对应的实现是NioServerSocketChannel。对于用户而言,不需要关心 服务端Channel 的底层实现细节和工作原理,只需要指定具体使用哪种服务端 Channel 即可。因此,Netty 中 ServerBootstrap的基类 提供了 channel()方法,用于指定 服务端Channel 的类型。Netty 通过工厂类,利用反射创建 NioServerSocketChannel对象。由于服务端监听端口往往只需要在系统启动时才会调用,因此反射对性能的影响并不大。相关代 +码如下。 +```java +public abstract class AbstractBootstrap, C extends Channel> implements Cloneable { + + /** + * 通过 参数channelClass 创建一个 Channel实例, + */ + public B channel(Class channelClass) { + if (channelClass == null) { + throw new NullPointerException("channelClass"); + } + return channelFactory(new ReflectiveChannelFactory(channelClass)); + } +} +``` + +4、**链路建立的时候创建并初始化 ChannelPipeline**。ChannelPipeline 并不是 NIO服务端 必需的,它本质就是一个负责处理网络事件的职责链,负责管理和执行 ChannelHandler。网络事件以事件流的形式在 ChannelPipeline 中流转,由 ChannelPipeline 根据 ChannelHandler的执行策略 调度 ChannelHandler的执行。典型的网络事件如下。 +1. 链路注册; +2. 链路激活; +3. 链路断开; +4. 接收到请求消息; +5. 请求消息接收并处理完毕; +6. 发送应答消息; +7. 链路发生异常; +8. 发生用户自定义事件。 + +5、**初始化 ChannelPipeline 完成之后,添加并设置 ChannelHandler**。ChannelHandler 是 Netty 提供给用户定制和扩展的关键接口。利用 ChannelHandler 用户可以完成大多数的功能定制,例如消息编解码、心跳、安全认证、TSL/SSL 认证、流量控制和流量整形等。Netty 同时也提供了大量的 系统ChannelHandler 供用户使用,比较实用的 系统ChannelHandler 总结如下。 +1. 系统编解码框架,ByteToMessageCodec; +2. 基于长度的半包解码器,LengthFieldBasedFrameDecoder; +3. 码流日志打印 Handler,LoggingHandler; +4. SSL 安全认证 Handler,SslHandler; +5. 链路空闲检测 Handler,IdleStateHandler; +6. 流量整形 Handler,ChannelTrafficShapingHandler; +7. Base64 编解码,Base64Decoder 和 Base64Encoder。 +创建和添加 ChannelHandler 的代码示例如下。 +```java + .childHandler( new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( new EchoServerHandler() ); + } + }); +``` + +6、**绑定并启动监听端口**。在绑定监听端口之前系统会做一系列的初始化和检测工作,完成之后,会启动监听端口,并将 ServerSocketChannel 注册到 Selector 上监听客户端连接。 + +7、**Selector 轮询**。由 Reactor线程 NioEventLoop 负责调度和执行 Selector 轮询操作,选择准备就绪的 Channel集合,相关代码如下。 +```java +public final class NioEventLoop extends SingleThreadEventLoop { + + private void select(boolean oldWakenUp) throws IOException { + Selector selector = this.selector; + + ...... + + int selectedKeys = selector.select(timeoutMillis); + selectCnt ++; + + ...... + + } +} +``` + +8、**当轮询到 准备就绪的Channel 之后,就由 Reactor线程 NioEventLoop 执行 ChannelPipeline 的相应方法,最终调度并执行 ChannelHandler**,接口如下图所示。 + +![avatar](/images/Netty/ChannelPipeline的调度相关方法.png) + +9、**执行 Netty 中 系统的ChannelHandler 和 用户添加定制的ChannelHandler** 。ChannelPipeline 根据网络事件的类型,调度并执行 ChannelHandler,相关代码如下。 +```java +public class DefaultChannelPipeline implements ChannelPipeline { + + @Override + public final ChannelPipeline fireChannelRead(Object msg) { + AbstractChannelHandlerContext.invokeChannelRead(head, msg); + return this; + } +} +``` + +### 结合 Netty源码 对服务端的创建过程进行解析 +首先通过构造函数创建 ServerBootstrap实例,随后,通常会创建两个 EventLoopGroup实例 (也可以只创建一个并共享),代码如下。 +```java + EventLoopGroup acceptorGroup = new NioEventLoopGroup(); + EventLoopGroup iOGroup = new NioEventLoopGroup(); +``` +NioEventLoopGroup 实际就是一个 Reactor线程池,负责调度和执行客户端的接入、网络读写事件的处理、用户自定义任务和定时任务的执行。通过 ServerBootstrap 的 group()方法 将两个 EventLoopGroup实例 传入,代码如下。 +```java +public class ServerBootstrap extends AbstractBootstrap { + + /** + * Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These + * {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and + * {@link Channel}'s. + */ + public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { + super.group(parentGroup); + if (childGroup == null) { + throw new NullPointerException("childGroup"); + } + if (this.childGroup != null) { + throw new IllegalStateException("childGroup set already"); + } + this.childGroup = childGroup; + return this; + } +} +``` +其中 parentGroup对象 被设置进了 ServerBootstrap 的父类 AbstractBootstrap 中,代码如下。 +```java +public abstract class AbstractBootstrap, C extends Channel> implements Cloneable { + + volatile EventLoopGroup group; + + /** + * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created + * {@link Channel} + */ + public B group(EventLoopGroup group) { + if (group == null) { + throw new NullPointerException("group"); + } + if (this.group != null) { + throw new IllegalStateException("group set already"); + } + this.group = group; + return self(); + } +} +``` +该方法会被客户端和服务端重用,用于设置 工作IO线程,执行和调度网络事件的读写。线程组和线程类型设置完成后,需要设置 服务端Channel 用于端口监听和客户端链路接入。Netty 通过 Channel工厂类 来创建不同类型的 Channel,对于服务端,需要创建 NioServerSocketChannel。所以,通过指定 Channel类型 的方式创建 Channel工厂。ReflectiveChannelFactory 可以根据 Channel的类型 通过反射创建 Channel的实例,服务端需要创建的是 NioServerSocketChannel实例,代码如下。 +```java +public class ReflectiveChannelFactory implements ChannelFactory { + + private final Constructor constructor; + + public ReflectiveChannelFactory(Class clazz) { + ObjectUtil.checkNotNull(clazz, "clazz"); + try { + this.constructor = clazz.getConstructor(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + + " does not have a public non-arg constructor", e); + } + } + + @Override + public T newChannel() { + try { + return constructor.newInstance(); + } catch (Throwable t) { + throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); + } + } +} +``` +指定 NioServerSocketChannel 后,需要设置 TCP 的一些参数,作为服务端,主要是设置 TCP 的 backlog参数。 + +backlog 指定了内核为此套接口排队的最大连接个数,对于给定的监听套接口,内核要维护两个队列:未链接队列 和 已连接队列,根据 TCP三次握手 的 三个子过程来分隔这两个队列。服务器处于 listen状态 时,收到客户端 syn过程(connect) 时在未完成队列中创建一个新的条目,然后用三次握手的第二个过程,即服务器的 syn响应客户端,此条目在第三个过程到达前 (客户端对服务器 syn 的 ack) 一直保留在未完成连接队列中,如果三次握手完成,该条目将从未完成连接队列搬到已完成连接队列尾部。当进程调用 accept 时,从已完成队列中的头部取出一个条目给进程,当已完成队列为空时进程将睡眠,直到有条目在已完成连接队列中才唤醒。backlog 被规定为两个队列总和的最大值,大多数实现默认值为 5,但在高并发 Web服务器 中此值显然不够。 需要设置此值更大一些的原因是,未完成连接队列的长度可能因为客户端 syn 的到达及等待三次握手的第三个过程延时 而增大。Netty 默认的 backlog 为 100,当然,用户可以修改默认值,这需要根据实际场景和网络状况进行灵活设置。 + +TCP参数 设置完成后,用户可以为启动辅助类和其父类分别指定 Handler。两者 Handler 的用途不同:子类中的 Handler 是 NioServerSocketChannel 对应的 ChannelPipeline 的 Handler;父类中的 Handler 是客户端新接入的连接 SocketChannel 对应的 ChannelPipeline 的 Handler。两者的区别可以通过下图来展示。 + +![avatar](/images/Netty/ServerBootstrap的Handler模型.png) + +本质区别就是:ServerBootstrap 中的 Handler 是 NioServerSocketChannel 使用的,所有连接该监听端口的客户端都会执行它;父类AbstractBootstrap 中的 Handler 是个工厂类,它为每个新接入的客户端都创建一个新的 Handler。 + +服务端启动的最后一步,就是绑定本地端口,启动服务,下面我们来分析下这部分代码。 +```java +public abstract class AbstractBootstrap, C extends Channel> implements Cloneable { + + private ChannelFuture doBind(final SocketAddress localAddress) { + final ChannelFuture regFuture = initAndRegister(); + final Channel channel = regFuture.channel(); + if (regFuture.cause() != null) { + return regFuture; + } + + if (regFuture.isDone()) { + // At this point we know that the registration was complete and successful. + ChannelPromise promise = channel.newPromise(); + doBind0(regFuture, channel, localAddress, promise); + return promise; + } else { + // Registration future is almost always fulfilled already, but just in case it's not. + final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); + regFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + Throwable cause = future.cause(); + if (cause != null) { + // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an + // IllegalStateException once we try to access the EventLoop of the Channel. + promise.setFailure(cause); + } else { + // Registration was successful, so set the correct executor to use. + // See https://github.com/netty/netty/issues/2586 + promise.registered(); + + doBind0(regFuture, channel, localAddress, promise); + } + } + }); + return promise; + } + } +} +``` +先看下上述代码调用的 initAndRegister()方法。它首先实例化了一个 NioServerSocketChannel类型 的 Channel对象。相关代码如下。 +```java + final ChannelFuture initAndRegister() { + Channel channel = null; + try { + channel = channelFactory.newChannel(); + init(channel); + } catch (Throwable t) { + if (channel != null) { + // channel can be null if newChannel crashed (eg SocketException("too many open files")) + channel.unsafe().closeForcibly(); + // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor + return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); + } + // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor + return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); + } + + ChannelFuture regFuture = config().group().register(channel); + if (regFuture.cause() != null) { + if (channel.isRegistered()) { + channel.close(); + } else { + channel.unsafe().closeForcibly(); + } + } + return regFuture; + } +``` +NioServerSocketChannel 创建成功后,对它进行初始化,初始化工作主要有以下三点。 +```java + @Override + void init(Channel channel) throws Exception { + final Map, Object> options = options0(); + synchronized (options) { + setChannelOptions(channel, options, logger); + } + + // 1、设置 Socket参数 和 NioServerSocketChannel 的附加属性 + final Map, Object> attrs = attrs0(); + synchronized (attrs) { + for (Entry, Object> e: attrs.entrySet()) { + @SuppressWarnings("unchecked") + AttributeKey key = (AttributeKey) e.getKey(); + channel.attr(key).set(e.getValue()); + } + } + + // 2、将 AbstractBootstrap 的 Handler 添加到 NioServerSocketChannel + // 的 ChannelPipeline 中 + ChannelPipeline p = channel.pipeline(); + + final EventLoopGroup currentChildGroup = childGroup; + final ChannelHandler currentChildHandler = childHandler; + final Entry, Object>[] currentChildOptions; + final Entry, Object>[] currentChildAttrs; + synchronized (childOptions) { + currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0)); + } + synchronized (childAttrs) { + currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0)); + } + + // 3、将用于服务端注册的 Handler ServerBootstrapAcceptor 添加到 ChannelPipeline 中 + p.addLast(new ChannelInitializer() { + @Override + public void initChannel(final Channel ch) throws Exception { + final ChannelPipeline pipeline = ch.pipeline(); + ChannelHandler handler = config.handler(); + if (handler != null) { + pipeline.addLast(handler); + } + + ch.eventLoop().execute(new Runnable() { + @Override + public void run() { + pipeline.addLast(new ServerBootstrapAcceptor( + ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); + } + }); + } + }); + } +``` +到此,Netty 服务端监听的相关资源已经初始化完毕,就剩下最后一步,注册 NioServerSocketChannel 到 Reactor线程 的多路复用器上,然后轮询客户端连接事件。在分析注册代码之前,我们先通过下图,看看目前 NioServerSocketChannel 的 ChannelPipeline 的组成。 +![avatar](/images/Netty/NioServerSocketChannel的ChannelPipeline.png) +最后,我们看下 NioServerSocketChannel 的注册。当 NioServerSocketChannel 初始化完成之后,需要将它注册到 Reactor线程 的多路复用器上监听新客户端的接入,代码如下。 +```java +public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { + + protected abstract class AbstractUnsafe implements Unsafe { + + /** + * 将完成初始化的 NioServerSocketChannel 注册到 Reactor线程 + * 的多路复用器上,监听新客户端的接入 + */ + @Override + public final void register(EventLoop eventLoop, final ChannelPromise promise) { + + ...... + + // 首先判断是否是 NioEventLoop 自身发起的操作。如果是,则不存在并发操作,直接 + // 执行 Channel注册;如果由其他线程发起,则封装成一个 Task 放入消息队列中异步执行。 + // 此处,由于是由 ServerBootstrap 所在线程执行的注册操作,所以会将其封装成 Task 投递 + // 到 NioEventLoop 中执行 + if (eventLoop.inEventLoop()) { + register0(promise); + } else { + try { + eventLoop.execute(new Runnable() { + @Override + public void run() { + register0(promise); + } + }); + } catch (Throwable t) { + + ...... + + } + } + } + + private void register0(ChannelPromise promise) { + try { + // check if the channel is still open as it could be closed in the mean time when the register + // call was outside of the eventLoop + if (!promise.setUncancellable() || !ensureOpen(promise)) { + return; + } + boolean firstRegistration = neverRegistered; + // 该方法在本类中是一个空实现,下面看一下它在子类 AbstractNioChannel 中的实现 + doRegister(); + neverRegistered = false; + registered = true; + + pipeline.invokeHandlerAddedIfNeeded(); + + safeSetSuccess(promise); + pipeline.fireChannelRegistered(); + if (isActive()) { + if (firstRegistration) { + pipeline.fireChannelActive(); + } else if (config().isAutoRead()) { + beginRead(); + } + } + } catch (Throwable t) { + closeForcibly(); + closeFuture.setClosed(); + safeSetFailure(promise, t); + } + } + } +} + + +public abstract class AbstractNioChannel extends AbstractChannel { + + @Override + protected void doRegister() throws Exception { + boolean selected = false; + for (;;) { + try { + // 将 NioServerSocketChannel 注册到 NioEventLoop 的 多路复用器Selector 上 + selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); + return; + } catch (CancelledKeyException e) { + + ...... + + } + } + } +} +``` +到此,服务端监听启动部分源码已经分析完成。 + +## 结合 Netty源码 对客户端接入过程进行解析 +负责处理网络读写、连接和客户端请求接入的 Reactor线程 就是 NioEventLoop,下面我们看下 NioEventLoop 是如何处理新的客户端连接接入的。当 多路复用器 检测到新的准备就绪的 Channel 时,默认执行 processSelectedKeysOptimized()方法,代码如下。 +```java +public final class NioEventLoop extends SingleThreadEventLoop { + + private void processSelectedKeys() { + if (selectedKeys != null) { + processSelectedKeysOptimized(); + } else { + processSelectedKeysPlain(selector.selectedKeys()); + } + } + + private void processSelectedKeysOptimized() { + for (int i = 0; i < selectedKeys.size; ++i) { + final SelectionKey k = selectedKeys.keys[i]; + selectedKeys.keys[i] = null; + + final Object a = k.attachment(); + + if (a instanceof AbstractNioChannel) { + // 根据就绪的操作位 SelectionKey,执行不同的操作 + processSelectedKey(k, (AbstractNioChannel) a); + } else { + @SuppressWarnings("unchecked") + NioTask task = (NioTask) a; + processSelectedKey(k, task); + } + + if (needsToSelectAgain) { + selectedKeys.reset(i + 1); + selectAgain(); + i = -1; + } + } + } + + // 根据就绪的操作位 SelectionKey,执行不同的操作 + private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { + // 由于不同的 Channel 执行不同的操作,所以 NioUnsafe 被设计成接口 + // 由不同的 Channel 内部的 NioUnsafe实现类 负责具体实现 + final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); + if (!k.isValid()) { + final EventLoop eventLoop; + try { + eventLoop = ch.eventLoop(); + } catch (Throwable ignored) { + return; + } + if (eventLoop != this || eventLoop == null) { + return; + } + unsafe.close(unsafe.voidPromise()); + return; + } + + try { + int readyOps = k.readyOps(); + if ((readyOps & SelectionKey.OP_CONNECT) != 0) { + int ops = k.interestOps(); + ops &= ~SelectionKey.OP_CONNECT; + k.interestOps(ops); + unsafe.finishConnect(); + } + + if ((readyOps & SelectionKey.OP_WRITE) != 0) { + ch.unsafe().forceFlush(); + } + + // read()方法 的实现有两个,分别是 NioByteUnsafe 和 NioMessageUnsafe, + // 对于 NioServerSocketChannel,它使用的是 NioMessageUnsafe + // 下面看一下 NioMessageUnsafe 对 read() 方法的实现 + if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { + unsafe.read(); + } + } catch (CancelledKeyException ignored) { + unsafe.close(unsafe.voidPromise()); + } + } +} + + +public abstract class AbstractNioMessageChannel extends AbstractNioChannel { + + private final class NioMessageUnsafe extends AbstractNioUnsafe { + + private final List readBuf = new ArrayList(); + + @Override + public void read() { + assert eventLoop().inEventLoop(); + final ChannelConfig config = config(); + final ChannelPipeline pipeline = pipeline(); + final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); + allocHandle.reset(config); + + boolean closed = false; + Throwable exception = null; + try { + try { + do { + // 接收新的客户端连接并创建 NioSocketChannel + int localRead = doReadMessages(readBuf); + if (localRead == 0) { + break; + } + if (localRead < 0) { + closed = true; + break; + } + allocHandle.incMessagesRead(localRead); + } while (allocHandle.continueReading()); + } catch (Throwable t) { + exception = t; + } + + int size = readBuf.size(); + for (int i = 0; i < size; i ++) { + readPending = false; + // 接收到新的客户端连接后,触发 ChannelPipeline 的 channelRead方法。 + // 事件在 ChannelPipeline 中传递,执行 ServerBootstrapAcceptor 的 + // channelRead方法 + pipeline.fireChannelRead(readBuf.get(i)); + } + + ...... + + } + } + } +} + + +public class NioServerSocketChannel extends AbstractNioMessageChannel + implements io.netty.channel.socket.ServerSocketChannel { + + /** + * 接收新的客户端连接并创建 NioSocketChannel + */ + @Override + protected int doReadMessages(List buf) throws Exception { + SocketChannel ch = SocketUtils.accept(javaChannel()); + + try { + if (ch != null) { + buf.add(new NioSocketChannel(this, ch)); + return 1; + } + } catch (Throwable t) { + + ...... + + } + return 0; + } +} + + +public class ServerBootstrap extends AbstractBootstrap { + + private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter { + + /** + * 该方法主要分为如下三个步骤。 + */ + @Override + @SuppressWarnings("unchecked") + public void channelRead(ChannelHandlerContext ctx, Object msg) { + final Channel child = (Channel) msg; + // 第一步:将启动时传入的 childHandler 加入到客户端 SocketChannel 的 ChannelPipeline 中 + child.pipeline().addLast(childHandler); + // 第二步:设置客户端 SocketChannel 的 TCP参数 + setChannelOptions(child, childOptions, logger); + for (Entry, Object> e: childAttrs) { + child.attr((AttributeKey) e.getKey()).set(e.getValue()); + } + // 第三步:注册 SocketChannel 到多路复用器 + try { + childGroup.register(child).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (!future.isSuccess()) { + forceClose(child, future.cause()); + } + } + }); + } catch (Throwable t) { + forceClose(child, t); + } + } + } +} +``` +下面我们展开看下 NioSocketChannel 的 register()方法。NioSocketChannel 的注册方法与 ServerSocketChannel 的一致, 也是将 Channel 注册到 Reactor线程 的多路复用器上。由于注册的操作位是 0,所以,此时 NioSocketChannel 还不能读取客户端发送的消息,下面我们看看 是什么时候修改监听操作位为 OP_READ 的。 + +执行完注册操作之后,紧接着会触发 ChannelReadComplete 事件。我们继续分析 ChannelReadComplete 在 ChannelPipeline 中的处理流程:Netty 的 Header 和 Tail 本身不关注 ChannelReadComplete事件 就直接透传,执行完 ChannelReadComplete 后,接着执行 PipeLine 的 read()方法,最终执行 HeadHandler 的 read()方法。 + +HeadHandler 的 read()方法用来将网络操作位修改为读操作。创建 NioSocketChannel 的时候已经将 AbstractNioChannel 的 readInterestOp 设置为 OP_ READ,这样,执行 selectionKey. interestOps(interestOps | readInterestOp)操作 时就会把操作位设置为 OP_READ。代码如下。 +```java +public abstract class AbstractNioByteChannel extends AbstractNioChannel { + + protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { + super(parent, ch, SelectionKey.OP_READ); + } +} +``` +到此,新接入的客户端连接处理完成,可以进行网络读写等 IO操作。 \ No newline at end of file diff --git a/images/Netty/ChannelPipeline的调度相关方法.png b/images/Netty/ChannelPipeline的调度相关方法.png new file mode 100644 index 0000000..dd3adf7 Binary files /dev/null and b/images/Netty/ChannelPipeline的调度相关方法.png differ diff --git a/images/Netty/Netty服务端创建时序图.png b/images/Netty/Netty服务端创建时序图.png new file mode 100644 index 0000000..e122fdc Binary files /dev/null and b/images/Netty/Netty服务端创建时序图.png differ diff --git a/images/Netty/Netty逻辑架构图.png b/images/Netty/Netty逻辑架构图.png new file mode 100644 index 0000000..d151b1f Binary files /dev/null and b/images/Netty/Netty逻辑架构图.png differ diff --git a/images/Netty/NioServerSocketChannel的ChannelPipeline.png b/images/Netty/NioServerSocketChannel的ChannelPipeline.png new file mode 100644 index 0000000..f3ef15f Binary files /dev/null and b/images/Netty/NioServerSocketChannel的ChannelPipeline.png differ diff --git a/images/Netty/ServerBootstrap的Handler模型.png b/images/Netty/ServerBootstrap的Handler模型.png new file mode 100644 index 0000000..321bebe Binary files /dev/null and b/images/Netty/ServerBootstrap的Handler模型.png differ