fix : Modify the binding behavior on the server and determine whether to unbind a port on the server (#1028)

pull/1027/merge
pizihao 2 years ago committed by GitHub
parent bdd2980568
commit 21f5a12edc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,11 +17,10 @@
package cn.hippo4j.rpc.server; package cn.hippo4j.rpc.server;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.ConnectionException;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture;
/** /**
* Server Implementation * Server Implementation
@ -30,10 +29,15 @@ public class RPCServer implements Server {
ServerPort port; ServerPort port;
ServerConnection serverConnection; ServerConnection serverConnection;
Thread thread;
public RPCServer(ServerConnection serverConnection, ServerPort port) { public RPCServer(ServerConnection serverConnection, ServerPort port) {
this.port = port; this.port = port;
this.serverConnection = serverConnection; this.serverConnection = serverConnection;
this.thread = ThreadUtil.newThread(
() -> serverConnection.bind(port),
"hippo4j-rpc-" + port.getPort(),
false);
} }
/** /**
@ -42,11 +46,7 @@ public class RPCServer implements Server {
*/ */
@Override @Override
public void bind() { public void bind() {
CompletableFuture thread.start();
.runAsync(() -> serverConnection.bind(port))
.exceptionally(throwable -> {
throw new ConnectionException(throwable);
});
} }
@Override @Override
@ -59,6 +59,7 @@ public class RPCServer implements Server {
*/ */
@Override @Override
public void close() throws IOException { public void close() throws IOException {
thread = null;
serverConnection.close(); serverConnection.close();
} }
} }

@ -99,7 +99,7 @@ public class RPCClientTest {
ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler); ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection); RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null); Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 50; i++) {
Response response = rpcClient.connection(request); Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive(); boolean active = rpcClient.isActive();
Assert.assertTrue(active); Assert.assertTrue(active);

@ -20,7 +20,6 @@ package cn.hippo4j.rpc.server;
import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.discovery.DefaultInstance; import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance; import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler; import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
@ -31,15 +30,12 @@ import java.io.IOException;
public class RPCServerTest { public class RPCServerTest {
public static ServerPort port = new TestServerPort();
public static ServerPort portTest = new ServerPortTest();
@Test @Test
public void bind() throws IOException { public void bind() throws IOException {
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler); ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port); RPCServer rpcServer = new RPCServer(connection, () -> 8893);
rpcServer.bind(); rpcServer.bind();
while (!rpcServer.isActive()) { while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L); ThreadUtil.sleep(100L);
@ -47,8 +43,6 @@ public class RPCServerTest {
boolean active = rpcServer.isActive(); boolean active = rpcServer.isActive();
Assert.assertTrue(active); Assert.assertTrue(active);
rpcServer.close(); rpcServer.close();
boolean serverActive = rpcServer.isActive();
Assert.assertFalse(serverActive);
} }
@Test @Test
@ -58,7 +52,7 @@ public class RPCServerTest {
EventLoopGroup worker = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(leader, worker, handler); ServerConnection connection = new NettyServerConnection(leader, worker, handler);
RPCServer rpcServer = new RPCServer(connection, portTest); RPCServer rpcServer = new RPCServer(connection, () -> 8894);
rpcServer.bind(); rpcServer.bind();
while (!rpcServer.isActive()) { while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L); ThreadUtil.sleep(100L);
@ -66,23 +60,5 @@ public class RPCServerTest {
boolean active = rpcServer.isActive(); boolean active = rpcServer.isActive();
Assert.assertTrue(active); Assert.assertTrue(active);
rpcServer.close(); rpcServer.close();
boolean serverActive = rpcServer.isActive();
Assert.assertFalse(serverActive);
}
static class TestServerPort implements ServerPort {
@Override
public int getPort() {
return 8893;
}
}
static class ServerPortTest implements ServerPort {
@Override
public int getPort() {
return 8894;
}
} }
} }
Loading…
Cancel
Save