netty和NIO

pull/2/head
quanhengf 5 years ago
parent 1ff7bb0851
commit 8c826e9978

@ -0,0 +1,394 @@
网上关于各种IO的博文已经多到飞起如果你是大神可以跳过我这个菜鸟的拙文本博文没有什么特别NB的东西只是集百家之长并且以自己感到简单舒适的方式输出自己的理解及学习过程中的经验。
## IO及基本概念
#### 1、流的概念和作用
**流**:代表任何有能力产出数据的数据源对象或者是有能力接受数据的接收端对象。<Thinking in Java>
**流的本质**:数据传输,根据数据传输特性将流抽象为各种类,方便更直观的进行数据操作。
**流的作用**:为数据源和目的地建立一个输送通道。
Java中将输入输出抽象称为流就好像水管将两个容器连接起来。流是一组有顺序的有起点和终点的字节集合是对数据传输的总称或抽象。即数据在两设备间的传输称为流。
每个流只能是输入流或输出流的一种,不能同时具备两个功能,输入流只能进行读操作,对输出流只能进行写操作。在一个数据传输通道中,如果既要写入数据,又要读取数据,则要分别提供两个流。
#### 2、IO模型
五种IO模型包括阻塞IO、非阻塞IO、信号驱动IO、IO多路转接、异步IO。其中前四个被称为同步IO。在网络环境下可以将IO分为两步
1.等;
2.数据搬迁。
所以如果要想提高IO效率需要降低等待的时间。
##### 2.1 阻塞IOBlocking I/O
在内核将数据准备好之前系统调用会一直等待所有的套接字Socket默认的是阻塞方式。
![在这里插入图片描述](https://img-blog.csdnimg.cn/20191121192630209.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MDM4Mzk2,size_16,color_FFFFFF,t_70)
Java中的socket.read()会调用native read()而Java中的native方法会调用操作系统底层的dll而dll是C/C++编写的图中的recvfrom其实是C语言socket编程中的一个方法。所以其实我们在Java中调用socket.read()最后也会调用到图中的recvfrom方法。
应用程序(也就是我们的代码)想要读取数据就会调用recvfrom而recvfrom会通知OS来执行OS就会判断数据报是否准备好(比如判断是否收到了一个完整的UDP报文如果收到UDP报文不完整那么就继续等待)。当数据包准备好了之后OS就会将数据从内核空间拷贝到用户空间(因为我们的用户程序只能获取用户空间的内存,无法直接获取内核空间的内存)。拷贝完成之后socket.read()就会解除阻塞并得到read的结果。
BIO中的阻塞就是阻塞在2个地方
1. OS等待数据报(通过网络发送过来)准备好。
2. 将数据从内核空间拷贝到用户空间。
在这2个时候我们的BIO程序就是占着茅坑不拉屎啥事情都不干。
##### 2.2 非阻塞IONoblocking I/O
![在这里插入图片描述](https://img-blog.csdnimg.cn/20191121193031873.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MDM4Mzk2,size_16,color_FFFFFF,t_70)
每次应用进程询问内核是否有数据报准备好当有数据报准备好时就进行拷贝数据报的操作从内核拷贝到用户空间和拷贝完成返回的这段时间应用进程是阻塞的。但在没有数据报准备好时并不会阻塞程序内核直接返回未准备就绪的信号等待应用进程的下一个轮寻。但是轮寻对于CPU来说是较大的浪费一般只有在特定的场景下才使用。
Java的NIO就是采用这种方式当我们new了一个socket后我们可以设置它是非阻塞的。比如
```java
// 初始化一个 serverSocketChannel
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8000));
// 设置serverSocketChannel为非阻塞模式
// 即 accept()会立即得到返回
serverSocketChannel.configureBlocking(false);
```
上面的代码是设置ServerSocketChannel为非阻塞SocketChannel也可以设置。
从图中可以看到当设置为非阻塞后我们的socket.read()方法就会立即得到一个返回结果(成功 or 失败)我们可以根据返回结果执行不同的逻辑比如在失败时我们可以做一些其他的事情。但事实上这种方式也是低效的因为我们不得不使用轮询方法区一直问OS“我的数据好了没啊”。
**BIO 不会在recvfrom询问数据是否准备好时阻塞但还是会在将数据从内核空间拷贝到用户空间时阻塞。一定要注意这个地方Non-Blocking还是会阻塞的。**
##### 2.3 IO多路复用I/O Multiplexing
![在这里插入图片描述](https://img-blog.csdnimg.cn/20191121194503981.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MDM4Mzk2,size_16,color_FFFFFF,t_70)
传统情况下client与server通信需要一个3个socket(客户端的socket服务端的serversocket服务端中用来和客户端通信的socket)而在IO多路复用中客户端与服务端通信需要的不是socket而是3个channel通过channel可以完成与socket同样的操作channel的底层还是使用的socket进行通信但是多个channel只对应一个socket(可能不只是一个但是socket的数量一定少于channel数量)这样仅仅通过少量的socket就可以完成更多的连接提高了client容量。
其中,不同的操作系统,对此有不同的实现:
Windowsselector
Linuxepoll
Mackqueue
其中epollkqueue比selector更为高效这是因为他们监听方式的不同。selector的监听是通过轮询FD_SETSIZE来问每一个socket“你改变了吗假若监听到时间那么selector就会调用相应的时间处理器进行处理。但是epoll与kqueue不同他们把socket与事件绑定在一起当监听到socket变化时立即可以调用相应的处理。
**selectorepollkqueue都属于Reactor IO设计。**
##### 2.4 信号驱动Signal driven IO
![在这里插入图片描述](https://img-blog.csdnimg.cn/20191121195059827.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MDM4Mzk2,size_16,color_FFFFFF,t_70)
信号驱动IO模型应用进程告诉内核当数据报准备好的时候给我发送一个信号对SIGIO信号进行捕捉并且调用我的信号处理函数来获取数据报。
##### 2.5 异步IOAsynchronous I/O
![在这里插入图片描述](https://img-blog.csdnimg.cn/20191121195315354.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MDM4Mzk2,size_16,color_FFFFFF,t_70)
Asynchronous IO调用中是真正的无阻塞其他IO model中多少会有点阻塞。程序发起read操作之后立刻就可以开始去做其它的事。而在内核角度当它受到一个asynchronous read之后首先它会立刻返回所以不会对用户进程产生任何block。然后kernel会等待数据准备完成然后将数据拷贝到用户内存当这一切都完成之后kernel会给用户进程发送一个signal告诉它read操作完成了。
可以看出阻塞程度阻塞IO>非阻塞IO>多路转接IO>信号驱动IO>异步IO效率是由低到高的。
##### 2.6 Blocking IO 与Non-Blocking IO 区别?
阻塞或非阻塞只涉及程序和OSBlocking IO 会一直block程序直到OS返回而Non-Block IO在OS内核准备数据包的情况下会立即得到返回。
##### 2.7 Asynchronous IO 与 Synchronous IO
只要有block就是同步IO完全没有block则是异步IO。所以我们之前所说的Blocking IO、Non-Blocking IO、IO Multiplex均为Synchronous IO只有Asynchronous IO为异步IO。
##### 2.8 Non-Blocking IO 不是会立即返回没有阻塞吗?
**Non-Blocking IO在数据包准备时是非阻塞的但是在将数据从内核空间拷贝到用户空间还是会阻塞**。而Asynchronous IO则不一样当进程发起IO 操作之后就直接返回再也不理睬了由内核完成读写完成读写操作后kernel发送一个信号告诉进程说IO完成。在这整个过程中进程完全没有被block。
#### 3、IO模式Reactor与Proactor
##### 3.1 Reactor
Reactor(反应器)的设计是一种事件驱动思想比如Java NIO中socket过来时有四种事件
connectable
acceptable
readable
writable
我们为每一种事件都编写一个处理器然后设置每个socket要监听哪种情况随后就可以调用对应的处理器。
![在这里插入图片描述](https://img-blog.csdnimg.cn/20191121200143647.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MDM4Mzk2,size_16,color_FFFFFF,t_70)
图中的input就可以当作socket中间的Service Hanlder&event dispatch的作用就是监听每一个socket(需要实现把socket注册进来并指定要监听哪种情况)然后给socket派发不同的事件。
##### 3.2 Proactor
![在这里插入图片描述](https://img-blog.csdnimg.cn/2019112120035031.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MDM4Mzk2,size_16,color_FFFFFF,t_70)
Proactor与Reactor较为类似以读取数据为例
**Reactor模式**
1. 应用程序注册读就绪事件和相关联的事件处理器
2. 事件分离器等待事件的发生
3. 当发生 读就绪事件 的时候,事件分离器调用第一步注册的事件处理器
4. 事件处理器首先执行实际的读取操作,然后根据读取到的内容进行进一步的处理
**Proactor模式**
1. 应用程序调用一个异步读取操作然后注册相应的事件处理器此时事件处理器不关注读取就绪事件而是关注读取完成事件这是区别于Reactor的关键。
2. 事件分离器等待读取操作完成事件
3. 在事件分离器等待读取操作完成的时候操作系统调用内核线程完成读取操作异步IO都是操作系统负责将数据读写到应用传递进来的缓冲区供应用程序操作操作系统扮演了重要角色并将读取的内容放入用户传递过来的缓存区中。这也是区别于Reactor的一点Proactor中应用程序需要传递缓存区。
4. 事件分离器捕获到读取完成事件后,激活应用程序注册的事件处理器,事件处理器直接从缓存区读取数据,而不需要进行实际的读取操作。
**区别**
Reactor中监听是否有可读或可写事件然后读/写操作是由程序进行的。而Proactor中直接监听读/写操作是否完成,也就是说读/写操作是否OS来完成并将读写数据放入一个缓冲区提供给程序。
#### 4、同步与异步阻塞与非阻塞
同步/异步(描述网络通信模式,适用于请求-响应模型)
同步:发送方发送请求后,需要等待接收响应,否则将一直等待
异步发送方发送请求后不需要等待响应可以继续发送下一个请求或者主动挂起线程并释放CPU
阻塞/非阻塞(描述进程的函数调用方式)
阻塞IO 调用会一直阻塞,直至结果返回才能继续执行
非阻塞IO 调用会立即返回,不需要等待结果,并可以执行下一个 IO 调用
总结,同步异步和阻塞非阻塞是两个不同的概念,用简单的数据库查询来举一个例子:
如果发送一个请求,需要等待数据库响应,否则将一直等待,这就是同步
如果发送一个请求,不需要数据库响应,就可以继续发送下一个请求(NIO模式、回调通知模式)或者主动将任务插入队列中主动挂起线程并释放CPU(异步队列模式),这就是异步
一般来说,同步是最简单的编程方式,而异步编程虽然需要一定技术,但是却能提升系统性能。对于阻塞与非阻塞,阻塞的实时响应性更好,但在高并发情况下阻塞线程数会急剧增加,导致大量的上下文切换会引起挂起/唤醒线程的性能损耗,而非阻塞的性能吞吐量更高,但由于其是顺序执行每一个事件,一旦处理某一个事件过久,会影响后续事件的处理,因此实时响应性较差。
## Java中的BIO
#### 传统Socket阻塞案例代码
```java
public class TraditionalSocketDemo {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(7777);
System.out.println("服务端启动...");
while (true) {
// 获取socket套接字
// accept()阻塞点
Socket socket = serverSocket.accept();
System.out.println("有新客户端连接上来了...");
// 获取客户端输入流
java.io.InputStream is = socket.getInputStream();
byte[] b = new byte[1024];
while (true) {
// 循环读取数据
// read() 阻塞点
int data = is.read(b);
if (data != -1) {
String info = new String(b, 0, data, "GBK");
System.out.println(info);
} else {
break;
}
}
}
}
}
```
在debugger代码的过程中会发现服务端启动只有当客户端就绪后才进行下一步操作如果客户端没有就绪线程阻塞客户端发送请求程序才继续往下执行如果客户端没有发出请求线程阻塞**上面的代码有两个阻塞点**
1. **等待客户端就绪**
2. **等待OS将数据从内核拷贝到用户空间应用程序可以操作的内存空间**
#### 传统bio多线程版本
```java
public class TraditionalSocketDemo2 {
public static void main(String[] args) throws IOException {
ServerSocket serverSocket = new ServerSocket(7777);
System.out.println("服务端启动...");
while (true) {
// 获取socket套接字
// accept()阻塞点
final Socket socket = serverSocket.accept();
System.out.println("有新客户端连接上来了...");
new Thread(new Runnable() {
@Override
public void run() {
try {
// 获取客户端输入流
InputStream is = socket.getInputStream();
byte[] b = new byte[1024];
while (true) {
// 循环读取数据
// read() 阻塞点
int data = is.read(b);
if (data != -1) {
String info = new String(b, 0, data, "GBK");
System.out.println(info);
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
}
```
能够解决传统的BIO问题但是会出现多少个客户端多少个线程请求和线程的个数1:1关系操作系统资源耗尽服务端挂了。使用线程池虽然能控制服务线程的数量但应对高并发量的访问时依然会导致大量线程处于阻塞状态严重影响服务效率。
## Java中的NIO
NIO是一种基于通道和缓冲区的I/O方式它可以使用Native函数库直接分配堆外内存区别于JVM的运行时数据区然后通过一个存储在java堆里面的DirectByteBuffer对象作为这块内存的直接引用进行操作。这样能在一些场景显著提高性能因为避免了在Java堆和Native堆中来回复制数据。
#### 1、Java NIO组件
NIO主要有三大核心部分Channel(通道)Buffer(缓冲区), Selector选择器。传统IO是基于字节流和字符流进行操作基于流而NIO基于Channel和Buffer(缓冲区)进行操作数据总是从通道读取到缓冲区中或者从缓冲区写入到通道中。Selector(选择区)用于监听多个通道的事件(比如:连接打开,数据到达)。因此,单个线程可以监听多个数据通道。
##### 1.1 Buffer
Buffer缓冲区是一个用于存储特定基本类型数据的容器。除了boolean外其余每种基本类型都有一个对应的buffer类。Buffer类的子类有ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer 。
##### 1.2 Channel
Channel通道表示到实体如硬件设备、文件、**网络套接字**或可以执行一个或多个不同 I/O 操作如读取或写入的程序组件的开放的连接。Channel接口的常用实现类有FileChannel对应文件IO、DatagramChannel对应UDP、SocketChannel和ServerSocketChannel对应TCP的客户端和服务器端。**Channel和IO中的Stream(流)是差不多一个等级的。只不过Stream是单向的譬如InputStream, OutputStream.而Channel是双向的既可以用来进行读操作又可以用来进行写操作**。
##### 1.3 Selector
Selector选择器用于监听多个通道的事件比如连接打开数据到达。因此单个的线程可以监听多个数据通道。即用选择器借助单一线程就可对数量庞大的活动I/O通道实施监控和维护。
写就绪相对有一点特殊一般来说你不应该注册写事件。写操作的就绪条件为底层缓冲区有空闲空间而写缓冲区绝大部分时间都是有空闲空间的所以当你注册写事件后写操作一直是就绪的选择处理线程全占用整个CPU资源。所以只有当你确实有数据要写时再注册写操作并在写完以后马上取消注册。
基于阻塞式I/O的多线程模型中Server为每个Client连接创建一个处理线程每个处理线程阻塞式等待可能达到的数据一旦数据到达则立即处理请求、返回处理结果并再次进入等待状态。由于每个Client连接有一个单独的处理线程为其服务因此可保证良好的响应时间。但当系统负载增大并发请求增多Server端需要的线程数会增加对于操作系统来说线程之间上下文切换的开销很大而且每个线程都要占用系统的一些资源如内存。因此使用的线程越少越好。
但是现代的操作系统和CPU在多任务方面表现的越来越好所以多线程的开销随着时间的推移变得越来越小了。实际上如果一个CPU有多个内核不使用多任务可能是在浪费CPU能力。
传统的IO处理方式一个线程处理一个网络连接
![在这里插入图片描述](https://img-blog.csdnimg.cn/2019112120352588.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MDM4Mzk2,size_16,color_FFFFFF,t_70)
NIO处理方式一个线程可以管理过个网络连接
![在这里插入图片描述](https://img-blog.csdnimg.cn/20191121203602279.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MDM4Mzk2,size_16,color_FFFFFF,t_70)
#### 2、NIO服务器端如何实现非阻塞
服务器上所有Channel需要向Selector注册而Selector则负责监视这些Socket的IO状态(观察者)当其中任意一个或者多个Channel具有可用的IO操作时该Selector的select()方法将会返回大于0的整数该整数值就表示该Selector上有多少个Channel具有可用的IO操作并提供了selectedKeys方法来返回这些Channel对应的SelectionKey集合(一个SelectionKey对应一个就绪的通道)。正是通过Selector使得服务器端只需要不断地调用Selector实例的select()方法即可知道当前所有Channel是否有需要处理的IO操作。注java NIO就是多路复用IOjdk7之后底层是epoll模型。
#### 3、Java NIO的简单实现
##### 3.1 服务端
```java
public class NioServer {
private int port;
private Selector selector;
private ExecutorService service = Executors.newFixedThreadPool(5);
public static void main(String[] args){
new NioServer(8080).start();
}
public NioServer(int port) {
this.port = port;
}
public void init() {
ServerSocketChannel ssc = null;
try {
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(port));
selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("NioServer started ......");
} catch (IOException e) {
e.printStackTrace();
}finally {
}
}
public void accept(SelectionKey key) {
try {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_READ);
System.out.println("accept a client : " + sc.socket().getInetAddress().getHostName());
} catch (IOException e) {
e.printStackTrace();
}
}
public void start() {
this.init();
while (true) {
try {
int events = selector.select();
if (events > 0) {
Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
while (selectionKeys.hasNext()) {
SelectionKey key = selectionKeys.next();
selectionKeys.remove();
if (key.isAcceptable()) {
accept(key);
} else {
service.submit(new NioServerHandler(key));
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static class NioServerHandler implements Runnable{
private SelectionKey selectionKey;
public NioServerHandler(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
@Override
public void run() {
try {
if (selectionKey.isReadable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer);
buffer.flip();
System.out.println("收到客户端"+socketChannel.socket().getInetAddress().getHostName()+"的数据:"+new String(buffer.array()));
//将数据添加到key中
ByteBuffer outBuffer = ByteBuffer.wrap(buffer.array());
socketChannel.write(outBuffer);// 将消息回送给客户端
selectionKey.cancel();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
```
##### 3.2 客户端
```java
public class NioClient {
private static final String host = "127.0.0.1";
private static final int port = 8080;
private Selector selector;
public static void main(String[] args){
for (int i=0;i<3;i++) {
new Thread(new Runnable() {
@Override
public void run() {
NioClient client = new NioClient();
client.connect(host, port);
client.listen();
}
}).start();
}
}
public void connect(String host, int port) {
try {
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
this.selector = Selector.open();
sc.register(selector, SelectionKey.OP_CONNECT);
sc.connect(new InetSocketAddress(host, port));
} catch (IOException e) {
e.printStackTrace();
}
}
public void listen() {
while (true) {
try {
int events = selector.select();
if (events > 0) {
Iterator<SelectionKey> selectionKeys = selector.selectedKeys().iterator();
while (selectionKeys.hasNext()) {
SelectionKey selectionKey = selectionKeys.next();
selectionKeys.remove();
//连接事件
if (selectionKey.isConnectable()) {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
if (socketChannel.isConnectionPending()) {
socketChannel.finishConnect();
}
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
socketChannel.write(ByteBuffer.wrap(("Hello this is " + Thread.currentThread().getName()).getBytes()));
} else if (selectionKey.isReadable()) {
SocketChannel sc = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
sc.read(buffer);
buffer.flip();
System.out.println("收到服务端的数据:"+new String(buffer.array()));
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
```
## Java中的AIO
JDK1.7升级了NIO类库升级后的NIO类库被称为NIO 2.0。Java正式提供了异步文件I/O操作同时提供了与UNIX网络编程事件驱动I/O对应的AIO。NIO 2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。
异步通道获取获取操作结果方式:
1. 使用java.util.concurrent.Future类表示异步操作的结果
2. 在执行异步操作的时候传入一个java.nio.channels操作完成后胡回调CompletionHandler接口的实现类。
NIO 2.0的异步套接字通道是真正的异步非阻塞I/O对应于UNIX网络编程中的事件驱动I/OAIO
Loading…
Cancel
Save