diff --git a/src/main/java/com/msb/model/ClientFrame.java b/src/main/java/com/msb/model/ClientFrame.java index 2d8e08b..f17880d 100644 --- a/src/main/java/com/msb/model/ClientFrame.java +++ b/src/main/java/com/msb/model/ClientFrame.java @@ -22,20 +22,24 @@ import java.awt.event.WindowEvent; */ public class ClientFrame extends Frame { - public ClientFrame() { + public static final ClientFrame INSTANCE = new ClientFrame(); + + private TextArea ta; - Client client = new Client("localhost", 8888); - client.clientStart(); + private Client client; - TextArea ta = new TextArea(); + public ClientFrame() { + + ta = new TextArea(); TextField tf = new TextField(); this.setSize(600, 400); this.add(ta, BorderLayout.CENTER); this.add(tf, BorderLayout.SOUTH); - this.setVisible(true); +// this.setVisible(true); this.addWindowListener(new WindowAdapter() { @Override public void windowClosing(WindowEvent e) { + client.close(); System.exit(1); } }); @@ -45,15 +49,27 @@ public class ClientFrame extends Frame { tf.addActionListener(new ActionListener() { @Override public void actionPerformed(ActionEvent e) { - ta.setText(ta.getText() + tf.getText() + "\r\n"); + client.send(tf.getText()); +// ta.setText(ta.getText() + tf.getText() + "\r\n"); tf.setText(""); - client.getChannel().writeAndFlush(ta.getText().getBytes()); } }); + + } + + public void updateText(String msg) { + ta.setText(ta.getText() + msg + System.getProperty("line.separator")); + } + + public void connectToServer() { + client = new Client(); + client.connect(); } public static void main(String[] args) { - new ClientFrame(); + ClientFrame clientFrame = INSTANCE; + clientFrame.setVisible(true); + clientFrame.connectToServer(); } } diff --git a/src/main/java/com/msb/socket/Client.java b/src/main/java/com/msb/socket/Client.java index e6ac106..495b946 100644 --- a/src/main/java/com/msb/socket/Client.java +++ b/src/main/java/com/msb/socket/Client.java @@ -5,12 +5,15 @@ package com.msb.socket;/** * @Version: 1.0 */ +import com.msb.model.ClientFrame; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.ReferenceCountUtil; /** *@ClassName Client @@ -21,31 +24,34 @@ import io.netty.channel.socket.nio.NioSocketChannel; */ public class Client { + private Channel channel; + public void connect() { - //线程池 - EventLoopGroup group = new NioEventLoopGroup(1); - Bootstrap bootstrap = new Bootstrap(); + + EventLoopGroup group = new NioEventLoopGroup(1); //线程池 + Bootstrap bootstrap = new Bootstrap(); //辅助启动的类 try { - ChannelFuture future = bootstrap.group(group) - .channel(NioSocketChannel.class) - .handler(new ClientChannelInitializer()) + ChannelFuture future = bootstrap.group(group) //指定线程池 + .channel(NioSocketChannel.class) //指定channel类型 + .handler(new ClientChannelInitializer()) //channel的处理器 .connect("localhost", 8888); - + //监听connect成功还是失败 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if(channelFuture.isSuccess()) { System.out.println("connected!"); + channel = channelFuture.channel(); //连接成功以后初始化channel } else { System.out.println("not connect!"); } } }); - future.sync(); + future.sync(); //等待 - future.channel().closeFuture().sync(); + future.channel().closeFuture().sync(); //channelFuture的结束 } catch (InterruptedException e) { e.printStackTrace(); @@ -54,6 +60,15 @@ public class Client { } } + public void send(String msg) { + ByteBuf buf = Unpooled.copiedBuffer(msg.getBytes()); + channel.writeAndFlush(buf); + } + + public void close() { + this.send("bye"); + } + public static void main(String[] args) { Client client = new Client(); client.connect(); @@ -73,9 +88,24 @@ class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // super.channelRead(ctx, msg); - ByteBuf buf = (ByteBuf) msg; - byte[] bytes = new byte[buf.readableBytes()]; - buf.getBytes(buf.readerIndex(), bytes); - System.out.println(new String(bytes)); + ByteBuf buf = null; + try { + buf = (ByteBuf) msg; + byte[] bytes = new byte[buf.readableBytes()]; + buf.getBytes(buf.readerIndex(), bytes); +// System.out.println(new String(bytes)); + ClientFrame.INSTANCE.updateText(new String(bytes)); + } finally { + if(buf != null) { + ReferenceCountUtil.release(buf); + } + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { +// super.channelActive(ctx); + ByteBuf buf = Unpooled.copiedBuffer("hello".getBytes()); + ctx.writeAndFlush(buf); } } diff --git a/src/main/java/com/msb/socket/Server.java b/src/main/java/com/msb/socket/Server.java index 65538bd..101a969 100644 --- a/src/main/java/com/msb/socket/Server.java +++ b/src/main/java/com/msb/socket/Server.java @@ -6,15 +6,15 @@ package com.msb.socket;/** */ import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; +import io.netty.buffer.ByteBuf; +import io.netty.channel.*; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.util.concurrent.EventExecutor; +import io.netty.util.concurrent.EventExecutorGroup; import io.netty.util.concurrent.GlobalEventExecutor; /** @@ -28,57 +28,76 @@ public class Server { private int port; + public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + public Server(int port) { this.port = port; } public void serverStart() { - EventLoopGroup bootGroup = new NioEventLoopGroup(1); - EventLoopGroup workGroup = new NioEventLoopGroup(3); + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workGroup = new NioEventLoopGroup(2); ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.group(bootGroup, workGroup) - .channel(NioServerSocketChannel.class) - .childHandler(new ChannelInitializer() { - @Override - protected void initChannel(SocketChannel socketChannel) throws Exception { - socketChannel.pipeline().addLast(new Handler()); - } - }); - try { - bootstrap.bind(this.port).sync(); + ChannelFuture future = bootstrap.group(bossGroup, workGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + socketChannel.pipeline().addLast(new ServerChildHandler()); + } + }).bind(this.port).sync(); //这个sync是什么阻塞。这个sync是看bind有没有连接成功,成功了才继续往下执行 + + future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { - bootGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } public static void main(String[] args) { - new Server(8888); + new Server(8888).serverStart(); } } -class Handler extends ChannelInboundHandlerAdapter { +class ServerChildHandler extends ChannelInboundHandlerAdapter { private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { -// super.channelRead(ctx, msg); - channelGroup.forEach(channel -> { + /*channelGroup.forEach(channel -> { if( ! channel.equals(ctx.channel())) { channel.writeAndFlush(msg); } - }); + });*/ + + ByteBuf buf = (ByteBuf) msg; + byte[] bytes = new byte[buf.readableBytes()]; + buf.getBytes(buf.readerIndex(), bytes); + String message = new String(bytes); + if("bye".equals(message)) { + Server.clients.remove(ctx.channel()); + ctx.close(); + } else { + Server.clients.writeAndFlush(msg); + } + + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + Server.clients.add(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { -// super.exceptionCaught(ctx, cause); cause.printStackTrace(); + //移除出现异常的客户端channel,并关闭连接 + Server.clients.remove(ctx.channel()); ctx.close(); } }