diff --git a/README.md b/README.md index 7ef1425..b0a62bf 100644 --- a/README.md +++ b/README.md @@ -132,43 +132,40 @@ ## Netty ### 网络 IO 技术基础 - * [把被说烂的 BIO、NIO、AIO 再从头到尾扯一遍](docs/Netty/IOTechnologyBase/把被说烂的BIO、NIO、AIO再从头到尾扯一遍.md) * [IO模型](docs/Netty/IOTechnologyBase/IO模型.md) * [四种IO编程及对比](docs/Netty/IOTechnologyBase/四种IO编程及对比.md) -### Netty 粘拆包及解决方案 +### JDK1.8 NIO包 核心组件源码剖析 +* [Selector、SelectionKey及Channel组件](docs/Netty/IOTechnologyBase/Selector、SelectionKey及Channel组件.md) +### Netty 粘拆包及解决方案 * [TCP粘拆包问题及Netty中的解决方案](docs/Netty/TCP粘拆包/TCP粘拆包问题及Netty中的解决方案.md) -### Netty 编解码 - -* [Java序列化缺点与主流编解码框架](docs/Netty/Netty编解码/Java序列化缺点与主流编解码框架.md) - ### Netty 多协议开发 - * [基于HTTP协议的Netty开发](docs/Netty/Netty多协议开发/基于HTTP协议的Netty开发.md) * [基于WebSocket协议的Netty开发](docs/Netty/Netty多协议开发/基于WebSocket协议的Netty开发.md) * [基于自定义协议的Netty开发](docs/Netty/Netty多协议开发/基于自定义协议的Netty开发.md) ### 基于 Netty 开发服务端及客户端 - * [基于Netty的服务端开发](docs/Netty/基于Netty开发服务端及客户端/基于Netty的服务端开发.md) * [基于Netty的客户端开发](docs/Netty/基于Netty开发服务端及客户端/基于Netty的客户端开发.md) ### Netty 主要组件的源码分析 - * [ByteBuf组件](docs/Netty/Netty主要组件源码分析/ByteBuf组件.md) * [Channel组件 和 Unsafe组件](docs/Netty/Netty主要组件源码分析/Channel和Unsafe组件.md) -* [ChannelPipeline 和 ChannelHandler组件](docs/Netty/Netty主要组件源码分析/ChannelPipeline和ChannelHandler组件.md) * [EventLoop 和 EventLoopGroup组件](docs/Netty/Netty主要组件源码分析/EventLoop和EventLoopGroup组件.md) +* [ChannelPipeline 和 ChannelHandler组件](docs/Netty/Netty主要组件源码分析/ChannelPipeline和ChannelHandler组件.md) * [Future 和 Promise组件](docs/Netty/Netty主要组件源码分析/Future和Promise组件.md) ### Netty 高级特性 - * [Netty 架构设计](docs/Netty/AdvancedFeaturesOfNetty/Netty架构设计.md) * [Netty 高性能之道](docs/Netty/AdvancedFeaturesOfNetty/Netty高性能之道.md) +### Netty 技术细节源码分析 +* [FastThreadLocal源码分析](docs/Netty/Netty技术细节源码分析/FastThreadLocal源码分析.md) +* [Recycler对象池原理分析](docs/Netty/Netty技术细节源码分析/Recycler对象池原理分析.md) + ## Dubbo ### 架构设计 diff --git a/docs/Netty/IOTechnologyBase/Selector、SelectionKey及Channel组件.md b/docs/Netty/IOTechnologyBase/Selector、SelectionKey及Channel组件.md new file mode 100644 index 0000000..c99a8f5 --- /dev/null +++ b/docs/Netty/IOTechnologyBase/Selector、SelectionKey及Channel组件.md @@ -0,0 +1,298 @@ +Selector、SelectionKey和Channel 这三个组件构成了Java nio包的核心,也是Reactor模型在代码层面的体现。Selector能让单线程同时处理多个客户端Channel,非常适用于高并发,传输数据量较小的场景。要使用Selector,首先要将对应的Channel及IO事件(读、写、连接)注册到Selector,注册后会产生一个SelectionKey对象,用于关联Selector和Channel,及后续的IO事件处理。这三者的关系如下图所示。 + +![在这里插入图片描述](../../../images/Netty/Selector和SelectionKey和Channel关系图.png) + +对nio编程不熟的同学可以搜索一些简单的demo跑一下,下面 我们直接进入源码,窥探一些nio的奥秘。 +### Selector +其实,不管是 Selector 还是 SelectionKey 的源码,其具体实现类都是依赖于底层操作系统的,这里我们只看一下抽象类 Selector 的源码,日后有事件,再找一些具体的实现类深入分析一下。 +```java +public abstract class Selector implements Closeable { + + protected Selector() { } + + /** + * 获取一个 Selector对象,具体实现依赖于底层操作系统 + */ + public static Selector open() throws IOException { + return SelectorProvider.provider().openSelector(); + } + + /** + * 判断该 Selector 是否已开启 + */ + public abstract boolean isOpen(); + + /** + * 当前所有向Selector注册的Channel 所对应的SelectionKey的集合 + */ + public abstract Set keys(); + + /** + * 相关事件已经被 Selector 捕获的 SelectionKey的集合 + */ + public abstract Set selectedKeys(); + + /** + * 阻塞到至少有一个通道在你注册的事件上就绪了 + */ + public abstract int select() throws IOException; + + /** + * 和select()一样,除了最长会阻塞timeout毫秒 + */ + public abstract int select(long timeout) throws IOException; + + /** + * 此方法执行非阻塞的选择操作,如果自从上一次选择操作后, + * 没有通道变成可选择的,则此方法直接返回 0 + */ + public abstract int selectNow() throws IOException; + + /** + * 用完Selector后调用其close()方法会关闭该Selector,且使注册到该Selector上的所有SelectionKey实例无效 + * 
通道本身并不会关闭 + */ + public abstract void close() throws IOException; +} +``` + +### SelectionKey +表示 SelectableChannel 在 Selector 中的注册的标记 / 句柄。 +```java +public abstract class SelectionKey { + + protected SelectionKey() { } + + + // -- Channel and selector operations -- + + /** + * 获取该 SelectionKey 对应的Channel,Channel注册到Selector时会产生该 SelectionKey对象 + */ + public abstract SelectableChannel channel(); + + /** + * 获取该 SelectionKey 对应的 Selector + */ + public abstract Selector selector(); + + /** + * 该 SelectionKey 是否是有效的 + */ + public abstract boolean isValid(); + + // ------ Operation-set accessors ------ + + /** + * 获取该 SelectionKey 的兴趣事件 (既 SelectionKey 的4个 事件静态常量) + */ + public abstract int interestOps(); + + /** + * 设置该 SelectionKey 的兴趣事件 + */ + public abstract SelectionKey interestOps(int ops); + + /** + * 获取该 SelectionKey 的已操作集 + */ + public abstract int readyOps(); + + + // ------ Operation bits and bit-testing convenience methods ------ + + /** + * channel中的数据是否已经可以读取 + */ + public static final int OP_READ = 1 << 0; + + /** + * channel是否可以开始写入数据 + */ + public static final int OP_WRITE = 1 << 2; + + /** + * channel是否已经建立连接 + */ + public static final int OP_CONNECT = 1 << 3; + + /** + * ServerSocketChannel 是否可以与客户端建立连接 + */ + public static final int OP_ACCEPT = 1 << 4; + + /** + * channel是否可读 + */ + public final boolean isReadable() { + return (readyOps() & OP_READ) != 0; + } + + /** + * channel是否可写 + */ + public final boolean isWritable() { + return (readyOps() & OP_WRITE) != 0; + } + + /** + * channel是否建立连接 + */ + public final boolean isConnectable() { + return (readyOps() & OP_CONNECT) != 0; + } + + /** + * ServerSocketChannel是否可与客户端channel建立连接 + */ + public final boolean isAcceptable() { + return (readyOps() & OP_ACCEPT) != 0; + } +} +``` +### Channel组件 +平时编码用的比较多的就是 SocketChannel 和 ServerSocketChannel,而将 Channel 与 Selecor 关联到一起的核心API则定义在它们的公共父类SelectableChannel中,整个Channel组件的核心类图如下所示。 + +![在这里插入图片描述](../../../images/Netty/Channel组件.png) + +#### SelectableChannel +```java +public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel { + + protected SelectableChannel() { } + + /** + * 当前channel是否注册到了某个selector上,新创建的channel都是未注册状态 + */ + public abstract boolean isRegistered(); + + /** + * 根据给定的 Selector,获取本channel注册上去的 SelectionKey + */ + public abstract SelectionKey keyFor(Selector sel); + + /** + * 将当前channel及关注的事件,注册到Selector上,返回一个 SelectionKey + */ + public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException { + return register(sel, ops, null); + } + + public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException; + + /** + * 设置该channel的阻塞模式,默认为 true阻塞 + */ + public abstract SelectableChannel configureBlocking(boolean block) throws IOException; + + /** + * 是否为阻塞IO模式 + */ + public abstract boolean isBlocking(); +} +``` + +#### ServerSocketChannel +相当于 BIO 中的 ServerSocket,主要用于服务端与客户端建立连接通信的channel。 +```java +public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel { + + protected ServerSocketChannel(SelectorProvider provider) { + super(provider); + } + + /** + * 获取一个 ServerSocketChannel实例,具体实现依赖底层操作系统 + */ + public static ServerSocketChannel open() throws IOException { + return SelectorProvider.provider().openServerSocketChannel(); + } + + // -- ServerSocket-specific operations -- + + /** + * 绑定ip地址及要监听的端口 + */ + public final ServerSocketChannel bind(SocketAddress local) throws IOException { + return bind(local, 0); + } + + public abstract 
ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException; + + /** + * 与一个客户端channel建立连接,返回该客户端的存根 SocketChannel + */ + public abstract SocketChannel accept() throws IOException; +} +``` +#### SocketChannel +相当于 BIO 中的 Socket,主要用于通信双方的读写操作。 +```java +public abstract class SocketChannel extends AbstractSelectableChannel + implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel { + + protected SocketChannel(SelectorProvider provider) { + super(provider); + } + + /** + * 根据 SocketAddress 获取一个 SocketChannel,具体实现依赖底层操作系统 + */ + public static SocketChannel open(SocketAddress remote) throws IOException { + SocketChannel sc = open(); + try { + sc.connect(remote); + } catch (Throwable x) { + try { + sc.close(); + } catch (Throwable suppressed) { + x.addSuppressed(suppressed); + } + throw x; + } + assert sc.isConnected(); + return sc; + } + + public static SocketChannel open() throws IOException { + return SelectorProvider.provider().openSocketChannel(); + } + + // -- Socket-specific operations -- + + /** + * 绑定要连接的远程服务的ip及端口 + */ + @Override + public abstract SocketChannel bind(SocketAddress local) throws IOException; + + /** + * 该channel与服务端是否已连接 + */ + public abstract boolean isConnected(); + + // -- ByteChannel operations -- + + /** + * 将 channel 中的数据读到 ByteBuffer + */ + public abstract int read(ByteBuffer dst) throws IOException; + + public final long read(ByteBuffer[] dsts) throws IOException { + return read(dsts, 0, dsts.length); + } + + public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException; + + /** + * 将 ByteBuffer 中的数据写到 channel + */ + public abstract int write(ByteBuffer src) throws IOException; + + public final long write(ByteBuffer[] srcs) throws IOException { + return write(srcs, 0, srcs.length); + } + + public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException; +} +``` \ No newline at end of file diff --git a/docs/Netty/Netty主要组件源码分析/ChannelPipeline和ChannelHandler组件.md b/docs/Netty/Netty主要组件源码分析/ChannelPipeline和ChannelHandler组件.md index fcb5dbe..9165f52 100644 --- a/docs/Netty/Netty主要组件源码分析/ChannelPipeline和ChannelHandler组件.md +++ b/docs/Netty/Netty主要组件源码分析/ChannelPipeline和ChannelHandler组件.md @@ -1 +1,243 @@ -努力编写中... 
\ No newline at end of file +Netty 的 ChannelPipeline 和 ChannelHandler 机制类似于 Servlet 和 Filter过滤器,这类拦截器实际上是职责链模式的一种变形,主要是为了方便事件的拦截和用户业务逻辑的定制。 + +Servlet Filter 能够以声明的方式(web.xml 配置文件)插入到 HTTP请求响应的处理过程中,用于拦截请求和响应,以便能够查看、提取或以某种方式操作正在客户端和服务器之间交换的数据。拦截器封装了业务定制逻辑,能够实现对 Web应用程序 的预处理和事后处理。 + +Netty 的 Channel过滤器 实现原理与 Servlet Filter机制 一致,它将 Channel的数据管道 抽象为 ChannelPipeline,消息在 ChannelPipeline 中流动和传递。ChannelPipeline 持有 I/O事件拦截器 ChannelHandler链表,由 ChannelHandler链表 对 IO事件 进行拦截和处理,可以通过新增和删除 ChannelHandler 来实现不同的业务逻辑定制,不需要对已有的 ChannelHandler 进行修改,能够实现对修改封闭和对扩展的支持。 + +下面我们对 ChannelPipeline 和 ChannelHandler 的功能进行简单地介绍,然后分析下其源码设计。 + +## ChannelPipeline 的功能和作用 +ChannelPipeline 是 ChannelHandler 的容器,它负责 ChannelHandler 的管理、事件拦截与调度。 + +#### ChannelPipeline 的事件处理 +下图展示了 一个消息被 ChannelPipeline 的 ChannelHandler链 拦截和处理的全过程。 + +```java + * I/O Request + * via {@link Channel} or + * {@link ChannelHandlerContext} + * | + * +---------------------------------------------------+---------------+ + * | ChannelPipeline | | + * | \|/ | + * | +---------------------+ +-----------+----------+ | + * | | Inbound Handler N | | Outbound Handler 1 | | + * | +----------+----------+ +-----------+----------+ | + * | /|\ | | + * | | \|/ | + * | +----------+----------+ +-----------+----------+ | + * | | Inbound Handler N-1 | | Outbound Handler 2 | | + * | +----------+----------+ +-----------+----------+ | + * | /|\ . | + * | . . | + * | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()| + * | [ method call] [method call] | + * | . . | + * | . \|/ | + * | +----------+----------+ +-----------+----------+ | + * | | Inbound Handler 2 | | Outbound Handler M-1 | | + * | +----------+----------+ +-----------+----------+ | + * | /|\ | | + * | | \|/ | + * | +----------+----------+ +-----------+----------+ | + * | | Inbound Handler 1 | | Outbound Handler M | | + * | +----------+----------+ +-----------+----------+ | + * | /|\ | | + * +---------------+-----------------------------------+---------------+ + * | \|/ + * +---------------+-----------------------------------+---------------+ + * | | | | + * | [ Socket.read() ] [ Socket.write() ] | + * | | + * | Netty Internal I/O Threads (Transport Implementation) | + * +-------------------------------------------------------------------+ +``` +从上图可以看出 消息读取和发送处理全流程为: +1. 底层的 SocketChannel.read()方法 读取 ByteBuf,触发 ChannelRead事件,由 IO线程 NioEventLoop 调用 ChannelPipeline 的 fireChannelRead(Object msg)方法,将消息传输到 ChannelPipeline 中。 +2. 消息依次被 HeadHandler、ChannelHandler1、ChannelHandler2 … TailHandler 拦截和处理,在这个过程中,任何 ChannelHandler 都可以中断当前的流程,结束消息的传递。 +3. 调用 ChannelHandlerContext 的 write方法 发送消息,消息从 TailHandler 开始途经 ChannelHandlerN … ChannelHandler1、HeadHandler,最终被添加到消息发送缓冲区中等待刷新和发送,在此过程中也可以中断消息的传递,例如当编码失败时,就需要中断流程,构造异常的Future返回。 + +Netty 中的事件分为 Inbound事件 和 Outbound事件。Inbound事件 通常由 I/O线程 触发,例如 TCP链路建立事件、链路关闭事件、读事件、异常通知事件等,它对应上图的左半部分。触发 Inbound事件 的方法如下。 +1. ChannelHandlerContext.fireChannelRegistered():Channel注册事件; +2. ChannelHandlerContext.fireChannelActive():TCP链路建立成功,Channel激活事件; +3. ChannelHandlerContext.fireChannelRead(Object):读事件; +4. ChannelHandlerContext.fireChannelReadComplete():读操作完成通知事件; +5. ChannelHandlerContext.fireExceptionCaught(Throwable):异常通知事件; +6. ChannelHandlerContext.fireUserEventTriggered(Object):用户自定义事件; +7. ChannelHandlerContext.fireChannelWritabilityChanged():Channel的可写状态变化; +8. ChannelHandlerContext.fireChannellnactive():TCP连接关闭,链路不可用通知事件。 + +Outbound事件 通常是由用户主动发起的 网络IO操作,例如用户发起的连接操作、绑定操作、消息发送等操作,它对应上图的右半部分。触发 Outbound事件 的方法如下: +1. 
ChannelHandlerContext.bind(SocketAddress, ChannelPromise):绑定本地地址事件; +2. ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise):连接服务端事件; +3. ChannelHandlerContext.write(Object, ChannelPromise):发送事件; +4. ChannelHandlerContext.flush():刷新事件; +5. ChannelHandlerContext.read():读事件; +6. ChannelHandlerContext.disconnect(ChannelPromise):断开连接事件; +7. ChannelHandlerContext.close(ChannelPromise):关闭当前Channel事件。 + +#### ChannelPipeline 自定义拦截器 +ChannelPipeline 通过 ChannelHandler 来实现事件的拦截和处理,由于 ChannelHandler 中的事件种类繁多,不同的 ChannelHandler 可能只需要关心其中的个别事件,所以,自定义的ChannelHandler 只需要继承 ChannelInboundHandlerAdapter / ChannelOutboundHandlerAdapter,覆盖自己关心的方法即可。 + +下面的两个示例分别展示了:拦截 Channel Active事件,打印TCP链路建立成功日志,和 链路关闭的时候释放资源,代码如下。 +```java +public class MyInboundHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelActive(ChannelHandlerContext context) { + System.out.println("欢迎来到,LPL!"); + context.fireChannelActive(); + } +} + +public class MyOutboundHandler extends ChannelOutboundHandlerAdapter { + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + System.out.println("游戏结束..."); + ctx.close(); + } +} +``` + +#### 构建 pipeline +使用 Netty 时,用户不需要自己创建 pipeline,因为使用 ServerBootstrap 或者 Bootstrap 进行配置后,Netty 会为每个 Channel连接 创建一个独立的pipeline。我们只需将自定义的 ChannelHandler 加入到 pipeline 即可。相关代码如下。 +```java +ServerBootstrap server = new ServerBootstrap(); +server.childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + /** 解析自定义协议 */ + pipeline.addLast(new MyDecoder()); + pipeline.addLast(new MyEncoder()); + pipeline.addLast(new SocketHandler()); + /** 解析Http请求 */ + pipeline.addLast(new HttpServerCodec()); + //主要是将同一个http请求或响应的多个消息对象变成一个 fullHttpRequest完整的消息对象 + pipeline.addLast(new HttpObjectAggregator(64 * 1024)); + //主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的 ,加上这个handler我们就不用考虑这个问题了 + pipeline.addLast(new ChunkedWriteHandler()); + } +}); +``` +对于类似编解码这样的 ChannelHandler,它存在先后顺序,例如 MessageToMessageDecoder,在它之前往往需要有 ByteToMessageDecoder 将 ByteBuf 解码为对象,然后将对象做二次解码 得到最终的 POJO对象。pipeline 支持指定位置添加或者删除ChannelHandler。 + +#### ChannelPipeline 的主要特性 +ChannelPipeline 支持运行时动态的添加或者删除 ChannelHandler,在某些场景下这个特性非常实用。例如当业务高峰期需要对系统做拥塞保护时,就可以根据当前的系统时间进行判断,如果处于业务高峰期,则动态地将 系统拥塞保护ChannelHandler 添加到当前的ChannelPipeline 中,当高峰期过去之后,再动态删除 拥塞保护ChannelHandler。 + +ChannelPipeline 是线程安全的,这意味着 N 个业务线程可以并发地操作 ChannelPipeline 而不存在多线程并发问题。但 ChannelHandler 不是线程安全的,这意味着 我们需要自己保证 ChannelHandler 的线程安全。 + +## ChannelPipeline 源码解析 +ChannelPipeline 的代码比较简单,它实际上是一个 ChannelHandler容器,内部维护了一个 ChannelHandler 的链表和迭代器,可以方便地进行 ChannelHandler 的 CRUD。 + +另外一个比较重要的部分是,当发生某个 I/O事件 时,如 链路建立、链路关闭、读写操作 等,都会产一个事件,事件在 pipeline 中传播和处理,它是事件处理的总入口。由于 网络I/O 相关的事件有限,因此 Netty 对这些事件进行了统一抽象,Netty 提供的 和用户自定义的 ChannelHandler 会对感兴趣的事件进行拦截和处理。 + +pipeline 中以 fireXXX 命名的方法都是从 I/O线程 流向 用户业务Handler 的 inbound事件,它们的实现因功能而异,但是处理步骤类似,都是 调用HeadHandler 对应的 fireXXX方法,然后执行事件相关的逻辑操作。 + +```java +public interface ChannelPipeline + extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable> { + + /** + * 管理 ChannelHandler 的api + */ + ChannelPipeline addFirst(String name, ChannelHandler handler); + + ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler); + + ChannelPipeline addLast(String name, ChannelHandler handler); + + ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler); + + ChannelPipeline addBefore(String baseName, String name, ChannelHandler 
handler); + + ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); + + ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler); + + ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler); + + ChannelPipeline addFirst(ChannelHandler... handlers); + + ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers); + + ChannelPipeline addLast(ChannelHandler... handlers); + + ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers); + + ChannelPipeline remove(ChannelHandler handler); + + ChannelHandler remove(String name); + + T remove(Class handlerType); + + ChannelHandler removeFirst(); + + ChannelHandler removeLast(); + + ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler); + + ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler); + + T replace(Class oldHandlerType, String newName, ChannelHandler newHandler); + + ChannelHandler first(); + + ChannelHandler last(); + + ChannelHandler get(String name); + + T get(Class handlerType); + + /** + * 处理 I/O事件 的api + */ + @Override + ChannelPipeline fireChannelRegistered(); + + @Override + ChannelPipeline fireChannelUnregistered(); + + @Override + ChannelPipeline fireChannelActive(); + + @Override + ChannelPipeline fireChannelInactive(); + + @Override + ChannelPipeline fireExceptionCaught(Throwable cause); + + @Override + ChannelPipeline fireUserEventTriggered(Object event); + + @Override + ChannelPipeline fireChannelRead(Object msg); + + @Override + ChannelPipeline fireChannelReadComplete(); + + @Override + ChannelPipeline fireChannelWritabilityChanged(); + + @Override + ChannelPipeline flush(); +} +``` + +## ChannelHandler 的功能和作用 +ChannelHandler 负责对 I/O事件 进行拦截处理,它可以选择性地 拦截处理感兴趣的事件,也可以透传和终止事件的传递。基于 ChannelHandler接口,我们可以方便地进行业务逻辑定制,如 打印日志、统一封装异常信息、性能统计和消息编解码等。 + +#### ChannelHandlerAdapter +大部分 ChannelHandler 都会选择性 拦截处理感兴趣的 I/O事件,忽略其他事件,然后交由下一个 ChannelHandler 进行拦截处理。这会导致一个问题:自定义 ChannelHandler 必须要实现 ChannelHandler 的所有接口,包括它不关心的那些事件处理接口,这会导致用户代码的冗余和臃肿,代码的可维护性也会变差。 + +为了解决这个问题,Netty 提供了 ChannelHandlerAdapter 基类,和 ChannelInboundHandlerAdapter / ChannelOutboundHandlerAdapter 两个实现类,如果 自定义ChannelHandler 关心某个事件,只需要继承 ChannelInboundHandlerAdapter / ChannelOutboundHandlerAdapter 覆盖对应的方法即可,对于不关心的,可以直接继承使用父类的方法,这样子类的代码就会非常简洁清晰。 + +## ChannelHandler组件 的类结构 +相对于 ByteBuf 和 Channel,ChannelHandler 的类继承关系稍微简单些,但是它的子类非常多,功能各异,主要可以分为如下四类。 +1. ChannelPipeline 的系统 ChannelHandler,用于 I/O操作 和对事件进行预处理,对用户不可见,这类 ChannelHandler 主要包括 HeadHandler 和 TailHandler; +2. 编解码ChannelHandler,如 MessageToMessageEncoder、MessageToMessageDecoder、MessageToMessageCodec; +3. 其他系统功能性 ChannelHandler,如 流量整型Handler、读写超时Handler、日志Handler等; +4. 自定义 ChannelHandler。 + +ChannelHandler组件 的核心类及常用类的类图如下。 + +![在这里插入图片描述](../../../images/Netty/ChannelHandler组件.png) \ No newline at end of file diff --git a/docs/Netty/Netty主要组件源码分析/Channel和Unsafe组件.md b/docs/Netty/Netty主要组件源码分析/Channel和Unsafe组件.md index fcb5dbe..a6c8652 100644 --- a/docs/Netty/Netty主要组件源码分析/Channel和Unsafe组件.md +++ b/docs/Netty/Netty主要组件源码分析/Channel和Unsafe组件.md @@ -1 +1,954 @@ -努力编写中... 
\ No newline at end of file +类似于 java.nio包 的 Channel,Netty 提供了自己的 Channel 和其子类实现,用于异步 I/O操作 等。Unsafe 是 Channel 的内部接口,聚合在 Channel 中协助进行网络读写相关的操作,因为它的设计初衷就是 Channel 的内部辅助类,不应该被 Netty框架 的上层使用者调用,所以被命名为 Unsafe。 + +## Channel 组件 +Netty 的 **Channel组件 是 Netty 对网络操作的封装**,**如 网络数据的读写,与客户端建立连接**,主动关闭连接 等,也包含了 Netty框架 相关的一些功能,如 获取该 Chanel 的 **EventLoop、ChannelPipeline** 等。另外,Netty 并没有直接使用 java.nio包 的 SocketChannel和ServerSocketChannel,而是**使用 NioSocketChannel和NioServerSocketChannel 对其进行了进一步的封装**。下面我们先从 Channel接口 的API开始分析,然后看一下其重要子类的源码实现。 + +为了便于后面的阅读源码,我们先看下 NioSocketChannel 和 NioServerSocketChannel 的继承关系类图。 +![在这里插入图片描述](../../../images/Netty/Netty的Channel组件.png) +#### Channel 接口 +```java +public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable { + + /** + * Channel 需要注册到 EventLoop 的多路复用器上,用于处理 I/O事件, + * EventLoop 实际上就是处理网络读写事件的 Reactor线程。 + */ + EventLoop eventLoop(); + + /** + * ChannelMetadata 封装了 TCP参数配置 + */ + ChannelMetadata metadata(); + + /** + * 对于服务端Channel而言,它的父Channel为空; + * 对于客户端Channel,它的 父Channel 就是创建它的 ServerSocketChannel + */ + Channel parent(); + + /** + * 每个 Channel 都有一个全局唯一标识 + */ + ChannelId id(); + + /** + * 获取当前 Channel 的配置信息,如 CONNECT_TIMEOUT_MILLIS + */ + ChannelConfig config(); + + /** + * 当前 Channel 是否已经打开 + */ + boolean isOpen(); + + /** + * 当前 Channel 是否已注册进 EventLoop + */ + boolean isRegistered(); + + /** + * 当前 Channel 是否已激活 + */ + boolean isActive(); + + /** + * 当前 Channel 的本地绑定地址 + */ + SocketAddress localAddress(); + + /** + * 当前 Channel 的远程绑定地址 + */ + SocketAddress remoteAddress(); + + /** + * 当前 Channel 是否可写 + */ + boolean isWritable(); + + /** + * 当前 Channel 内部的 Unsafe对象 + */ + Unsafe unsafe(); + + /** + * 当前 Channel 持有的 ChannelPipeline + */ + ChannelPipeline pipeline(); + + /** + * 从当前 Channel 中读取数据到第一个 inbound缓冲区 中,如果数据被成功读取, + * 触发ChannelHandler.channelRead(ChannelHandlerContext,Object)事件。 + * 读取操作API调用完成之后,紧接着会触发ChannelHandler.channelReadComplete(ChannelHandlerContext)事件, + * 这样业务的ChannelHandler可以决定是否需要继续读取数据。如果己经有读操作请求被挂起,则后续的读操作会被忽略。 + */ + @Override + Channel read(); + + /** + * 将之前写入到发送环形数组中的消息全部写入到目标Chanel中,发送给通信对方 + */ + @Override + Channel flush(); +} +``` + +#### AbstractChannel +```java +public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { + + // 父Channel + private final Channel parent; + // Channel的全局唯一标识 + private final ChannelId id; + // 内部辅助类 Unsafe + private final Unsafe unsafe; + // Netty 会为每一个 channel 创建一个 pipeline + private final DefaultChannelPipeline pipeline; + + // 本地地址 + private volatile SocketAddress localAddress; + // 远程主机地址 + private volatile SocketAddress remoteAddress; + // 注册到了哪个 EventLoop 上 + private volatile EventLoop eventLoop; + // 是否已注册 + private volatile boolean registered; + + /** + * channnel 会将 网络IO操作 触发到 ChannelPipeline 对应的事件方法。 + * Netty 基于事件驱动,我们也可以理解为当 Chnanel 进行 IO操作 时会产生对应的IO 事件, + * 然后驱动事件在 ChannelPipeline 中传播,由对应的 ChannelHandler 对事件进行拦截和处理, + * 不关心的事件可以直接忽略 + */ + @Override + public ChannelFuture bind(SocketAddress localAddress) { + return pipeline.bind(localAddress); + } + + @Override + public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { + return pipeline.bind(localAddress, promise); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress) { + return pipeline.connect(remoteAddress); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + return pipeline.connect(remoteAddress, localAddress); + } + + @Override + public ChannelFuture connect(SocketAddress 
remoteAddress, ChannelPromise promise) { + return pipeline.connect(remoteAddress, promise); + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + return pipeline.connect(remoteAddress, localAddress, promise); + } + + @Override + public ChannelFuture disconnect() { + return pipeline.disconnect(); + } + + @Override + public ChannelFuture disconnect(ChannelPromise promise) { + return pipeline.disconnect(promise); + } + + @Override + public ChannelFuture close() { + return pipeline.close(); + } + + @Override + public ChannelFuture close(ChannelPromise promise) { + return pipeline.close(promise); + } + + @Override + public ChannelFuture deregister() { + return pipeline.deregister(); + } + + @Override + public ChannelFuture deregister(ChannelPromise promise) { + return pipeline.deregister(promise); + } + + @Override + public Channel flush() { + pipeline.flush(); + return this; + } + + @Override + public Channel read() { + pipeline.read(); + return this; + } + + @Override + public ChannelFuture write(Object msg) { + return pipeline.write(msg); + } + + @Override + public ChannelFuture write(Object msg, ChannelPromise promise) { + return pipeline.write(msg, promise); + } + + @Override + public ChannelFuture writeAndFlush(Object msg) { + return pipeline.writeAndFlush(msg); + } + + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + return pipeline.writeAndFlush(msg, promise); + } +} +``` + +#### AbstractNioChannel + +```java +public abstract class AbstractNioChannel extends AbstractChannel { + + // AbstractNioChannel 是 NioSocketChannel和NioServerSocketChannel 的公共父类,所以定义 + // 了一个 java.nio 的 SocketChannel 和 ServerSocketChannel 的公共父类 SelectableChannel, + // 用于设置 SelectableChannel参数 和进行 IO操作 + private final SelectableChannel ch; + // 它代表了 JDK 的 SelectionKey.OP_READ + protected final int readInterestOp; + // 该 SelectionKey 是 Channel 注册到 EventLoop 后返回的, + // 由于 Channel 会面临多个业务线程的并发写操作,当 SelectionKey 被修改了, + // 需要让其他业务线程感知到变化,所以使用volatile保证修改的可见性 + volatile SelectionKey selectionKey; + + /** + * Channel 的注册 + */ + @Override + protected void doRegister() throws Exception { + boolean selected = false; + for (;;) { + try { + selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); + return; + } catch (CancelledKeyException e) { + if (!selected) { + // Force the Selector to select now as the "canceled" SelectionKey may still be + // cached and not removed because no Select.select(..) operation was called yet. + eventLoop().selectNow(); + selected = true; + } else { + // We forced a select operation on the selector before but the SelectionKey is still cached + // for whatever reason. JDK bug ? 
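+                        // Nothing more can be done at this point: even the forced selectNow() did not flush the cancelled key, so rethrow and let the registration fail.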
+ throw e; + } + } + } + } + + protected SelectableChannel javaChannel() { + return ch; + } + + @Override + protected void doBeginRead() throws Exception { + // Channel.read() 或 ChannelHandlerContext.read() 被调用 + final SelectionKey selectionKey = this.selectionKey; + if (!selectionKey.isValid()) { + return; + } + + readPending = true; + + final int interestOps = selectionKey.interestOps(); + if ((interestOps & readInterestOp) == 0) { + selectionKey.interestOps(interestOps | readInterestOp); + } + } +} +``` + +#### NioServerSocketChannel +```java +public class NioServerSocketChannel extends AbstractNioMessageChannel + implements io.netty.channel.socket.ServerSocketChannel { + + // java.nio 包的内容,用于获取 java.nio.channels.ServerSocketChannel 实例 + private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); + + private static ServerSocketChannel newSocket(SelectorProvider provider) { + try { + /** + * 获取的是 java.nio.channels.ServerSocketChannel 实例 + */ + return provider.openServerSocketChannel(); + } catch (IOException e) { + throw new ChannelException("Failed to open a server socket.", e); + } + } + + /** + * Create a new instance + */ + public NioServerSocketChannel() { + this(newSocket(DEFAULT_SELECTOR_PROVIDER)); + } + + /** + * 在父类中完成了 非阻塞IO的配置,及事件的注册 + */ + public NioServerSocketChannel(ServerSocketChannel channel) { + super(null, channel, SelectionKey.OP_ACCEPT); + config = new NioServerSocketChannelConfig(this, javaChannel().socket()); + } + + /** + * 对 NioServerSocketChannel 来说,它的读取操作就是接收客户端的连接,创建 NioSocketChannel对象 + */ + @Override + protected int doReadMessages(List buf) throws Exception { + // 首先通过 ServerSocketChannel 的 accept()方法 接收新的客户端连接, + // 获取 java.nio.channels.SocketChannel 对象 + SocketChannel ch = SocketUtils.accept(javaChannel()); + + try { + // 如果获取到客户端连接对象 SocketChannel,则利用当前的 NioServerSocketChannel、EventLoop + // 和 SocketChannel 创建新的 NioSocketChannel,并添加到 buf 中 + if (ch != null) { + buf.add(new NioSocketChannel(this, ch)); + return 1; + } + } catch (Throwable t) { + logger.warn("Failed to create a new channel from an accepted socket.", t); + + try { + ch.close(); + } catch (Throwable t2) { + logger.warn("Failed to close a socket.", t2); + } + } + + return 0; + } +} +``` + + +#### NioSocketChannel +```java +public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel { + + // 与 NioServerSocketChannel 一样,也依赖了 java.nio包 的API + private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); + + /** + * 从这里可以看出,NioSocketChannel 对 java.nio.channels.SocketChannel 做了进一步封装 + * 使其 适用于 Netty框架 + */ + private static SocketChannel newSocket(SelectorProvider provider) { + try { + return provider.openSocketChannel(); + } catch (IOException e) { + throw new ChannelException("Failed to open a socket.", e); + } + } + + /** + * Create a new instance + */ + public NioSocketChannel() { + this(DEFAULT_SELECTOR_PROVIDER); + } + + public NioSocketChannel(SelectorProvider provider) { + this(newSocket(provider)); + } + + public NioSocketChannel(SocketChannel socket) { + this(null, socket); + } + + public NioSocketChannel(Channel parent, SocketChannel socket) { + // 在父类中完成 非阻塞IO的配置,注册事件 + super(parent, socket); + config = new NioSocketChannelConfig(this, socket.socket()); + } + + @Override + protected SocketChannel javaChannel() { + return (SocketChannel) super.javaChannel(); + } + + @Override + public boolean isActive() { + SocketChannel ch = javaChannel(); + return ch.isOpen() && 
ch.isConnected(); + } + + /** + * 与远程服务器建立连接 + */ + @Override + protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception { + if (localAddress != null) { + doBind0(localAddress); + } + + boolean success = false; + try { + // 根据远程地址建立TCP连接,对连接结果进行判断 + boolean connected = SocketUtils.connect(javaChannel(), remoteAddress); + if (!connected) { + selectionKey().interestOps(SelectionKey.OP_CONNECT); + } + success = true; + return connected; + } finally { + if (!success) { + doClose(); + } + } + } + + /** + * 关闭 Channel + */ + @Override + protected void doClose() throws Exception { + super.doClose(); + javaChannel().close(); + } + + /** + * 从 Channel 中读取数据 + */ + @Override + protected int doReadBytes(ByteBuf byteBuf) throws Exception { + final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); + allocHandle.attemptedBytesRead(byteBuf.writableBytes()); + return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead()); + } + + @Override + protected int doWriteBytes(ByteBuf buf) throws Exception { + final int expectedWrittenBytes = buf.readableBytes(); + return buf.readBytes(javaChannel(), expectedWrittenBytes); + } + + /** + * 向 Channel 中写数据 + */ + @Override + protected void doWrite(ChannelOutboundBuffer in) throws Exception { + SocketChannel ch = javaChannel(); + int writeSpinCount = config().getWriteSpinCount(); + do { + if (in.isEmpty()) { + // All written so clear OP_WRITE + clearOpWrite(); + // Directly return here so incompleteWrite(...) is not called. + return; + } + + // Ensure the pending writes are made of ByteBufs only. + int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite(); + ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite); + int nioBufferCnt = in.nioBufferCount(); + + // Always us nioBuffers() to workaround data-corruption. + // See https://github.com/netty/netty/issues/2761 + switch (nioBufferCnt) { + case 0: + // We have something else beside ByteBuffers to write so fallback to normal writes. + writeSpinCount -= doWrite0(in); + break; + case 1: { + // Only one ByteBuf so use non-gathering write + // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need + // to check if the total size of all the buffers is non-zero. + ByteBuffer buffer = nioBuffers[0]; + int attemptedBytes = buffer.remaining(); + final int localWrittenBytes = ch.write(buffer); + if (localWrittenBytes <= 0) { + incompleteWrite(true); + return; + } + adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite); + in.removeBytes(localWrittenBytes); + --writeSpinCount; + break; + } + default: { + // Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need + // to check if the total size of all the buffers is non-zero. + // We limit the max amount to int above so cast is safe + long attemptedBytes = in.nioBufferSize(); + final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt); + if (localWrittenBytes <= 0) { + incompleteWrite(true); + return; + } + // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above. 
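+                    // Shrink or grow the gathering-write budget according to how many bytes were actually written, then release the written bytes from the outbound buffer.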
+ adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes, + maxBytesPerGatheringWrite); + in.removeBytes(localWrittenBytes); + --writeSpinCount; + break; + } + } + } while (writeSpinCount > 0); + + incompleteWrite(writeSpinCount < 0); + } +} +``` + +## Unsafe 功能简介 +Unsafe接口 实际上是 **Channel接口 的辅助接口**,它不应该被用户代码直接调用。**实际的 IO读写操作 都是由 Unsafe接口 负责完成的**。 + +```java +public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable { + + interface Unsafe { + + /** + * 返回绑定的 本地地址 + */ + SocketAddress localAddress(); + + /** + * 返回绑定的 远程地址 + */ + SocketAddress remoteAddress(); + + /** + * 将 Channel 注册到 EventLoop 上 + */ + void register(EventLoop eventLoop, ChannelPromise promise); + + /** + * 绑定 本地地址 到 Channel 上 + */ + void bind(SocketAddress localAddress, ChannelPromise promise); + + /** + * 连接到远程服务器 + */ + void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise); + + /** + * 断开连接 + */ + void disconnect(ChannelPromise promise); + + /** + * 关闭 Channel + */ + void close(ChannelPromise promise); + + /** + * 读就绪 网络事件 + */ + void beginRead(); + + /** + * 发送数据 + */ + void write(Object msg, ChannelPromise promise); + + /** + * 将缓冲区的数据 刷到 Channel + */ + void flush(); + } +} +``` + +#### AbstractUnsafe +```java +public abstract class AbstractChannel extends DefaultAttributeMap implements Channel { + + protected abstract class AbstractUnsafe implements Unsafe { + /** + * 将当前 Unsafe 对应的 Channel 注册到 EventLoop 的多路复用器上, + * 然后调用 DefaultChannelPipeline 的 fireChannelRegistered()方法, + * 如果 Channel 被激活 则调用 DefaultChannelPipeline 的 fireChannelActive()方法 + */ + @Override + public final void register(EventLoop eventLoop, final ChannelPromise promise) { + if (eventLoop == null) { + throw new NullPointerException("eventLoop"); + } + if (isRegistered()) { + promise.setFailure(new IllegalStateException("registered to an event loop already")); + return; + } + if (!isCompatible(eventLoop)) { + promise.setFailure( + new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); + return; + } + + AbstractChannel.this.eventLoop = eventLoop; + + if (eventLoop.inEventLoop()) { + register0(promise); + } else { + try { + eventLoop.execute(new Runnable() { + @Override + public void run() { + register0(promise); + } + }); + } catch (Throwable t) { + logger.warn( + "Force-closing a channel whose registration task was not accepted by an event loop: {}", + AbstractChannel.this, t); + closeForcibly(); + closeFuture.setClosed(); + safeSetFailure(promise, t); + } + } + } + + private void register0(ChannelPromise promise) { + try { + // check if the channel is still open as it could be closed in the mean time when the register + // call was outside of the eventLoop + if (!promise.setUncancellable() || !ensureOpen(promise)) { + return; + } + boolean firstRegistration = neverRegistered; + doRegister(); + neverRegistered = false; + registered = true; + + // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the + // user may already fire events through the pipeline in the ChannelFutureListener. + pipeline.invokeHandlerAddedIfNeeded(); + + safeSetSuccess(promise); + pipeline.fireChannelRegistered(); + // Only fire a channelActive if the channel has never been registered. This prevents firing + // multiple channel actives if the channel is deregistered and re-registered. 
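+                // isActive() is true only when the underlying socket is already connected (client) or bound (server) at this point, see NioSocketChannel#isActive() above.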
+ if (isActive()) { + if (firstRegistration) { + pipeline.fireChannelActive(); + } else if (config().isAutoRead()) { + // This channel was registered before and autoRead() is set. This means we need to begin read + // again so that we process inbound data. + // + // See https://github.com/netty/netty/issues/4805 + beginRead(); + } + } + } catch (Throwable t) { + // Close the channel directly to avoid FD leak. + closeForcibly(); + closeFuture.setClosed(); + safeSetFailure(promise, t); + } + } + + /** + * 绑定指定的端口,对于服务端 用于绑定监听端口, + * 对于客户端,主要用于指定 客户端Channel 的本地绑定Socket地址。 + */ + @Override + public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { + assertEventLoop(); + + if (!promise.setUncancellable() || !ensureOpen(promise)) { + return; + } + + // See: https://github.com/netty/netty/issues/576 + if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && + localAddress instanceof InetSocketAddress && + !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && + !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { + // Warn a user about the fact that a non-root user can't receive a + // broadcast packet on *nix if the socket is bound on non-wildcard address. + logger.warn( + "A non-root user can't receive a broadcast packet if the socket " + + "is not bound to a wildcard address; binding to a non-wildcard " + + "address (" + localAddress + ") anyway as requested."); + } + + boolean wasActive = isActive(); + try { + doBind(localAddress); + } catch (Throwable t) { + safeSetFailure(promise, t); + closeIfClosed(); + return; + } + + if (!wasActive && isActive()) { + invokeLater(new Runnable() { + @Override + public void run() { + pipeline.fireChannelActive(); + } + }); + } + + safeSetSuccess(promise); + } + + /** + * 客户端 或 服务端,主动关闭连接 + */ + @Override + public final void disconnect(final ChannelPromise promise) { + assertEventLoop(); + + if (!promise.setUncancellable()) { + return; + } + + boolean wasActive = isActive(); + try { + doDisconnect(); + } catch (Throwable t) { + safeSetFailure(promise, t); + closeIfClosed(); + return; + } + + if (wasActive && !isActive()) { + invokeLater(new Runnable() { + @Override + public void run() { + pipeline.fireChannelInactive(); + } + }); + } + + safeSetSuccess(promise); + closeIfClosed(); // doDisconnect() might have closed the channel + } + + /** + * 在链路关闭之前需要首先判断是否处于刷新状态,如果处于刷新状态说明还有消息尚 + * 未发送出去,需要等到所有消息发送完成再关闭链路,因此,将关闭操作封装成Runnable稍后再执行 + */ + @Override + public final void close(final ChannelPromise promise) { + assertEventLoop(); + + close(promise, CLOSE_CLOSED_CHANNEL_EXCEPTION, CLOSE_CLOSED_CHANNEL_EXCEPTION, false); + } + + /** + * 本方法实际上将消息添加到环形发送数组中,并不是真正的写Channel + */ + @Override + public final void write(Object msg, ChannelPromise promise) { + assertEventLoop(); + + ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; + if (outboundBuffer == null) { + // If the outboundBuffer is null we know the channel was closed and so + // need to fail the future right away. 
If it is not null the handling of the rest + // will be done in flush0() + // See https://github.com/netty/netty/issues/2362 + safeSetFailure(promise, newWriteException(initialCloseCause)); + // release message now to prevent resource-leak + ReferenceCountUtil.release(msg); + return; + } + + int size; + try { + msg = filterOutboundMessage(msg); + size = pipeline.estimatorHandle().size(msg); + if (size < 0) { + size = 0; + } + } catch (Throwable t) { + safeSetFailure(promise, t); + ReferenceCountUtil.release(msg); + return; + } + + outboundBuffer.addMessage(msg, size, promise); + } + + /** + * 将缓冲区中待发送的消息全部写入 Channel,并发送给通信对方 + */ + @Override + public final void flush() { + assertEventLoop(); + + ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; + if (outboundBuffer == null) { + return; + } + + outboundBuffer.addFlush(); + flush0(); + } + + @SuppressWarnings("deprecation") + protected void flush0() { + if (inFlush0) { + // Avoid re-entrance + return; + } + + final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; + if (outboundBuffer == null || outboundBuffer.isEmpty()) { + return; + } + + inFlush0 = true; + + // Mark all pending write requests as failure if the channel is inactive. + if (!isActive()) { + try { + if (isOpen()) { + outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true); + } else { + // Do not trigger channelWritabilityChanged because the channel is closed already. + outboundBuffer.failFlushed(newFlush0Exception(initialCloseCause), false); + } + } finally { + inFlush0 = false; + } + return; + } + + try { + doWrite(outboundBuffer); + } catch (Throwable t) { + if (t instanceof IOException && config().isAutoClose()) { + /** + * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of + * failing all flushed messages and also ensure the actual close of the underlying transport + * will happen before the promises are notified. + * + * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()} + * may still return {@code true} even if the channel should be closed as result of the exception. + */ + initialCloseCause = t; + close(voidPromise(), t, newFlush0Exception(t), false); + } else { + try { + shutdownOutput(voidPromise(), t); + } catch (Throwable t2) { + initialCloseCause = t; + close(voidPromise(), t2, newFlush0Exception(t), false); + } + } + } finally { + inFlush0 = false; + } + } + } +} +``` + +#### AbstractNioUnsafe +AbstractNioUnsafe 是 AbstractUnsafe类 的 NIO实现,它主要实现了 connect 、finishConnect 等方法。 +```java +public abstract class AbstractNioChannel extends AbstractChannel { + + /** + * 获取当前的连接状态进行缓存,然后发起连接操作。 + */ + @Override + public final void connect( + final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { + if (!promise.setUncancellable() || !ensureOpen(promise)) { + return; + } + + try { + if (connectPromise != null) { + // Already a connect in process. + throw new ConnectionPendingException(); + } + + boolean wasActive = isActive(); + if (doConnect(remoteAddress, localAddress)) { + fulfillConnectPromise(promise, wasActive); + } else { + connectPromise = promise; + requestedRemoteAddress = remoteAddress; + + // Schedule connect timeout. 
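+                    // If the handshake does not complete within CONNECT_TIMEOUT_MILLIS, the scheduled task below fails connectPromise with a ConnectTimeoutException and closes the channel.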
+ int connectTimeoutMillis = config().getConnectTimeoutMillis(); + if (connectTimeoutMillis > 0) { + connectTimeoutFuture = eventLoop().schedule(new Runnable() { + @Override + public void run() { + ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; + ConnectTimeoutException cause = + new ConnectTimeoutException("connection timed out: " + remoteAddress); + if (connectPromise != null && connectPromise.tryFailure(cause)) { + close(voidPromise()); + } + } + }, connectTimeoutMillis, TimeUnit.MILLISECONDS); + } + + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isCancelled()) { + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + close(voidPromise()); + } + } + }); + } + } catch (Throwable t) { + promise.tryFailure(annotateConnectException(t, remoteAddress)); + closeIfClosed(); + } + } + + /** + * 对 TCP三次握手连接结果 进行判断 + */ + @Override + public final void finishConnect() { + // Note this method is invoked by the event loop only if the connection attempt was + // neither cancelled nor timed out. + + assert eventLoop().inEventLoop(); + + try { + boolean wasActive = isActive(); + doFinishConnect(); + fulfillConnectPromise(connectPromise, wasActive); + } catch (Throwable t) { + fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); + } finally { + // Check for null as the connectTimeoutFuture is only created if a connectTimeoutMillis > 0 is used + // See https://github.com/netty/netty/issues/1770 + if (connectTimeoutFuture != null) { + connectTimeoutFuture.cancel(false); + } + connectPromise = null; + } + } + } +} +``` \ No newline at end of file diff --git a/docs/Netty/Netty技术细节源码分析/FastThreadLocal源码分析.md b/docs/Netty/Netty技术细节源码分析/FastThreadLocal源码分析.md new file mode 100644 index 0000000..e2ece3d --- /dev/null +++ b/docs/Netty/Netty技术细节源码分析/FastThreadLocal源码分析.md @@ -0,0 +1,186 @@ +# Netty 的 FastThreadLocal 源码解析 + +该文中涉及到的 Netty 源码版本为 4.1.6。 + +## Netty 的 FastThreadLocal 是什么 + +> A special variant of ThreadLocal that yields higher access performance when accessed from a FastThreadLocalThread. +> Internally, a FastThreadLocal uses a constant index in an array, instead of using hash code and hash table, to look for a variable. Although seemingly very subtle, it yields slight performance advantage over using a hash table, and it is useful when accessed frequently. +> To take advantage of this thread-local variable, your thread must be a FastThreadLocalThread or its subtype. By default, all threads created by DefaultThreadFactory are FastThreadLocalThread due to this reason. +> Note that the fast path is only possible on threads that extend FastThreadLocalThread, because it requires a special field to store the necessary state. An access by any other kind of thread falls back to a regular ThreadLocal. 
+ +以上是 Netty 官方文档中关于 FastThreadLocal 的介绍。 + +简而言之,FastThreadLocal 是在 ThreadLocal 实现上的一种变种,相比 ThreadLocal 内部通过将自身 hash 的方式在 hashTable 上定位需要的变量存储位置,FastThreadLocal 选择在数组上的一个固定的常量位置来存放线程本地变量,这样的操作看起来并没有太大区别,但是相比 ThreadLocal 的确体现了性能上的优势,尤其是在读操作频繁的场景下。 + +## 如何使用 FastThreadLocal + +如果想要得到 FastThreadLocal 的速度优势,必须通过 FastThreadLocalThread 或者其子类的线程,才可以使用,因为这个原因,Netty 的 DefaultThreadFactory,其内部默认线程工厂的 newThread()方法就是直接初始化一个 FastThreadLocalThread ,以便期望在 ThreadLocal 的操作中,得到其性能上带来的优势。 + +```java +protected Thread newThread(Runnable r, String name) { + return new FastThreadLocalThread(threadGroup, r, name); +} +``` + +## FastThreadLocal 的源码实现 + +### FastThreadLocal 被访问的入口 + +当需要用到 FastThreadLocal 的时候,想必和 jdk 原生的 ThreadLocal 的 api 类似,都是通过初始化一个新的 FastThreadLocal 之后,通过其 set()方法初始化并放入一个变量作为线程本地变量存储。 + +```java +public final void set(V value) { + if (value != InternalThreadLocalMap.UNSET) { + set(InternalThreadLocalMap.get(), value); + } else { + remove(); + } +} +``` + +因此,在 FastThreadLocal 的 set()方法中,可以看到,存储本地线程变量的数据结构是一个 InternalThreadLocalMap。 + +```java +private InternalThreadLocalMap threadLocalMap; +``` + +在 FastThreadLocalThread 中,因为本身 threadLocalMap 就是其中的一个成员,能够快速得到返回。而其他线程实现,就将面临没有这个成员的尴尬,Netty 也给出了相应的兼容。 + +```java +public static InternalThreadLocalMap get() { + Thread thread = Thread.currentThread(); + if (thread instanceof FastThreadLocalThread) { + return fastGet((FastThreadLocalThread) thread); + } else { + return slowGet(); + } +} +``` + +InternalThreadLocalMap 的 get()方法中,当前线程如果是 FastThreadLocalThread 或是其子类的实现,变直接返回其 InternalThreadLocalMap 进行操作,但对于不属于上述条件的线程,Netty 通过 slowGet()的方式,也将返回一个 InternalThreadLocalMap。 + +```java +private static InternalThreadLocalMap slowGet() { + ThreadLocal slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap; + InternalThreadLocalMap ret = slowThreadLocalMap.get(); + if (ret == null) { + ret = new InternalThreadLocalMap(); + slowThreadLocalMap.set(ret); + } + return ret; +} +``` + +在 slowGet()方法中,当前线程对应的 InternalThreadLocalMap 会通过原生 jdk 下 ThreadLocal 的方式存储并通过 ThreadLocal 返回,因此,在这个场景下,使用的还是 jdk 原生的 ThreadLocal,但是只占用了原生 ThreadLocal 下的 Entry[]数组的一个位置,具体的变量还是存放在专门为 FastThreadLocal 服务的 InternalThreadLocalMap 中。 +在此,随着 InternalThreadLocalMap 的得到并返回,针对 FastThreadLocal 的 get 和 set 操作,也将变为操作 InternalThreadLocalMap 来达到目的,FastThreadLocal 性能优越的原因,也在 InternalThreadLocalMap 当中。 + +### InternalThreadLocalMap 的内部构造 + +```java +static final AtomicInteger nextIndex = new AtomicInteger(); + +Object[] indexedVariables; +``` + +InternalThreadlocalMap 主要由以上两个成员组成,其中 indexedVariables 作为一个 Object[]数组,直接用来存放 FastThreadLocal 对应的 value,每个 FastThreadLocal 对象都会在相应的线程的 ThreadLocalMap 中被分配到对应的 index,而这里的具体下标,则由以上的 nextIndex 成员在每个 FastThreadLocal 初始化的时候分配。 + +```java +private final int index; + +public FastThreadLocal() { + index = InternalThreadLocalMap.nextVariableIndex(); +} +``` + +每个 FastThreadLocal 在构造方法的过程中,都会通过 InternalThreadlocalMap 的 nextVariableIndex()返回 nextIndex 自加后的结果作为其在 InternalThreadlocalMap 上的下标。后续该 FastThreadLocal 在操作变量的时候可以直接通过该 index 定位到 Object[]数组上的位置。 + +```java +private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); +``` + +而数组上的下标有一个特殊位,一般在其首位也就是 0 的位置,这个位置在 FastThreadLocal 类被加载的时候作为静态变量被设置。在这个位置上,存放的是一个 FastThreadLocal 对象集合,每个存放到 InternalThreadlocalMap 中的 FastThreadLocal 都会被保存在首位的集合中。 + +```java +public static final Object UNSET = new Object(); +``` + +另外,为了具体区分保存的变量是 null 还是不存在当前变量,InternalThreadLocalMap 中定义了一个为 NULL 的成员变量,以便区分上述情况,在一开始,InternalThreadLocalMap 中的 indexedVariables 数组都是 NULL。 + +### FastThreadLocal 
的 set()方法的源码分析 + +相比 FastThreadLocal 的 set 操作,get 方法的过程与逻辑都要简单的多,因此此处主要以其 set 方法为主。 + +```java +public final void set(V value) { + if (value != InternalThreadLocalMap.UNSET) { + set(InternalThreadLocalMap.get(), value); + } else { + remove(); + } +} + +public final void set(InternalThreadLocalMap threadLocalMap, V value) { + if (value != InternalThreadLocalMap.UNSET) { + if (threadLocalMap.setIndexedVariable(index, value)) { + addToVariablesToRemove(threadLocalMap, this); + } + } else { + remove(threadLocalMap); + } +} +``` + +在其 set()方法中,首先会判断 set 的值是否是 InternalThreadLocalMap 中的 NULL 对象来判断是 set 操作还是 remove 操作,如果不是,会通过 InternalThreadLocalMap.get()方法获取当前线程对应的 InternalThreadLocalMap,获取的过程在前文已经描述过。 +之后的主要流程主要分为两步: + +- 调用 InternalThreadLocalMap 的 setIndexedVariable()方法,将该 FastThreadLocal 成员在构造方法中获得到的 InternalThreadLocalMap 上的下标作为入参传入。 + +```java +public boolean setIndexedVariable(int index, Object value) { + Object[] lookup = indexedVariables; + if (index < lookup.length) { + Object oldValue = lookup[index]; + lookup[index] = value; + return oldValue == UNSET; + } else { + expandIndexedVariableTableAndSet(index, value); + return true; + } +} +``` + +在 InternalThreadLocalMap 的 setIndexedVariable()方法过程中,set 的过程并不复杂,找到对应的下标,并将对应的值放到 InternalThreadLocalMap 数组下标对应的位置上即宣告结束。但是,因为 FastThreadLocal 在构造过程中虽然预先获得了对应的下标,但是实际上的数组大小可能完全还没有达到相应的大小,就要在此处通过 expandIndexedVariableTableAndSet()方法进行扩容,由于是数组的缘故,只需要扩容后将原来的值复制过去,并将剩余的值用 NULL 对象填满即可。 + +- 如果上一步 set 成功,通过 addToVariablesToRemove()方法将该 FastThreadLocal 对象放入到 InternalThreadLocalMap 的数组中的首位集合中。在这个集合中,对于 FastThreadLocal 是一个强引用。 + +这样,对于 FastThreadLocal 的一次 set 操作即宣告结束。 + +## 相比 ThreadLocal,FastThreadLocal 到底快在哪里 + +- FastThreadLocal 在具体的定位的过程中,只需要根据在构造方法里获取得到的具体下标就可以定位到具体的数组位置进行变量的存取,而在 jdk 原生的 ThreadLocal 中,具体位置的下标获取不仅需要计算 ThreadLocal 的 hash 值,并需要在 hashTable 上根据 key 定位的结果,一旦定位之后的结果上已经存在其他 ThreadLocal 的变量,那么则是通过线性探测法,在 hashTable 上寻找下一个位置进行,相比 FastThreadLocal 定位的过程要复杂的多。 +- FastThreadLocal 由于采取数组的方式,当面对扩容的时候,只需要将原数组中的内容复制过去,并用 NULL 对象填满剩余位置即可,而在 ThreadLocal 中,由于 hashTable 的缘故,在扩容后还需要进行一轮 rehash,在这过程中,仍旧存在 hash 冲突的可能。 +- 在 FastThreadLocal 中,遍历当前线程的所有本地变量,只需要将数组首位的集合即可,不需要遍历数组上的每一个位置。 +- 在原生的 ThreadLocal 中,由于可能存在 ThreadLocal 被回收,但是当前线程仍旧存活的情况导致 ThreadLocal 对应的本地变量内存泄漏的问题,因此在 ThreadLocal 的每次操作后,都会进行启发式的内存泄漏检测,防止这样的问题产生,但也在每次操作后花费了额外的开销。而在 FastThreadLocal 的场景下,由于数组首位的 FastThreadLocal 集合中保持着所有 FastThreadLocal 对象的引用,因此当外部的 FastThreadLocal 的引用被置为 null,该 FastThreadLocal 对象仍旧保持着这个集合的引用,不会被回收掉,只需要在线程当前业务操作后,手动调用 FastThreadLocal 的 removeAll()方法,将会遍历数组首位集合,回收掉所有 FastThreadLocal 的变量,避免内存泄漏的产生,也减少了原生 ThreadLocal 的启发式检测开销。 + +```java +private static final class DefaultRunnableDecorator implements Runnable { + + private final Runnable r; + + DefaultRunnableDecorator(Runnable r) { + this.r = r; + } + + @Override + public void run() { + try { + r.run(); + } finally { + FastThreadLocal.removeAll(); + } + } +} +``` + +在 Netty 的 DefaultThreadFactory 中,每个线程在执行为任务后都会调用 FastThreadLocal 的 removeAll()方法。 diff --git a/docs/Netty/Netty技术细节源码分析/Recycler对象池原理分析.md b/docs/Netty/Netty技术细节源码分析/Recycler对象池原理分析.md new file mode 100644 index 0000000..b0c5123 --- /dev/null +++ b/docs/Netty/Netty技术细节源码分析/Recycler对象池原理分析.md @@ -0,0 +1,143 @@ +该文所涉及的 netty 源码版本为 4.1.6。 + +## Netty 的对象池 Recycler 是什么 + +Recycler 是 Netty 中基于 ThreadLocal 的轻量化的对象池实现。既然是基于 ThreadLocal,那么就可以将其理解为当前线程在通过对象池 Recycler 得到一个对象之后,在回收对象的时候,不需要将其销毁,而是放回到该线程的对象池中即可,在该线程下一次用到该对象的时候,不需要重新申请空间创建,而是直接重新从对象池中获取。 + +## Recycler 在 netty 中被如何使用 + +Recycler 对象池在 netty 中最重要的使用,就在于 netty 的池化 ByteBuf 的场景下。首先,何为池化?以 PooledDirectByteBuf 举例,每一个 
PooledDirectByteBuf 在应用线程中使用完毕之后,并不会被释放,而是等待被重新利用,类比线程池每个线程在执行完毕之后不会被立即释放,而是等待下一次执行的时候被重新利用。所谓的对象池也是如此,池化减少了 ByteBuf 创建和销毁的开销,也是 netty 高性能表现的基石之一。 + +```java +private static final Recycler RECYCLER = new Recycler() { + @Override + protected PooledDirectByteBuf newObject(Handle handle) { + return new PooledDirectByteBuf(handle, 0); + } +}; + +static PooledDirectByteBuf newInstance(int maxCapacity) { + PooledDirectByteBuf buf = RECYCLER.get(); + buf.reuse(maxCapacity); + return buf; +} +``` + +PooledDirectByteBuf 在其类加载的过程中,初始化了一个静态的 RECYCLER 成员,通过重写其 newObject()方法达到使 Recycler 可以初始化一个 PooledDirectByteBuf。而在接下来的使用中,只需要通过静态方法 newInstance()就可以从 RECYCLER 对象池的 get()方法获取一个新的 PooledDirectByteBuf 对象返回,而重写的方法 newObject()中的入参 Handler 则提供了 recycle()方法给出了对象重新放入池中回收的能力,这里的具体实现在下文展开。因此,newInstance()方法和 recycle()方法就提供了对象池出池和入池的能力,也通过此,PooledDirectByteBuf 达到了池化的目标。 + +## Recycler 的实现原理分析 + +**Recycler 的实现原理很抽象,可以先直接阅读文末的例子再阅读这部分内容。** +Recycler 中,最核心的是两个通过 ThreadLocal 作为本地线程私有的两个成员,而其实现原理只需要围绕这两个成员分析,就可以对对象池的设计有直接的理解和认识。 + +- 第一个成员是在 Recycler 被定义的 Stack 成员对象。 + +```java +private final FastThreadLocal> threadLocal = new FastThreadLocal>() { + @Override + protected Stack initialValue() { + return new Stack(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, + ratioMask, maxDelayedQueuesPerThread); + } +}; +``` + +顾名思义,这个 Stack 主体是一个堆栈,但是其还维护着一个链表,而链表中的每一个节点都是一个队列。 + +```java +private DefaultHandle[] elements; +private WeakOrderQueue cursor, prev; +``` + +上述的 elements 数组便是存放当前线程被回收的对象,当当前线程从该线程的 Recycler 对象池尝试获取新的对象的时候,首先就会从当前 Stack 的这个数组中尝试获取已经在先前被创建并且在当前线程被回收的对象,因为当对象池的对象在当前线程被调用 recycle()的时候,是会直接放到 elements 数组中等待下一次的利用。 那么问题来了,如果从该线程中被申请的这个对象是在另外一个线程中被调用 recycle()方法回收呢?那么该对象就会处于链表中的队列中,当堆栈数组中的对象不存在的时候,将会尝试把链表队列中的对象转移到数组中供当前线程获取。那么其他线程是如何把被回收的对象放到这些链表中的队列的呢?接下来就是另一个成员的使命了。 + +- 第二个成员是在 Recycler 中也是通过 ThreadLocal 所实现的一个线程本地变量,DELAYED_RECYCLED ,是一个 Stack 和队列的映射 Map。 + +```java +private static final FastThreadLocal, WeakOrderQueue>> DELAYED_RECYCLED = + new FastThreadLocal, WeakOrderQueue>>() { + @Override + protected Map, WeakOrderQueue> initialValue() { + return new WeakHashMap, WeakOrderQueue>(); + } +}; +``` + +第二个成员 DELAYED_RECYCLED 可以通过上文的 Stack 获取一个队列。 +在前一个成员的解释中提到,当别的线程调用另一个线程的对象池的 recycle()方法进行回收的时候,并不会直接落到持有对象池的线程的 Stack 数组当中,当然原因也很简单,在并发情况下这样的操作显然是线程不安全的,而加锁也会带来性能的开销。因此,netty 在 Recycler 对象池中通过更巧妙的方式解决这一问题。 +在前面提到,除了数组,Stack 还持有了一系列队列的组成的链表,这些链表中的每一个节点都是一个队列,这些队列又存放着别的线程所回收到当前线程对象池的对象。那么,这些队列就是各个线程针对持有对象池的专属回收队列,说起来很拗口,看下面的代码。 + +```java +private void pushLater(DefaultHandle item, Thread thread) { + // we don't want to have a ref to the queue as the value in our weak map + // so we null it out; to ensure there are no races with restoring it later + // we impose a memory ordering here (no-op on x86) + Map, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); + WeakOrderQueue queue = delayedRecycled.get(this); + if (queue == null) { + if (delayedRecycled.size() >= maxDelayedQueues) { + // Add a dummy queue so we know we should drop the object + delayedRecycled.put(this, WeakOrderQueue.DUMMY); + return; + } + // Check if we already reached the maximum number of delayed queues and if we can allocate at all. 
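+        // WeakOrderQueue.allocate() returns null when it cannot reserve space from the stack's shared capacity (availableSharedCapacity), in which case the object is simply dropped instead of being recycled.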
+ if ((queue = WeakOrderQueue.allocate(this, thread)) == null) { + // drop object + return; + } + delayedRecycled.put(this, queue); + } else if (queue == WeakOrderQueue.DUMMY) { + // drop object + return; + } + + queue.add(item); +} + +private WeakOrderQueue(Stack stack, Thread thread) { + head = tail = new Link(); + owner = new WeakReference(thread); + synchronized (stack) { + next = stack.head; + stack.head = this; + } + + // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in + // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the + // Stack itself GCed. + availableSharedCapacity = stack.availableSharedCapacity; +} +``` + +pushLater()方法发生在当一个对象被回收的时候,当当前线程不是这个对象所申请的时候的线程时,将会通过该对象的 Stack 直接去通过 DELAYED_RECYCLED 映射到一条队列上,如果没有则创建并建立映射,再把该对象放入到该队列中,以上操作结束后该次回收即宣告结束 + +```java +private WeakOrderQueue(Stack stack, Thread thread) { + head = tail = new Link(); + owner = new WeakReference(thread); + synchronized (stack) { + next = stack.head; + stack.head = this; + } + + // Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in + // the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the + // Stack itself GCed. + availableSharedCapacity = stack.availableSharedCapacity; +} +``` + +如果在操作中,队列是被创建的,会把该队列放置在 Stack 中的链表里的头结点,保证创建该对象的线程在数组空了之后能够通过链表访问到该队列并将该队列中的回收对象重新放到数组中等待被下次重新利用,队列交给 A 线程的链表是唯一的阻塞操作。在这里通过一次阻塞操作,避免后续都不存在资源的竞争问题。 + +## 举一个例子来解释对象池的原理 + +_A 线程申请,A 线程回收的场景。_ + +- 显然,当对象的申请与回收是在一个线程中时,直接把对象放入到 A 线程的对象池中即可,不存在资源的竞争,简单轻松。 + +_A 线程申请,B 线程回收的场景。_ + +- 首先,当 A 线程通过其对象池申请了一个对象后,在 B 线程调用 recycle()方法回收该对象。显然,该对象是应该回收到 A 线程私有的对象池当中的,不然,该对象池也失去了其意义。 +- 那么 B 线程中,并不会直接将该对象放入到 A 线程的对象池中,如果这样操作在多线程场景下存在资源的竞争,只有增加性能的开销,才能保证并发情况下的线程安全,显然不是 netty 想要看到的。 +- 那么 B 线程会专门申请一个针对 A 线程回收的专属队列,在首次创建的时候会将该队列放入到 A 线程对象池的链表首节点(这里是唯一存在的资源竞争场景,需要加锁),并将被回收的对象放入到该专属队列中,宣告回收结束。 +- 在 A 线程的对象池数组耗尽之后,将会尝试把各个别的线程针对 A 线程的专属队列里的对象重新放入到对象池数组中,以便下次继续使用。 diff --git a/images/Dubbo/Dubbo整体架构图.png b/images/Dubbo/Dubbo整体架构图.png index c36ae75..fce59f9 100644 Binary files a/images/Dubbo/Dubbo整体架构图.png and b/images/Dubbo/Dubbo整体架构图.png differ diff --git a/images/Dubbo/dubbo-remoting-api的项目结构.png b/images/Dubbo/dubbo-remoting-api的项目结构.png index f981840..376f575 100644 Binary files a/images/Dubbo/dubbo-remoting-api的项目结构.png and b/images/Dubbo/dubbo-remoting-api的项目结构.png differ diff --git a/images/Dubbo/dubbo-remoting的工程结构.png b/images/Dubbo/dubbo-remoting的工程结构.png index 00ef2b6..4cc9f8d 100644 Binary files a/images/Dubbo/dubbo-remoting的工程结构.png and b/images/Dubbo/dubbo-remoting的工程结构.png differ diff --git a/images/JDK1.8/JUC全量UML地图.png b/images/JDK1.8/JUC全量UML地图.png index 5230960..297f8fc 100644 Binary files a/images/JDK1.8/JUC全量UML地图.png and b/images/JDK1.8/JUC全量UML地图.png differ diff --git a/images/Netty/ChannelHandler组件.png b/images/Netty/ChannelHandler组件.png new file mode 100644 index 0000000..61f8bdf Binary files /dev/null and b/images/Netty/ChannelHandler组件.png differ diff --git a/images/Netty/Channel组件.png b/images/Netty/Channel组件.png new file mode 100644 index 0000000..a4e3e46 Binary files /dev/null and b/images/Netty/Channel组件.png differ diff --git a/images/Netty/Netty的Channel组件.png b/images/Netty/Netty的Channel组件.png new file mode 100644 index 0000000..bf471fe Binary files /dev/null and b/images/Netty/Netty的Channel组件.png differ diff --git a/images/Netty/Selector和SelectionKey和Channel关系图.png b/images/Netty/Selector和SelectionKey和Channel关系图.png new file mode 
100644 index 0000000..8f9a67e Binary files /dev/null and b/images/Netty/Selector和SelectionKey和Channel关系图.png differ diff --git a/images/mybatis/1575891988804.png b/images/mybatis/1575891988804.png new file mode 100644 index 0000000..2333dc4 Binary files /dev/null and b/images/mybatis/1575891988804.png differ diff --git a/images/mybatis/1575892046692.png b/images/mybatis/1575892046692.png new file mode 100644 index 0000000..03c5c05 Binary files /dev/null and b/images/mybatis/1575892046692.png differ diff --git a/images/mybatis/1575892167982.png b/images/mybatis/1575892167982.png new file mode 100644 index 0000000..10b0ef1 Binary files /dev/null and b/images/mybatis/1575892167982.png differ diff --git a/images/mybatis/1575892414120.png b/images/mybatis/1575892414120.png new file mode 100644 index 0000000..db2c6f9 Binary files /dev/null and b/images/mybatis/1575892414120.png differ diff --git a/images/mybatis/1575892511471.png b/images/mybatis/1575892511471.png new file mode 100644 index 0000000..5b2e76b Binary files /dev/null and b/images/mybatis/1575892511471.png differ diff --git a/images/mybatis/1575892645405.png b/images/mybatis/1575892645405.png new file mode 100644 index 0000000..e0c1e75 Binary files /dev/null and b/images/mybatis/1575892645405.png differ diff --git a/images/mybatis/1575892687076.png b/images/mybatis/1575892687076.png new file mode 100644 index 0000000..545d383 Binary files /dev/null and b/images/mybatis/1575892687076.png differ diff --git a/images/mybatis/1575892763661.png b/images/mybatis/1575892763661.png new file mode 100644 index 0000000..71d67be Binary files /dev/null and b/images/mybatis/1575892763661.png differ diff --git a/images/mybatis/1575894218362.png b/images/mybatis/1575894218362.png new file mode 100644 index 0000000..4648aa4 Binary files /dev/null and b/images/mybatis/1575894218362.png differ diff --git a/images/mybatis/1576027453035.png b/images/mybatis/1576027453035.png new file mode 100644 index 0000000..a78c57d Binary files /dev/null and b/images/mybatis/1576027453035.png differ diff --git a/images/mybatis/1576027589468.png b/images/mybatis/1576027589468.png new file mode 100644 index 0000000..681ec27 Binary files /dev/null and b/images/mybatis/1576027589468.png differ diff --git a/images/mybatis/1576027736912.png b/images/mybatis/1576027736912.png new file mode 100644 index 0000000..8191dc6 Binary files /dev/null and b/images/mybatis/1576027736912.png differ diff --git a/images/mybatis/1576028186530.png b/images/mybatis/1576028186530.png new file mode 100644 index 0000000..0a5b224 Binary files /dev/null and b/images/mybatis/1576028186530.png differ diff --git a/images/mybatis/1576028554094.png b/images/mybatis/1576028554094.png new file mode 100644 index 0000000..040c915 Binary files /dev/null and b/images/mybatis/1576028554094.png differ diff --git a/images/mybatis/1576028709743.png b/images/mybatis/1576028709743.png new file mode 100644 index 0000000..ad56764 Binary files /dev/null and b/images/mybatis/1576028709743.png differ diff --git a/images/mybatis/1576041628806.png b/images/mybatis/1576041628806.png new file mode 100644 index 0000000..8b4d9d4 Binary files /dev/null and b/images/mybatis/1576041628806.png differ diff --git a/images/mybatis/1576041889664.png b/images/mybatis/1576041889664.png new file mode 100644 index 0000000..6b6a844 Binary files /dev/null and b/images/mybatis/1576041889664.png differ diff --git a/images/mybatis/1576050247445.png b/images/mybatis/1576050247445.png new file mode 100644 index 0000000..2f6b0c5 Binary files 
/dev/null and b/images/mybatis/1576050247445.png differ diff --git a/images/mybatis/1576050482190.png b/images/mybatis/1576050482190.png new file mode 100644 index 0000000..a4ea8bd Binary files /dev/null and b/images/mybatis/1576050482190.png differ diff --git a/images/mybatis/1576050580581.png b/images/mybatis/1576050580581.png new file mode 100644 index 0000000..a0ad4a8 Binary files /dev/null and b/images/mybatis/1576050580581.png differ diff --git a/images/mybatis/1576050742205.png b/images/mybatis/1576050742205.png new file mode 100644 index 0000000..c9fc3fc Binary files /dev/null and b/images/mybatis/1576050742205.png differ diff --git a/images/mybatis/1576110788523.png b/images/mybatis/1576110788523.png new file mode 100644 index 0000000..737a2c7 Binary files /dev/null and b/images/mybatis/1576110788523.png differ diff --git a/images/mybatis/1576111307305.png b/images/mybatis/1576111307305.png new file mode 100644 index 0000000..31429b7 Binary files /dev/null and b/images/mybatis/1576111307305.png differ diff --git a/images/mybatis/1576112853347.png b/images/mybatis/1576112853347.png new file mode 100644 index 0000000..209c35b Binary files /dev/null and b/images/mybatis/1576112853347.png differ diff --git a/images/mybatis/1576112946984.png b/images/mybatis/1576112946984.png new file mode 100644 index 0000000..d21ed19 Binary files /dev/null and b/images/mybatis/1576112946984.png differ diff --git a/images/mybatis/1576113272209.png b/images/mybatis/1576113272209.png new file mode 100644 index 0000000..c61b1b2 Binary files /dev/null and b/images/mybatis/1576113272209.png differ diff --git a/images/mybatis/1576113287640.png b/images/mybatis/1576113287640.png new file mode 100644 index 0000000..938f647 Binary files /dev/null and b/images/mybatis/1576113287640.png differ diff --git a/images/mybatis/1576113345527.png b/images/mybatis/1576113345527.png new file mode 100644 index 0000000..9ad994b Binary files /dev/null and b/images/mybatis/1576113345527.png differ diff --git a/images/mybatis/1576113398394.png b/images/mybatis/1576113398394.png new file mode 100644 index 0000000..9f5428e Binary files /dev/null and b/images/mybatis/1576113398394.png differ diff --git a/images/mybatis/1576113864895.png b/images/mybatis/1576113864895.png new file mode 100644 index 0000000..d3b970c Binary files /dev/null and b/images/mybatis/1576113864895.png differ diff --git a/images/mybatis/1576114794663.png b/images/mybatis/1576114794663.png new file mode 100644 index 0000000..657d5ac Binary files /dev/null and b/images/mybatis/1576114794663.png differ diff --git a/images/mybatis/1576114876295.png b/images/mybatis/1576114876295.png new file mode 100644 index 0000000..368ee99 Binary files /dev/null and b/images/mybatis/1576114876295.png differ diff --git a/images/mybatis/1576114996613.png b/images/mybatis/1576114996613.png new file mode 100644 index 0000000..3bbe2a2 Binary files /dev/null and b/images/mybatis/1576114996613.png differ diff --git a/images/mybatis/1576117177349.png b/images/mybatis/1576117177349.png new file mode 100644 index 0000000..ea3b638 Binary files /dev/null and b/images/mybatis/1576117177349.png differ diff --git a/images/mybatis/1576117195387.png b/images/mybatis/1576117195387.png new file mode 100644 index 0000000..617257a Binary files /dev/null and b/images/mybatis/1576117195387.png differ diff --git a/images/mybatis/1576117304942.png b/images/mybatis/1576117304942.png new file mode 100644 index 0000000..2e2f334 Binary files /dev/null and b/images/mybatis/1576117304942.png differ diff 
--git a/images/mybatis/1576311527726.png b/images/mybatis/1576311527726.png new file mode 100644 index 0000000..98d4dee Binary files /dev/null and b/images/mybatis/1576311527726.png differ diff --git a/images/mybatis/1576311999030.png b/images/mybatis/1576311999030.png new file mode 100644 index 0000000..d59cc03 Binary files /dev/null and b/images/mybatis/1576311999030.png differ diff --git a/images/mybatis/1576312524112.png b/images/mybatis/1576312524112.png new file mode 100644 index 0000000..f5d5314 Binary files /dev/null and b/images/mybatis/1576312524112.png differ diff --git a/images/mybatis/1576312612783.png b/images/mybatis/1576312612783.png new file mode 100644 index 0000000..4daa74e Binary files /dev/null and b/images/mybatis/1576312612783.png differ diff --git a/images/mybatis/1576312777050.png b/images/mybatis/1576312777050.png new file mode 100644 index 0000000..4300009 Binary files /dev/null and b/images/mybatis/1576312777050.png differ diff --git a/images/mybatis/1576313598939.png b/images/mybatis/1576313598939.png new file mode 100644 index 0000000..c80758b Binary files /dev/null and b/images/mybatis/1576313598939.png differ diff --git a/images/mybatis/image-20191217103309934.png b/images/mybatis/image-20191217103309934.png new file mode 100644 index 0000000..f944799 Binary files /dev/null and b/images/mybatis/image-20191217103309934.png differ diff --git a/images/mybatis/image-20191217104008186.png b/images/mybatis/image-20191217104008186.png new file mode 100644 index 0000000..76f4cc0 Binary files /dev/null and b/images/mybatis/image-20191217104008186.png differ diff --git a/images/mybatis/image-20191217104450495.png b/images/mybatis/image-20191217104450495.png new file mode 100644 index 0000000..54eb206 Binary files /dev/null and b/images/mybatis/image-20191217104450495.png differ diff --git a/images/mybatis/image-20191217143939247.png b/images/mybatis/image-20191217143939247.png new file mode 100644 index 0000000..2692553 Binary files /dev/null and b/images/mybatis/image-20191217143939247.png differ diff --git a/images/mybatis/image-20191217144453261.png b/images/mybatis/image-20191217144453261.png new file mode 100644 index 0000000..d6eade5 Binary files /dev/null and b/images/mybatis/image-20191217144453261.png differ diff --git a/images/mybatis/image-20191217144739434.png b/images/mybatis/image-20191217144739434.png new file mode 100644 index 0000000..e7f6618 Binary files /dev/null and b/images/mybatis/image-20191217144739434.png differ diff --git a/images/mybatis/image-20191217145051629.png b/images/mybatis/image-20191217145051629.png new file mode 100644 index 0000000..ab9bb77 Binary files /dev/null and b/images/mybatis/image-20191217145051629.png differ diff --git a/images/mybatis/image-20191217145607956.png b/images/mybatis/image-20191217145607956.png new file mode 100644 index 0000000..75f421d Binary files /dev/null and b/images/mybatis/image-20191217145607956.png differ diff --git a/images/mybatis/image-20191217183853550.png b/images/mybatis/image-20191217183853550.png new file mode 100644 index 0000000..806d277 Binary files /dev/null and b/images/mybatis/image-20191217183853550.png differ diff --git a/images/mybatis/image-20191218082628696.png b/images/mybatis/image-20191218082628696.png new file mode 100644 index 0000000..e335480 Binary files /dev/null and b/images/mybatis/image-20191218082628696.png differ diff --git a/images/mybatis/image-20191219083223084.png b/images/mybatis/image-20191219083223084.png new file mode 100644 index 0000000..12080c8 Binary 
files /dev/null and b/images/mybatis/image-20191219083223084.png differ diff --git a/images/mybatis/image-20191219151245509.png b/images/mybatis/image-20191219151245509.png new file mode 100644 index 0000000..ebdcc67 Binary files /dev/null and b/images/mybatis/image-20191219151245509.png differ diff --git a/images/mybatis/image-20191219162402291.png b/images/mybatis/image-20191219162402291.png new file mode 100644 index 0000000..421a246 Binary files /dev/null and b/images/mybatis/image-20191219162402291.png differ diff --git a/images/spring/image-20200728094658684.png b/images/spring/image-20200728094658684.png index 048a53e..f29921d 100644 Binary files a/images/spring/image-20200728094658684.png and b/images/spring/image-20200728094658684.png differ diff --git a/images/spring/image-20200728105926218.png b/images/spring/image-20200728105926218.png index facdbc1..f3a177e 100644 Binary files a/images/spring/image-20200728105926218.png and b/images/spring/image-20200728105926218.png differ diff --git a/images/spring/image-20200728133037075.png b/images/spring/image-20200728133037075.png index bbda49d..8c7dc71 100644 Binary files a/images/spring/image-20200728133037075.png and b/images/spring/image-20200728133037075.png differ diff --git a/images/spring/image-20200729090322058.png b/images/spring/image-20200729090322058.png index d394779..9498014 100644 Binary files a/images/spring/image-20200729090322058.png and b/images/spring/image-20200729090322058.png differ diff --git a/images/spring/image-20200729144622440.png b/images/spring/image-20200729144622440.png index ae0bca9..fa1ec1b 100644 Binary files a/images/spring/image-20200729144622440.png and b/images/spring/image-20200729144622440.png differ diff --git a/images/spring/image-20200729145518089.png b/images/spring/image-20200729145518089.png index 338bd89..39fca74 100644 Binary files a/images/spring/image-20200729145518089.png and b/images/spring/image-20200729145518089.png differ diff --git a/images/spring/image-20200729145637688.png b/images/spring/image-20200729145637688.png index 1ff2d7d..6366a11 100644 Binary files a/images/spring/image-20200729145637688.png and b/images/spring/image-20200729145637688.png differ diff --git a/images/spring/image-20200729145835608.png b/images/spring/image-20200729145835608.png index 944b890..2c09659 100644 Binary files a/images/spring/image-20200729145835608.png and b/images/spring/image-20200729145835608.png differ diff --git a/images/spring/image-20200729160650401.png b/images/spring/image-20200729160650401.png index 90bab8f..850dfca 100644 Binary files a/images/spring/image-20200729160650401.png and b/images/spring/image-20200729160650401.png differ diff --git a/images/spring/image-20200729161647214.png b/images/spring/image-20200729161647214.png index c82a97c..02c0653 100644 Binary files a/images/spring/image-20200729161647214.png and b/images/spring/image-20200729161647214.png differ diff --git a/images/spring/image-20200729162023837.png b/images/spring/image-20200729162023837.png index 8a830d0..c904c27 100644 Binary files a/images/spring/image-20200729162023837.png and b/images/spring/image-20200729162023837.png differ diff --git a/images/spring/image-20200729163303000.png b/images/spring/image-20200729163303000.png index fdbfb98..f3d5893 100644 Binary files a/images/spring/image-20200729163303000.png and b/images/spring/image-20200729163303000.png differ