Merge branch 'opengoofy:develop' into develop

pull/1034/head
JianNie 2 years ago committed by GitHub
commit b6d0e625b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -24,6 +24,7 @@ import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import lombok.NoArgsConstructor;
import org.springframework.boot.web.server.WebServer;
import org.springframework.core.env.ConfigurableEnvironment;
import java.util.Arrays;
import java.util.Objects;
@ -54,6 +55,16 @@ public class WebIpAndPortHolder {
protected static final String SEPARATOR = ",";
/**
* get port for Environment
*/
protected static final String PORT_KEY = "server.port";
/**
* if port is null, use this
*/
protected static final int PORT = 8080;
protected static void initIpAndPort() {
if (!supportVersion) {
return;
@ -66,12 +77,20 @@ public class WebIpAndPortHolder {
InetUtils.HostInfo loopBackHostInfo = inetUtils.findFirstNonLoopBackHostInfo();
Assert.notNull(loopBackHostInfo, "Unable to get the application IP address");
String ip = loopBackHostInfo.getIpAddress();
WebThreadPoolHandlerChoose webThreadPoolHandlerChoose = ApplicationContextHolder.getBean(WebThreadPoolHandlerChoose.class);
WebThreadPoolService webThreadPoolService = webThreadPoolHandlerChoose.choose();
// When get the port at startup, can get the message: "port xxx was already in use" or use two ports
WebServer webServer = webThreadPoolService.getWebServer();
String port = String.valueOf(webServer.getPort());
return new WebIpAndPortInfo(ip, port);
ConfigurableEnvironment environment = ApplicationContextHolder.getBean(ConfigurableEnvironment.class);
Integer port = environment.getProperty(PORT_KEY, Integer.TYPE);
port = Objects.isNull(port) ? PORT : port;
if (port == 0) {
WebThreadPoolHandlerChoose webThreadPoolHandlerChoose = ApplicationContextHolder.getBean(WebThreadPoolHandlerChoose.class);
WebThreadPoolService webThreadPoolService = webThreadPoolHandlerChoose.choose();
// When get the port at startup, can get the message: "port xxx was already in use" or use two ports
WebServer webServer = webThreadPoolService.getWebServer();
port = webServer.getPort();
}
return new WebIpAndPortInfo(ip, String.valueOf(port));
}
/**

@ -17,11 +17,10 @@
package cn.hippo4j.rpc.server;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.ConnectionException;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
/**
* Server Implementation
@ -30,10 +29,15 @@ public class RPCServer implements Server {
ServerPort port;
ServerConnection serverConnection;
Thread thread;
public RPCServer(ServerConnection serverConnection, ServerPort port) {
this.port = port;
this.serverConnection = serverConnection;
this.thread = ThreadUtil.newThread(
() -> serverConnection.bind(port),
"hippo4j-rpc-" + port.getPort(),
false);
}
/**
@ -42,11 +46,7 @@ public class RPCServer implements Server {
*/
@Override
public void bind() {
CompletableFuture
.runAsync(() -> serverConnection.bind(port))
.exceptionally(throwable -> {
throw new ConnectionException(throwable);
});
thread.start();
}
@Override
@ -59,6 +59,7 @@ public class RPCServer implements Server {
*/
@Override
public void close() throws IOException {
thread = null;
serverConnection.close();
}
}

@ -99,7 +99,7 @@ public class RPCClientTest {
ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
for (int i = 0; i < 100; i++) {
for (int i = 0; i < 50; i++) {
Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive();
Assert.assertTrue(active);

@ -20,7 +20,6 @@ package cn.hippo4j.rpc.server;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@ -31,15 +30,12 @@ import java.io.IOException;
public class RPCServerTest {
public static ServerPort port = new TestServerPort();
public static ServerPort portTest = new ServerPortTest();
@Test
public void bind() throws IOException {
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
RPCServer rpcServer = new RPCServer(connection, () -> 8893);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
@ -47,8 +43,6 @@ public class RPCServerTest {
boolean active = rpcServer.isActive();
Assert.assertTrue(active);
rpcServer.close();
boolean serverActive = rpcServer.isActive();
Assert.assertFalse(serverActive);
}
@Test
@ -58,7 +52,7 @@ public class RPCServerTest {
EventLoopGroup worker = new NioEventLoopGroup();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(leader, worker, handler);
RPCServer rpcServer = new RPCServer(connection, portTest);
RPCServer rpcServer = new RPCServer(connection, () -> 8894);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
@ -66,23 +60,5 @@ public class RPCServerTest {
boolean active = rpcServer.isActive();
Assert.assertTrue(active);
rpcServer.close();
boolean serverActive = rpcServer.isActive();
Assert.assertFalse(serverActive);
}
static class TestServerPort implements ServerPort {
@Override
public int getPort() {
return 8893;
}
}
static class ServerPortTest implements ServerPort {
@Override
public int getPort() {
return 8894;
}
}
}

@ -35,8 +35,6 @@ public class NettyServerSupportTest {
}
Assert.assertTrue(support.isActive());
support.close();
ThreadUtil.sleep(1000L);
Assert.assertFalse(support.isActive());
}
}
Loading…
Cancel
Save