坦克Netty版-实现多人聊天操作

master
bingor 3 years ago
parent d5408506d8
commit dfc7a937b5

@ -22,20 +22,24 @@ import java.awt.event.WindowEvent;
*/ */
public class ClientFrame extends Frame { public class ClientFrame extends Frame {
public ClientFrame() { public static final ClientFrame INSTANCE = new ClientFrame();
private TextArea ta;
Client client = new Client("localhost", 8888); private Client client;
client.clientStart();
TextArea ta = new TextArea(); public ClientFrame() {
ta = new TextArea();
TextField tf = new TextField(); TextField tf = new TextField();
this.setSize(600, 400); this.setSize(600, 400);
this.add(ta, BorderLayout.CENTER); this.add(ta, BorderLayout.CENTER);
this.add(tf, BorderLayout.SOUTH); this.add(tf, BorderLayout.SOUTH);
this.setVisible(true); // this.setVisible(true);
this.addWindowListener(new WindowAdapter() { this.addWindowListener(new WindowAdapter() {
@Override @Override
public void windowClosing(WindowEvent e) { public void windowClosing(WindowEvent e) {
client.close();
System.exit(1); System.exit(1);
} }
}); });
@ -45,15 +49,27 @@ public class ClientFrame extends Frame {
tf.addActionListener(new ActionListener() { tf.addActionListener(new ActionListener() {
@Override @Override
public void actionPerformed(ActionEvent e) { public void actionPerformed(ActionEvent e) {
ta.setText(ta.getText() + tf.getText() + "\r\n"); client.send(tf.getText());
// ta.setText(ta.getText() + tf.getText() + "\r\n");
tf.setText(""); tf.setText("");
client.getChannel().writeAndFlush(ta.getText().getBytes());
} }
}); });
}
public void updateText(String msg) {
ta.setText(ta.getText() + msg + System.getProperty("line.separator"));
}
public void connectToServer() {
client = new Client();
client.connect();
} }
public static void main(String[] args) { public static void main(String[] args) {
new ClientFrame(); ClientFrame clientFrame = INSTANCE;
clientFrame.setVisible(true);
clientFrame.connectToServer();
} }
} }

@ -5,12 +5,15 @@ package com.msb.socket;/**
* @Version: 1.0 * @Version: 1.0
*/ */
import com.msb.model.ClientFrame;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ReferenceCountUtil;
/** /**
*@ClassName Client *@ClassName Client
@ -21,31 +24,34 @@ import io.netty.channel.socket.nio.NioSocketChannel;
*/ */
public class Client { public class Client {
private Channel channel;
public void connect() { public void connect() {
//线程池
EventLoopGroup group = new NioEventLoopGroup(1); EventLoopGroup group = new NioEventLoopGroup(1); //线程池
Bootstrap bootstrap = new Bootstrap(); Bootstrap bootstrap = new Bootstrap(); //辅助启动的类
try { try {
ChannelFuture future = bootstrap.group(group) ChannelFuture future = bootstrap.group(group) //指定线程池
.channel(NioSocketChannel.class) .channel(NioSocketChannel.class) //指定channel类型
.handler(new ClientChannelInitializer()) .handler(new ClientChannelInitializer()) //channel的处理器
.connect("localhost", 8888); .connect("localhost", 8888);
//监听connect成功还是失败
future.addListener(new ChannelFutureListener() { future.addListener(new ChannelFutureListener() {
@Override @Override
public void operationComplete(ChannelFuture channelFuture) throws Exception { public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()) { if(channelFuture.isSuccess()) {
System.out.println("connected!"); System.out.println("connected!");
channel = channelFuture.channel(); //连接成功以后初始化channel
} else { } else {
System.out.println("not connect!"); System.out.println("not connect!");
} }
} }
}); });
future.sync(); future.sync(); //等待
future.channel().closeFuture().sync(); future.channel().closeFuture().sync(); //channelFuture的结束
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
@ -54,6 +60,15 @@ public class Client {
} }
} }
public void send(String msg) {
ByteBuf buf = Unpooled.copiedBuffer(msg.getBytes());
channel.writeAndFlush(buf);
}
public void close() {
this.send("bye");
}
public static void main(String[] args) { public static void main(String[] args) {
Client client = new Client(); Client client = new Client();
client.connect(); client.connect();
@ -73,9 +88,24 @@ class ClientHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg); // super.channelRead(ctx, msg);
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = null;
byte[] bytes = new byte[buf.readableBytes()]; try {
buf.getBytes(buf.readerIndex(), bytes); buf = (ByteBuf) msg;
System.out.println(new String(bytes)); byte[] bytes = new byte[buf.readableBytes()];
buf.getBytes(buf.readerIndex(), bytes);
// System.out.println(new String(bytes));
ClientFrame.INSTANCE.updateText(new String(bytes));
} finally {
if(buf != null) {
ReferenceCountUtil.release(buf);
}
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// super.channelActive(ctx);
ByteBuf buf = Unpooled.copiedBuffer("hello".getBytes());
ctx.writeAndFlush(buf);
} }
} }

@ -6,15 +6,15 @@ package com.msb.socket;/**
*/ */
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandlerContext; import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.*;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
/** /**
@ -28,57 +28,76 @@ public class Server {
private int port; private int port;
public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public Server(int port) { public Server(int port) {
this.port = port; this.port = port;
} }
public void serverStart() { public void serverStart() {
EventLoopGroup bootGroup = new NioEventLoopGroup(1); EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workGroup = new NioEventLoopGroup(3); EventLoopGroup workGroup = new NioEventLoopGroup(2);
ServerBootstrap bootstrap = new ServerBootstrap(); ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bootGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new Handler());
}
});
try { try {
bootstrap.bind(this.port).sync(); ChannelFuture future = bootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ServerChildHandler());
}
}).bind(this.port).sync(); //这个sync是什么阻塞。这个sync是看bind有没有连接成功成功了才继续往下执行
future.channel().closeFuture().sync();
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} finally { } finally {
bootGroup.shutdownGracefully(); bossGroup.shutdownGracefully();
workGroup.shutdownGracefully(); workGroup.shutdownGracefully();
} }
} }
public static void main(String[] args) { public static void main(String[] args) {
new Server(8888); new Server(8888).serverStart();
} }
} }
class Handler extends ChannelInboundHandlerAdapter { class ServerChildHandler extends ChannelInboundHandlerAdapter {
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// super.channelRead(ctx, msg); /*channelGroup.forEach(channel -> {
channelGroup.forEach(channel -> {
if( ! channel.equals(ctx.channel())) { if( ! channel.equals(ctx.channel())) {
channel.writeAndFlush(msg); channel.writeAndFlush(msg);
} }
}); });*/
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.getBytes(buf.readerIndex(), bytes);
String message = new String(bytes);
if("bye".equals(message)) {
Server.clients.remove(ctx.channel());
ctx.close();
} else {
Server.clients.writeAndFlush(msg);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Server.clients.add(ctx.channel());
} }
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// super.exceptionCaught(ctx, cause);
cause.printStackTrace(); cause.printStackTrace();
//移除出现异常的客户端channel,并关闭连接
Server.clients.remove(ctx.channel());
ctx.close(); ctx.close();
} }
} }

Loading…
Cancel
Save