diff --git a/src/main/java/com/msb/io/nio/PoolServer.java b/src/main/java/com/msb/io/nio/PoolServer.java new file mode 100644 index 0000000..ccddf74 --- /dev/null +++ b/src/main/java/com/msb/io/nio/PoolServer.java @@ -0,0 +1,112 @@ +package com.msb.io.nio;/** + * @Author bingor + * @Date 2022/10/26 17:25 + * @Description: com.msb.io.nio + * @Version: 1.0 + */ + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + *@ClassName PoolServer + *@Description TODO + *@Author bingor + *@Date 2022/10/26 17:25 + *@Version 3.0 + */ +public class PoolServer { + + private Selector selector; + private ExecutorService pool = Executors.newFixedThreadPool(10); + + public void initServer(int port) throws IOException { + ServerSocketChannel ssc = ServerSocketChannel.open(); + ssc.bind(new InetSocketAddress(port)); + ssc.configureBlocking(false); + this.selector = Selector.open(); + ssc.register(this.selector, SelectionKey.OP_ACCEPT); + } + + public void listen() throws IOException { + //轮询访问selector + while (true) { + selector.select(); + Set selectionKeys = selector.selectedKeys(); + Iterator iterator = selectionKeys.iterator(); + while (iterator.hasNext()) { + SelectionKey key = iterator.next(); + iterator.remove(); + + if(key.isAcceptable()) { + ServerSocketChannel scc = (ServerSocketChannel) key.channel(); + SocketChannel sc = scc.accept(); + sc.configureBlocking(false); + sc.register(key.selector(), SelectionKey.OP_READ); + } else if(key.isReadable()) { + key.interestOps(key.interestOps()&(~SelectionKey.OP_READ)); + pool.execute(new ThreadHandlerChannel(key)); + } + } + } + + + } + + public static void main(String[] args) throws IOException { + PoolServer poolServer = new PoolServer(); + poolServer.initServer(8888); + poolServer.listen(); + } + +} + +class ThreadHandlerChannel extends Thread { + private SelectionKey key; + public ThreadHandlerChannel(SelectionKey key) { + this.key = key; + } + + @Override + public void run() { + SocketChannel sc = (SocketChannel) key.channel(); + ByteBuffer buffer = ByteBuffer.allocate(1024); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + try { + int size = 0; + while ((size = sc.read(buffer)) > 0) { + buffer.flip(); + baos.write(buffer.array(), 0, size); + buffer.clear(); + } + baos.close(); + + byte[] content = baos.toByteArray(); + ByteBuffer writeBuf = ByteBuffer.allocate(content.length); + writeBuf.put(content); + writeBuf.flip(); + sc.write(writeBuf); + if(size == -1) { + sc.close(); + } else { + this.key.interestOps(this.key.interestOps()|SelectionKey.OP_READ); + this.key.selector().wakeup(); + } + } catch (IOException ex) { + ex.printStackTrace(); + } + + + + } +} diff --git a/src/main/java/com/msb/io/nio/Server.java b/src/main/java/com/msb/io/nio/Server.java new file mode 100644 index 0000000..30be4ea --- /dev/null +++ b/src/main/java/com/msb/io/nio/Server.java @@ -0,0 +1,67 @@ +package com.msb.io.nio;/** + * @Author bingor + * @Date 2022/10/26 14:04 + * @Description: com.msb.io.nio + * @Version: 1.0 + */ + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.util.Iterator; +import java.util.Set; + +/** + *@ClassName Server + *@Description TODO + *@Author bingor + *@Date 2022/10/26 14:04 + *@Version 3.0 + */ +public class Server { + + public static void main(String[] args) throws IOException { + ServerSocketChannel ssc = ServerSocketChannel.open(); + ssc.bind(new InetSocketAddress(8888)); + ssc.configureBlocking(false); + + System.out.println("server started, listening on :" + ssc.getLocalAddress()); + + Selector selector = Selector.open(); + ssc.register(selector, SelectionKey.OP_ACCEPT); + + //轮询 + while(true) { + selector.select(); + Set set = selector.selectedKeys(); + Iterator iterator = set.iterator(); + while (iterator.hasNext()) { + SelectionKey key = iterator.next(); + iterator.remove(); + handle(key); + } + } + + } + + private static void handle(SelectionKey key) throws IOException { + if(key.isAcceptable()) { + ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); + ssc.configureBlocking(false); + ssc.register(key.selector(), SelectionKey.OP_READ); + } else if(key.isReadable()) { + SocketChannel sc = (SocketChannel) key.channel(); + ByteBuffer buffer = ByteBuffer.allocate(512); + buffer.clear(); + int len = sc.read(buffer); + if(len != -1) { + System.out.println(new String(buffer.array(), 0, len)); + } + + ByteBuffer bufferToWrite = ByteBuffer.wrap("hello, client".getBytes()); + sc.write(bufferToWrite); + } + } + +}