diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java index d5247a38..8588b8f6 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java @@ -17,11 +17,10 @@ package cn.hippo4j.rpc.server; +import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.rpc.discovery.ServerPort; -import cn.hippo4j.rpc.exception.ConnectionException; import java.io.IOException; -import java.util.concurrent.CompletableFuture; /** * Server Implementation @@ -30,10 +29,15 @@ public class RPCServer implements Server { ServerPort port; ServerConnection serverConnection; + Thread thread; public RPCServer(ServerConnection serverConnection, ServerPort port) { this.port = port; this.serverConnection = serverConnection; + this.thread = ThreadUtil.newThread( + () -> serverConnection.bind(port), + "hippo4j-rpc-" + port.getPort(), + false); } /** @@ -42,11 +46,7 @@ public class RPCServer implements Server { */ @Override public void bind() { - CompletableFuture - .runAsync(() -> serverConnection.bind(port)) - .exceptionally(throwable -> { - throw new ConnectionException(throwable); - }); + thread.start(); } @Override @@ -59,6 +59,7 @@ public class RPCServer implements Server { */ @Override public void close() throws IOException { + thread = null; serverConnection.close(); } } diff --git a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/RPCClientTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/RPCClientTest.java index b2002f04..46806762 100644 --- a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/RPCClientTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/RPCClientTest.java @@ -99,7 +99,7 @@ public class RPCClientTest { ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler); RPCClient rpcClient = new RPCClient(clientConnection); Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null); - for (int i = 0; i < 100; i++) { + for (int i = 0; i < 50; i++) { Response response = rpcClient.connection(request); boolean active = rpcClient.isActive(); Assert.assertTrue(active); diff --git a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/RPCServerTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/RPCServerTest.java index 4b82de8b..91acd86c 100644 --- a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/RPCServerTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/RPCServerTest.java @@ -20,7 +20,6 @@ package cn.hippo4j.rpc.server; import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.rpc.discovery.DefaultInstance; import cn.hippo4j.rpc.discovery.Instance; -import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.handler.NettyServerTakeHandler; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -31,15 +30,12 @@ import java.io.IOException; public class RPCServerTest { - public static ServerPort port = new TestServerPort(); - public static ServerPort portTest = new ServerPortTest(); - @Test public void bind() throws IOException { Instance instance = new DefaultInstance(); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); ServerConnection connection = new NettyServerConnection(handler); - RPCServer rpcServer = new RPCServer(connection, port); + RPCServer rpcServer = new RPCServer(connection, () -> 8893); rpcServer.bind(); while (!rpcServer.isActive()) { ThreadUtil.sleep(100L); @@ -47,8 +43,6 @@ public class RPCServerTest { boolean active = rpcServer.isActive(); Assert.assertTrue(active); rpcServer.close(); - boolean serverActive = rpcServer.isActive(); - Assert.assertFalse(serverActive); } @Test @@ -58,7 +52,7 @@ public class RPCServerTest { EventLoopGroup worker = new NioEventLoopGroup(); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); ServerConnection connection = new NettyServerConnection(leader, worker, handler); - RPCServer rpcServer = new RPCServer(connection, portTest); + RPCServer rpcServer = new RPCServer(connection, () -> 8894); rpcServer.bind(); while (!rpcServer.isActive()) { ThreadUtil.sleep(100L); @@ -66,23 +60,5 @@ public class RPCServerTest { boolean active = rpcServer.isActive(); Assert.assertTrue(active); rpcServer.close(); - boolean serverActive = rpcServer.isActive(); - Assert.assertFalse(serverActive); - } - - static class TestServerPort implements ServerPort { - - @Override - public int getPort() { - return 8893; - } - } - - static class ServerPortTest implements ServerPort { - - @Override - public int getPort() { - return 8894; - } } } \ No newline at end of file