fix : The rpc module is tuned to start the server asynchronously and … (#973)

* fix : The rpc module is tuned to start the server asynchronously and use the InetSocketAddress proxy host and port

* fix : Modify the logic waiting for the server to start in the test

* fix : The rpc module dependency change to common
pull/975/head
pizihao 3 years ago committed by GitHub
parent cd9a9de5bf
commit a54899e793
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -19,7 +19,6 @@ package cn.hippo4j.rpc.client;
import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.TimeOutException; import cn.hippo4j.rpc.exception.TimeOutException;
import cn.hippo4j.rpc.model.Request; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response; import cn.hippo4j.rpc.model.Response;
@ -35,6 +34,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.ChannelPoolHandler;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
@ -45,9 +45,10 @@ import java.util.concurrent.locks.LockSupport;
@Slf4j @Slf4j
public class NettyClientConnection implements ClientConnection { public class NettyClientConnection implements ClientConnection {
String host; InetSocketAddress address;
ServerPort port; /**
// Obtain the connection timeout period. The default value is 30s * Obtain the connection timeout period. The default value is 30s
*/
long timeout = 30000L; long timeout = 30000L;
EventLoopGroup worker = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup();
ActiveProcessChain activeProcessChain; ActiveProcessChain activeProcessChain;
@ -55,18 +56,17 @@ public class NettyClientConnection implements ClientConnection {
ChannelFuture future; ChannelFuture future;
Channel channel; Channel channel;
public NettyClientConnection(String host, ServerPort port, public NettyClientConnection(InetSocketAddress address,
List<ActivePostProcess> activeProcesses, List<ActivePostProcess> activeProcesses,
ChannelPoolHandler handler) { ChannelPoolHandler handler) {
Assert.notNull(worker); Assert.notNull(worker);
this.host = host; this.address = address;
this.port = port;
this.activeProcessChain = new ActiveProcessChain(activeProcesses); this.activeProcessChain = new ActiveProcessChain(activeProcesses);
this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker, handler); this.connectionPool = NettyConnectPoolHolder.getPool(address, timeout, worker, handler);
} }
public NettyClientConnection(String host, ServerPort port, ChannelPoolHandler handler) { public NettyClientConnection(InetSocketAddress address, ChannelPoolHandler handler) {
this(host, port, new LinkedList<>(), handler); this(address, new LinkedList<>(), handler);
} }
@Override @Override
@ -77,7 +77,7 @@ public class NettyClientConnection implements ClientConnection {
try { try {
String key = request.getKey(); String key = request.getKey();
this.future = channel.writeAndFlush(request); this.future = channel.writeAndFlush(request);
log.info("Call successful, target address is {}:{}, request key is {}", host, port.getPort(), key); log.info("Call successful, target address is {}:{}, request key is {}", address.getHostName(), address.getPort(), key);
// Wait for execution to complete // Wait for execution to complete
ResultHolder.putThread(key, Thread.currentThread()); ResultHolder.putThread(key, Thread.currentThread());
LockSupport.parkNanos(timeout() * 1000000); LockSupport.parkNanos(timeout() * 1000000);
@ -86,7 +86,7 @@ public class NettyClientConnection implements ClientConnection {
throw new TimeOutException("Timeout waiting for server-side response"); throw new TimeOutException("Timeout waiting for server-side response");
} }
activeProcessChain.applyPostHandle(request, response); activeProcessChain.applyPostHandle(request, response);
log.info("The response from {}:{} was received successfully with the response key {}.", host, port.getPort(), key); log.info("The response from {}:{} was received successfully with the response key {}.", address.getHostName(), address.getPort(), key);
return response; return response;
} catch (Exception ex) { } catch (Exception ex) {
activeProcessChain.afterCompletion(request, response, ex); activeProcessChain.afterCompletion(request, response, ex);

@ -41,7 +41,8 @@ public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdap
Channel channel = ctx.channel(); Channel channel = ctx.channel();
if (channel.isActive()) { if (channel.isActive()) {
ctx.close(); ctx.close();
} else { }
if (cause != null) {
throw new ConnectionException(cause); throw new ConnectionException(cause);
} }
} }

@ -22,6 +22,7 @@ import cn.hippo4j.rpc.coder.NettyEncoder;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ClassResolvers;
@ -33,40 +34,40 @@ import java.util.List;
* Processing by the client connection pool handler to clean the buffer and define new connection properties * Processing by the client connection pool handler to clean the buffer and define new connection properties
*/ */
@Slf4j @Slf4j
public class AbstractNettyClientPoolHandler extends AbstractNettyHandlerManager implements ChannelPoolHandler { public class NettyClientPoolHandler extends AbstractNettyHandlerManager implements ChannelPoolHandler {
public AbstractNettyClientPoolHandler(List<ChannelHandler> handlers) { public NettyClientPoolHandler(List<ChannelHandler> handlers) {
super(handlers); super(handlers);
} }
public AbstractNettyClientPoolHandler(ChannelHandler... handlers) { public NettyClientPoolHandler(ChannelHandler... handlers) {
super(handlers); super(handlers);
} }
public AbstractNettyClientPoolHandler() { public NettyClientPoolHandler() {
super(); super();
} }
@Override @Override
public AbstractNettyClientPoolHandler addLast(String name, ChannelHandler handler) { public NettyClientPoolHandler addLast(String name, ChannelHandler handler) {
super.addLast(name, handler); super.addLast(name, handler);
return this; return this;
} }
@Override @Override
public AbstractNettyClientPoolHandler addFirst(String name, ChannelHandler handler) { public NettyClientPoolHandler addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler); super.addFirst(name, handler);
return this; return this;
} }
@Override @Override
public AbstractNettyClientPoolHandler addLast(ChannelHandler handler) { public NettyClientPoolHandler addLast(ChannelHandler handler) {
super.addLast(handler); super.addLast(handler);
return this; return this;
} }
@Override @Override
public AbstractNettyClientPoolHandler addFirst(ChannelHandler handler) { public NettyClientPoolHandler addFirst(ChannelHandler handler) {
super.addFirst(handler); super.addFirst(handler);
return this; return this;
} }
@ -87,15 +88,16 @@ public class AbstractNettyClientPoolHandler extends AbstractNettyHandlerManager
NioSocketChannel channel = (NioSocketChannel) ch; NioSocketChannel channel = (NioSocketChannel) ch;
channel.config() channel.config()
.setTcpNoDelay(false); .setTcpNoDelay(false);
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); ChannelPipeline pipeline = ch.pipeline();
ch.pipeline().addLast(new NettyEncoder()); pipeline.addLast(new NettyEncoder());
pipeline.addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
this.handlerEntities.stream() this.handlerEntities.stream()
.sorted() .sorted()
.forEach(h -> { .forEach(h -> {
if (h.getName() == null) { if (h.getName() == null) {
ch.pipeline().addLast(h.getHandler()); pipeline.addLast(h.getHandler());
} else { } else {
ch.pipeline().addLast(h.getName(), h.getHandler()); pipeline.addLast(h.getName(), h.getHandler());
} }
}); });
} }

@ -21,6 +21,7 @@ import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.rpc.coder.NettyDecoder; import cn.hippo4j.rpc.coder.NettyDecoder;
import cn.hippo4j.rpc.coder.NettyEncoder; import cn.hippo4j.rpc.coder.NettyEncoder;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.handler.AbstractNettyHandlerManager; import cn.hippo4j.rpc.handler.AbstractNettyHandlerManager;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*; import io.netty.channel.*;
@ -38,7 +39,7 @@ import java.util.List;
* adapter to the netty server * adapter to the netty server
*/ */
@Slf4j @Slf4j
public class AbstractNettyServerConnection extends AbstractNettyHandlerManager implements ServerConnection { public class NettyServerConnection extends AbstractNettyHandlerManager implements ServerConnection {
ServerPort port; ServerPort port;
EventLoopGroup leader; EventLoopGroup leader;
@ -47,7 +48,7 @@ public class AbstractNettyServerConnection extends AbstractNettyHandlerManager i
ChannelFuture future; ChannelFuture future;
Channel channel; Channel channel;
public AbstractNettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ChannelHandler> handlers) { public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ChannelHandler> handlers) {
super(handlers); super(handlers);
Assert.notNull(leader); Assert.notNull(leader);
Assert.notNull(worker); Assert.notNull(worker);
@ -55,15 +56,15 @@ public class AbstractNettyServerConnection extends AbstractNettyHandlerManager i
this.worker = worker; this.worker = worker;
} }
public AbstractNettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, ChannelHandler... handlers) { public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, ChannelHandler... handlers) {
this(leader, worker, (handlers != null ? Arrays.asList(handlers) : Collections.emptyList())); this(leader, worker, (handlers != null ? Arrays.asList(handlers) : Collections.emptyList()));
} }
public AbstractNettyServerConnection(ChannelHandler... handlers) { public NettyServerConnection(ChannelHandler... handlers) {
this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList()); this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList());
} }
public AbstractNettyServerConnection(List<ChannelHandler> handlers) { public NettyServerConnection(List<ChannelHandler> handlers) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), handlers); this(new NioEventLoopGroup(), new NioEventLoopGroup(), handlers);
} }
@ -77,27 +78,29 @@ public class AbstractNettyServerConnection extends AbstractNettyHandlerManager i
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); ChannelPipeline pipeline = ch.pipeline();
ch.pipeline().addLast(new NettyEncoder()); pipeline.addLast(new NettyEncoder());
pipeline.addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
handlerEntities.stream() handlerEntities.stream()
.sorted() .sorted()
.forEach(h -> { .forEach(h -> {
if (h.getName() == null) { if (h.getName() == null) {
ch.pipeline().addLast(h.getHandler()); pipeline.addLast(h.getHandler());
} else { } else {
ch.pipeline().addLast(h.getName(), h.getHandler()); pipeline.addLast(h.getName(), h.getHandler());
} }
}); });
} }
}); });
try { try {
this.future = server.bind(port.getPort()); this.future = server.bind(port.getPort()).sync();
this.channel = this.future.channel(); this.channel = this.future.channel();
log.info("The server is started and can receive requests. The listening port is {}", port); log.info("The server is started and can receive requests. The listening port is {}", port.getPort());
this.port = port; this.port = port;
this.future.channel().closeFuture().sync(); this.future.channel().closeFuture().sync();
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new ConnectionException("Listening port failed, Please check whether the port is occupied", ex);
} }
} }
@ -109,34 +112,37 @@ public class AbstractNettyServerConnection extends AbstractNettyHandlerManager i
leader.shutdownGracefully(); leader.shutdownGracefully();
worker.shutdownGracefully(); worker.shutdownGracefully();
this.future.channel().close(); this.future.channel().close();
log.info("The server is shut down and no more requests are received. The release port is {}", port); log.info("The server is shut down and no more requests are received. The release port is {}", port.getPort());
} }
@Override @Override
public boolean isActive() { public boolean isActive() {
if (channel == null) {
return false;
}
return channel.isActive(); return channel.isActive();
} }
@Override @Override
public AbstractNettyServerConnection addLast(String name, ChannelHandler handler) { public NettyServerConnection addLast(String name, ChannelHandler handler) {
super.addLast(name, handler); super.addLast(name, handler);
return this; return this;
} }
@Override @Override
public AbstractNettyServerConnection addFirst(String name, ChannelHandler handler) { public NettyServerConnection addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler); super.addFirst(name, handler);
return this; return this;
} }
@Override @Override
public AbstractNettyServerConnection addLast(ChannelHandler handler) { public NettyServerConnection addLast(ChannelHandler handler) {
super.addLast(handler); super.addLast(handler);
return this; return this;
} }
@Override @Override
public AbstractNettyServerConnection addFirst(ChannelHandler handler) { public NettyServerConnection addFirst(ChannelHandler handler) {
super.addFirst(handler); super.addFirst(handler);
return this; return this;
} }

@ -18,8 +18,10 @@
package cn.hippo4j.rpc.server; package cn.hippo4j.rpc.server;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.ConnectionException;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture;
/** /**
* Server Implementation * Server Implementation
@ -34,9 +36,17 @@ public class RPCServer implements Server {
this.serverConnection = serverConnection; this.serverConnection = serverConnection;
} }
/**
* Reference from{@link cn.hippo4j.config.netty.MonitorNettyServer}<br>
* Start the server side asynchronously
*/
@Override @Override
public void bind() { public void bind() {
serverConnection.bind(port); CompletableFuture
.runAsync(() -> serverConnection.bind(port))
.exceptionally(throwable -> {
throw new ConnectionException(throwable);
});
} }
@Override @Override

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.client.Client;
import cn.hippo4j.rpc.discovery.DiscoveryAdapter;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import io.netty.channel.ChannelHandler;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.net.InetSocketAddress;
/**
* A FactoryBean that builds interfaces to invoke proxy objects
* is responsible for managing the entire life cycle of the proxy objects<br>
*
* @deprecated With {@link cn.hippo4j.config.service.ThreadPoolAdapterService} structure, FactoryBean is not the best choice
*/
@Deprecated
public class ClientFactoryBean implements FactoryBean<Object>, InitializingBean, ApplicationContextAware, DisposableBean {
/**
* Application name or address string. If it is an address string, it must be in ip:port format
*/
private String applicationName;
/**
* The adapter name in the container needs to be used with applicationName
* to get the real server address. If it is null or the address information
* cannot be found, applicationName is treated as an address string
*/
private String discoveryAdapterName;
private DiscoveryAdapter discoveryAdapter;
/**
* the channel handler
*/
private ChannelHandler[] handlers;
/**
* Type of the proxy interface
*/
private Class<?> cls;
/**
* Container Context
*/
private ApplicationContext applicationContext;
/**
* InetSocketAddress
*/
InetSocketAddress address;
public ClientFactoryBean(String applicationName, String discoveryAdapterName, Class<?> cls) {
this.applicationName = applicationName;
this.discoveryAdapterName = discoveryAdapterName;
this.cls = cls;
}
@Override
public Object getObject() throws Exception {
this.address = discoveryAdapter.getSocketAddress(applicationName);
if (this.address == null) {
String[] addressStr = applicationName.split(":");
if (addressStr.length < 2) {
throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure");
}
this.address = InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1]));
}
NettyClientPoolHandler handler = new NettyClientPoolHandler(handlers);
Client client = NettyClientSupport.getClient(this.address, handler);
return NettyProxyCenter.createProxy(client, cls, this.address);
}
@Override
public Class<?> getObjectType() {
return cls;
}
@Override
public void afterPropertiesSet() throws Exception {
this.discoveryAdapter = (DiscoveryAdapter) applicationContext.getBean(discoveryAdapterName);
}
@Override
public void destroy() throws Exception {
if (this.address == null) {
return;
}
NettyClientSupport.closeClient(this.address);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
public ClientFactoryBean applicationName(String applicationName) {
this.applicationName = applicationName;
return this;
}
public ClientFactoryBean discoveryAdapterName(String discoveryAdapterName) {
this.discoveryAdapterName = discoveryAdapterName;
return this;
}
public ClientFactoryBean cls(Class<?> cls) {
this.cls = cls;
return this;
}
public ClientFactoryBean handlers(ChannelHandler[] handlers) {
this.handlers = handlers;
return this;
}
}

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.client.Client;
import cn.hippo4j.rpc.client.ClientConnection;
import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.handler.HandlerManager;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import io.netty.channel.ChannelHandler;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Different from the management of the server side, in order not to waste resources, we pool the
* connections of different addresses and turn the client into a one-time resource. If there is no
* support from the container, the client is a resource that can be recovered after use. This is
* similar to {@link WeakReference}, but the client needs the user to set the life cycle.<br>
* <p>
* Typically, the client is just a front for the direct connection between the client and the server,
* and for any call to succeed, only the {@link ClientConnection} connection is required. In the
* presence of a container, it is necessary to keep the client active for a long time, when the
* client should be a specific resource in the container, following the resource lifecycle specified
* by the container
*
* @see cn.hippo4j.rpc.client.RPCClient
* @see cn.hippo4j.rpc.client.NettyClientConnection
* @see NettyServerSupport
* @see ClientFactoryBean
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class NettyClientSupport {
/**
* the cache for client
*/
private static final Map<InetSocketAddress, Client> clientMap = new ConcurrentHashMap<>();
/**
* Obtain the client connected to the server through the server address. If the client does not exist, create one
*
* @param address the address
* @param handlerManager the handlerManager
* @return Client
*/
public static Client getClient(InetSocketAddress address, HandlerManager<ChannelHandler> handlerManager) {
return clientMap.computeIfAbsent(address, a -> {
NettyClientPoolHandler handler = (handlerManager instanceof NettyClientPoolHandler)
? (NettyClientPoolHandler) handlerManager
: new NettyClientPoolHandler();
if (handler.isEmpty()) {
handler.addFirst(new NettyClientTakeHandler());
}
NettyClientConnection connection = new NettyClientConnection(address, handler);
return new RPCClient(connection);
});
}
/**
* Obtain the client connected to the server through the server address. If the client does not exist, create one by default
*
* @param address the address
* @return Client
*/
public static Client getClient(InetSocketAddress address) {
return getClient(address, new NettyClientPoolHandler());
}
/**
* Close a client connected to a server address. The client may have been closed
*
* @param address the address
*/
public static void closeClient(InetSocketAddress address) {
Client client = clientMap.remove(address);
try {
if (client != null) {
client.close();
}
} catch (IOException e) {
throw new IllegalException(e);
}
}
}

@ -17,10 +17,10 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.ConnectionException; import cn.hippo4j.rpc.exception.ConnectionException;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelHealthChecker; import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool; import io.netty.channel.pool.ChannelPool;
@ -43,25 +43,23 @@ public class NettyConnectPool {
int maxPendingAcquires = Integer.MAX_VALUE; int maxPendingAcquires = Integer.MAX_VALUE;
ChannelPoolHandler handler; ChannelPoolHandler handler;
ChannelPool pool; ChannelPool pool;
String host; InetSocketAddress address;
ServerPort port;
public NettyConnectPool(String host, ServerPort port, int maxConnect, public NettyConnectPool(InetSocketAddress address, int maxConnect,
long timeout, EventLoopGroup worker, long timeout, EventLoopGroup worker,
Class<? extends Channel> socketChannelCls, Class<? extends Channel> socketChannelCls,
ChannelPoolHandler handler) { ChannelPoolHandler handler) {
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port.getPort());
Bootstrap bootstrap = new Bootstrap() Bootstrap bootstrap = new Bootstrap()
.group(worker) .group(worker)
.channel(socketChannelCls) .channel(socketChannelCls)
.remoteAddress(socketAddress); .option(ChannelOption.TCP_NODELAY, true)
this.host = host; .remoteAddress(address);
this.port = port; this.address = address;
this.handler = handler; this.handler = handler;
this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction, this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction,
timeout, maxConnect, maxPendingAcquires, true, true); timeout, maxConnect, maxPendingAcquires, true, true);
log.info("The connection pool is established with the connection target {}:{}", host, port.getPort()); log.info("The connection pool is established with the connection target {}:{}", address.getHostName(), address.getPort());
NettyConnectPoolHolder.createPool(host, port, this); NettyConnectPoolHolder.createPool(address, this);
} }
public Channel acquire(long timeoutMillis) { public Channel acquire(long timeoutMillis) {
@ -69,6 +67,7 @@ public class NettyConnectPool {
Future<Channel> fch = pool.acquire(); Future<Channel> fch = pool.acquire();
return fch.get(timeoutMillis, TimeUnit.MILLISECONDS); return fch.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) { } catch (Exception e) {
NettyClientSupport.closeClient(address);
throw new ConnectionException("Failed to get the connection", e); throw new ConnectionException("Failed to get the connection", e);
} }
} }
@ -77,6 +76,7 @@ public class NettyConnectPool {
try { try {
return pool.acquire(); return pool.acquire();
} catch (Exception e) { } catch (Exception e) {
NettyClientSupport.closeClient(address);
throw new ConnectionException("Failed to get the connection", e); throw new ConnectionException("Failed to get the connection", e);
} }
} }
@ -87,6 +87,7 @@ public class NettyConnectPool {
pool.release(channel); pool.release(channel);
} }
} catch (Exception e) { } catch (Exception e) {
NettyClientSupport.closeClient(address);
throw new ConnectionException("Failed to release the connection", e); throw new ConnectionException("Failed to release the connection", e);
} }
} }
@ -94,8 +95,9 @@ public class NettyConnectPool {
public void close() { public void close() {
try { try {
pool.close(); pool.close();
NettyConnectPoolHolder.remove(host, port); NettyConnectPoolHolder.remove(address);
} catch (Exception e) { } catch (Exception e) {
NettyClientSupport.closeClient(address);
throw new ConnectionException("Failed to close the connection pool", e); throw new ConnectionException("Failed to close the connection pool", e);
} }
} }

@ -17,7 +17,6 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.discovery.ServerPort;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.ChannelPoolHandler;
@ -26,6 +25,7 @@ import io.netty.util.concurrent.EventExecutorGroup;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import java.net.InetSocketAddress;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -35,76 +35,68 @@ import java.util.concurrent.ConcurrentHashMap;
@NoArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NettyConnectPoolHolder { public class NettyConnectPoolHolder {
static int maxConnect = 64; static int maxConnect = 256;
static Map<String, NettyConnectPool> connectPoolMap = new ConcurrentHashMap<>(); static Map<String, NettyConnectPool> connectPoolMap = new ConcurrentHashMap<>();
private static NettyConnectPool initPool(String host, ServerPort port, private static NettyConnectPool initPool(InetSocketAddress address,
long timeout, EventLoopGroup worker, long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) { ChannelPoolHandler handler) {
return new NettyConnectPool( return new NettyConnectPool(address, maxConnect, timeout, worker, NioSocketChannel.class, handler);
host, port, maxConnect,
timeout, worker,
NioSocketChannel.class,
handler);
} }
private static String getKey(String host, ServerPort port) { private static String getKey(InetSocketAddress address) {
return host + ":" + port.getPort(); return address.getHostName() + ":" + address.getPort();
} }
/** /**
* The connection pool connectPoolMapping may already exist before the connection pool * The connection pool connectPoolMapping may already exist before the connection pool
* connectPoolMapping is established. In this case, the connection pool is directly overwritten * connectPoolMapping is established. In this case, the connection pool is directly overwritten
* *
* @param host the host * @param address the InetSocketAddress
* @param port the port
* @param pool This parameter applies only to the connection pool of netty * @param pool This parameter applies only to the connection pool of netty
*/ */
public static void createPool(String host, ServerPort port, NettyConnectPool pool) { public static void createPool(InetSocketAddress address, NettyConnectPool pool) {
connectPoolMap.put(getKey(host, port), pool); connectPoolMap.put(getKey(address), pool);
} }
/** /**
* Gets a connection pool, or null if there is no corresponding connectPoolMapping * Gets a connection pool, or null if there is no corresponding connectPoolMapping
* *
* @param host the host * @param address the InetSocketAddress
* @param port the port
* @return Map to the connection pool * @return Map to the connection pool
*/ */
public static NettyConnectPool getPool(String host, ServerPort port) { public static NettyConnectPool getPool(InetSocketAddress address) {
return connectPoolMap.get(getKey(host, port)); return connectPoolMap.get(getKey(address));
} }
/** /**
* Gets a connection pool, and if there is no connectPoolMapping, creates one with the values provided and joins the connectPoolMapping * Gets a connection pool, and if there is no connectPoolMapping, creates one with the values provided and joins the connectPoolMapping
* *
* @param host the host * @param address the InetSocketAddress
* @param port the port
* @param timeout timeout * @param timeout timeout
* @param worker Special {@link EventExecutorGroup} which allows registering {@link Channel}s * @param worker Special {@link EventExecutorGroup} which allows registering {@link Channel}s
* that get processed for later selection during the event loop. * that get processed for later selection during the event loop.
* @param handler the chandler for netty * @param handler the chandler for netty
* @return Map to the connection pool * @return Map to the connection pool
*/ */
public static synchronized NettyConnectPool getPool(String host, ServerPort port, public static synchronized NettyConnectPool getPool(InetSocketAddress address,
long timeout, EventLoopGroup worker, long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) { ChannelPoolHandler handler) {
/* /*
* this cannot use the computeIfAbsent method directly here because put is already used in init. Details refer to https://bugs.openjdk.java.net/browse/JDK-8062841 * this cannot use the computeIfAbsent method directly here because put is already used in init. Details refer to https://bugs.openjdk.java.net/browse/JDK-8062841
*/ */
NettyConnectPool pool = getPool(host, port); NettyConnectPool pool = getPool(address);
return pool == null ? initPool(host, port, timeout, worker, handler) : pool; return pool == null ? initPool(address, timeout, worker, handler) : pool;
} }
/** /**
* Disconnect a connection connectPoolMapping. This must take effect at the same time as the connection pool is closed * Disconnect a connection connectPoolMapping. This must take effect at the same time as the connection pool is closed
* *
* @param host host * @param address the InetSocketAddress
* @param port port
*/ */
public static void remove(String host, ServerPort port) { public static void remove(InetSocketAddress address) {
connectPoolMap.remove(getKey(host, port)); connectPoolMap.remove(getKey(address));
} }
/** /**

@ -20,20 +20,19 @@ package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.IdUtil; import cn.hippo4j.common.toolkit.IdUtil;
import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.client.Client; import cn.hippo4j.rpc.client.Client;
import cn.hippo4j.rpc.client.NettyClientConnection; import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.client.RPCClient; import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.model.DefaultRequest; import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response; import cn.hippo4j.rpc.model.Response;
import io.netty.channel.pool.ChannelPoolHandler;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.util.HashMap; import java.net.InetSocketAddress;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** /**
* Add a proxy for the request, {@link Proxy} and {@link InvocationHandler} * Add a proxy for the request, {@link Proxy} and {@link InvocationHandler}
@ -42,31 +41,77 @@ import java.util.Map;
public class NettyProxyCenter { public class NettyProxyCenter {
// cache // cache
static Map<Class<?>, Object> map = new HashMap<>(); static Map<String, Object> map = new ConcurrentHashMap<>();
/** /**
* A proxy object for PRC is obtained through an interface * A proxy object for PRC is obtained through an interface
* *
* @param cls The interface type * @param cls The interface type
* @param host Request the address * @param address address
* @param port port
* @param <T> Object type * @param <T> Object type
* @param handler the pool handler for netty * @param handler the pool handler for netty
* @return Proxy objects * @return Proxy objects
*/ */
public static <T> T getProxy(Class<T> cls, String host, ServerPort port, ChannelPoolHandler handler) { @SuppressWarnings("unchecked")
NettyClientConnection connection = new NettyClientConnection(host, port, handler); public static <T> T getProxy(Class<T> cls, InetSocketAddress address, NettyClientPoolHandler handler) {
Client rpcClient = new RPCClient(connection); Client client = NettyClientSupport.getClient(address, handler);
return getProxy(rpcClient, cls, host, port); String s = address + cls.getName();
Object o = map.get(s);
if (o != null) {
return (T) o;
}
return createProxy(client, cls, address);
}
/**
* A proxy object for PRC is obtained through an interface
*
* @param cls The interface type
* @param address address String
* @param <T> Object type
* @return Proxy objects
*/
@SuppressWarnings("unchecked")
public static <T> T getProxy(Class<T> cls, String address) {
String[] addressStr = address.split(":");
if (addressStr.length < 2) {
throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure");
}
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1]));
String s = socketAddress + cls.getName();
Object o = map.get(s);
if (o != null) {
return (T) o;
}
Client client = NettyClientSupport.getClient(socketAddress);
return createProxy(client, cls, socketAddress);
}
/**
* remove proxy object
*
* @param cls the class
* @param address address String
*/
public static void removeProxy(Class<?> cls, String address) {
String[] addressStr = address.split(":");
if (addressStr.length < 2) {
throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure");
}
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1]));
String s = socketAddress + cls.getName();
NettyClientSupport.closeClient(socketAddress);
map.remove(s);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T> T getProxy(Client client, Class<T> cls, String host, ServerPort port) { public static <T> T createProxy(Client client, Class<T> cls, InetSocketAddress address) {
boolean b = cls.isInterface(); boolean b = cls.isInterface();
if (!b) { if (!b) {
throw new IllegalException(cls.getName() + "is not a Interface"); throw new IllegalException(cls.getName() + "is not a Interface");
} }
Object o = map.get(cls); String s = address.toString() + cls.getName();
Object o = map.get(s);
if (o != null) { if (o != null) {
return (T) o; return (T) o;
} }
@ -76,7 +121,7 @@ public class NettyProxyCenter {
(proxy, method, args) -> { (proxy, method, args) -> {
String clsName = cls.getName(); String clsName = cls.getName();
String methodName = method.getName(); String methodName = method.getName();
String key = host + port + clsName + methodName + IdUtil.simpleUUID(); String key = address + clsName + methodName + IdUtil.simpleUUID();
Class<?>[] parameterTypes = method.getParameterTypes(); Class<?>[] parameterTypes = method.getParameterTypes();
Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, args); Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, args);
Response response = client.connection(request); Response response = client.connection(request);
@ -88,7 +133,7 @@ public class NettyProxyCenter {
} }
return response.getObj(); return response.getObj();
}); });
map.put(cls, obj); map.put(s, obj);
return obj; return obj;
} }
} }

@ -22,7 +22,7 @@ import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.HandlerManager; import cn.hippo4j.rpc.handler.HandlerManager;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler; import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.server.AbstractNettyServerConnection; import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer; import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.Server; import cn.hippo4j.rpc.server.Server;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
@ -36,6 +36,10 @@ import java.util.List;
* This is a server-side build class that allows you to quickly prepare data on the server side and start the server side.<br> * This is a server-side build class that allows you to quickly prepare data on the server side and start the server side.<br>
* <p> * <p>
* The composite pattern is adopted, which means that it is itself a server-side implementation, so it is stateless. * The composite pattern is adopted, which means that it is itself a server-side implementation, so it is stateless.
*
* @see RPCServer
* @see NettyServerConnection
* @see NettyClientSupport
*/ */
public class NettyServerSupport implements Server { public class NettyServerSupport implements Server {
@ -43,27 +47,27 @@ public class NettyServerSupport implements Server {
* The interface that the server side can call, * The interface that the server side can call,
* All the methods in the interface are brokered during initialization * All the methods in the interface are brokered during initialization
*/ */
List<Class<?>> classes; protected List<Class<?>> classes;
/** /**
* Extract the port number of the web container, * Extract the port number of the web container,
* which is the port information exposed by the server * which is the port information exposed by the server
*/ */
ServerPort serverPort; protected ServerPort serverPort;
/** /**
* ChannelHandler * ChannelHandler
*/ */
HandlerManager<ChannelHandler> handlerManager; protected HandlerManager<ChannelHandler> handlerManager;
Server server; protected Server server;
public NettyServerSupport(ServerPort serverPort, Class<?>... classes) { public NettyServerSupport(ServerPort serverPort, Class<?>... classes) {
this(serverPort, new AbstractNettyServerConnection(), classes); this(serverPort, new NettyServerConnection(), classes);
} }
public NettyServerSupport(ServerPort serverPort, List<Class<?>> classes) { public NettyServerSupport(ServerPort serverPort, List<Class<?>> classes) {
this(serverPort, new AbstractNettyServerConnection(), classes); this(serverPort, new NettyServerConnection(), classes);
} }
public NettyServerSupport(ServerPort serverPort, HandlerManager<ChannelHandler> handlerManager, Class<?>... classes) { public NettyServerSupport(ServerPort serverPort, HandlerManager<ChannelHandler> handlerManager, Class<?>... classes) {
@ -82,16 +86,16 @@ public class NettyServerSupport implements Server {
* Only interfaces are registered during registration. Classes and abstract classes are not registered. * Only interfaces are registered during registration. Classes and abstract classes are not registered.
* If no processor is available, a default processor is provided * If no processor is available, a default processor is provided
*/ */
private void initServer() { protected void initServer() {
// Register the interface that can be invoked // Register the interface that can be invoked
classes.stream().filter(Class::isInterface) classes.stream().filter(Class::isInterface)
.forEach(cls -> ClassRegistry.put(cls.getName(), cls)); .forEach(cls -> ClassRegistry.put(cls.getName(), cls));
AbstractNettyServerConnection connection = (handlerManager instanceof AbstractNettyServerConnection) NettyServerConnection connection = (handlerManager instanceof NettyServerConnection)
? (AbstractNettyServerConnection) handlerManager ? (NettyServerConnection) handlerManager
: new AbstractNettyServerConnection(); : new NettyServerConnection();
// Assign a default handler if no handler exists // Assign a default handler if no handler exists
if (connection.isEmpty()) { if (connection.isEmpty()) {
connection.addLast(new NettyServerTakeHandler(new DefaultInstance())); connection.addFirst(new NettyServerTakeHandler(new DefaultInstance()));
} }
server = new RPCServer(connection, serverPort); server = new RPCServer(connection, serverPort);
} }

@ -46,7 +46,9 @@ public class ResultHolder {
* @param o The result * @param o The result
*/ */
public static void put(String key, Object o) { public static void put(String key, Object o) {
if (log.isDebugEnabled()) {
log.debug("Write the result, wake up the thread"); log.debug("Write the result, wake up the thread");
}
map.put(key, o); map.put(key, o);
} }
@ -57,7 +59,9 @@ public class ResultHolder {
* @param t The Thread * @param t The Thread
*/ */
public static void putThread(String key, Thread t) { public static void putThread(String key, Thread t) {
if (log.isDebugEnabled()) {
log.debug("Write thread, waiting to wake up"); log.debug("Write thread, waiting to wake up");
}
threadMap.put(key, t); threadMap.put(key, t);
} }
@ -67,7 +71,9 @@ public class ResultHolder {
* @param key Request and response keys * @param key Request and response keys
*/ */
public static synchronized void wake(String key) { public static synchronized void wake(String key) {
if (log.isDebugEnabled()) {
log.debug("The future has been fetched, wake up the thread"); log.debug("The future has been fetched, wake up the thread");
}
Thread thread = threadMap.remove(key); Thread thread = threadMap.remove(key);
LockSupport.unpark(thread); LockSupport.unpark(thread);
} }
@ -82,7 +88,9 @@ public class ResultHolder {
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T> T get(String key) { public static <T> T get(String key) {
if (log.isDebugEnabled()) {
log.debug("Get the future"); log.debug("Get the future");
}
return (T) map.remove(key); return (T) map.remove(key);
} }

@ -17,26 +17,26 @@
package cn.hippo4j.rpc.client; package cn.hippo4j.rpc.client;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler; import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler; import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.model.DefaultRequest; import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response; import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.server.AbstractNettyServerConnection; import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer; import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection; import cn.hippo4j.rpc.server.ServerConnection;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
public class RPCClientTest { public class RPCClientTest {
@ -44,6 +44,9 @@ public class RPCClientTest {
ServerPort port = new TestServerPort(); ServerPort port = new TestServerPort();
ServerPort portTest = new TestPortServerPort(); ServerPort portTest = new TestPortServerPort();
/**
* This test case can be overridden under the handler and coder packages
*/
@Test @Test
public void connection() throws IOException { public void connection() throws IOException {
Class<CallManager> cls = CallManager.class; Class<CallManager> cls = CallManager.class;
@ -52,31 +55,31 @@ public class RPCClientTest {
// The mode connection was denied when the server was started on the specified port // The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new AbstractNettyServerConnection(handler); ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port); RPCServer rpcServer = new RPCServer(connection, portTest);
CompletableFuture.runAsync(rpcServer::bind); rpcServer.bind();
try { while (!rpcServer.isActive()) {
TimeUnit.SECONDS.sleep(3); ThreadUtil.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
ChannelPoolHandler channelPoolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); InetSocketAddress address = InetSocketAddress.createUnresolved(host, portTest.getPort());
NettyClientConnection clientConnection = new NettyClientConnection(host, port, channelPoolHandler); ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection); RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null); Class<?>[] classes = new Class<?>[2];
for (int i = 0; i < 100; i++) { classes[0] = Integer.class;
classes[1] = Integer.class;
Object[] objects = new Object[2];
objects[0] = 1;
objects[1] = 2;
Request request = new DefaultRequest("127.0.0.18889", className, "callTest", classes, objects);
Response response = rpcClient.connection(request); Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive(); boolean active = rpcClient.isActive();
Assert.assertTrue(active); Assert.assertTrue(active);
Assert.assertEquals(response.getObj(), 1); Assert.assertEquals(response.getObj(), 3);
}
rpcClient.close(); rpcClient.close();
rpcServer.close(); rpcServer.close();
} }
/**
* This test case can be overridden under the handler and coder packages
*/
@Test @Test
public void connectionTest() throws IOException { public void connectionTest() throws IOException {
Class<CallManager> cls = CallManager.class; Class<CallManager> cls = CallManager.class;
@ -85,28 +88,23 @@ public class RPCClientTest {
// The mode connection was denied when the server was started on the specified port // The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new AbstractNettyServerConnection(handler); ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, portTest); RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind); rpcServer.bind();
try { while (!rpcServer.isActive()) {
TimeUnit.SECONDS.sleep(3); ThreadUtil.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
ChannelPoolHandler channelPoolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
NettyClientConnection clientConnection = new NettyClientConnection(host, portTest, channelPoolHandler); ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection); RPCClient rpcClient = new RPCClient(clientConnection);
Class<?>[] classes = new Class<?>[2]; Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
classes[0] = Integer.class; for (int i = 0; i < 100; i++) {
classes[1] = Integer.class;
Object[] objects = new Object[2];
objects[0] = 1;
objects[1] = 2;
Request request = new DefaultRequest("127.0.0.18889", className, "callTest", classes, objects);
Response response = rpcClient.connection(request); Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive(); boolean active = rpcClient.isActive();
Assert.assertTrue(active); Assert.assertTrue(active);
Assert.assertEquals(response.getObj(), 3); Assert.assertEquals(response.getObj(), 1);
}
rpcClient.close(); rpcClient.close();
rpcServer.close(); rpcServer.close();
} }

@ -17,10 +17,11 @@
package cn.hippo4j.rpc.handler; package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.client.NettyClientConnection; import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.client.RPCClient; import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.discovery.*; import cn.hippo4j.rpc.discovery.*;
import cn.hippo4j.rpc.server.AbstractNettyServerConnection; import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer; import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.support.NettyProxyCenter; import cn.hippo4j.rpc.support.NettyProxyCenter;
import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.ChannelPoolHandler;
@ -28,8 +29,7 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
public class ConnectHandlerTest { public class ConnectHandlerTest {
@ -38,21 +38,21 @@ public class ConnectHandlerTest {
// server // server
Class<InstanceServerLoader> cls = InstanceServerLoader.class; Class<InstanceServerLoader> cls = InstanceServerLoader.class;
ClassRegistry.put(cls.getName(), cls); ClassRegistry.put(cls.getName(), cls);
ServerPort port = () -> 8891; ServerPort port = () -> 8892;
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler serverHandler = new NettyServerTakeHandler(instance); NettyServerTakeHandler serverHandler = new NettyServerTakeHandler(instance);
AbstractNettyServerConnection connection = new AbstractNettyServerConnection(serverHandler); NettyServerConnection connection = new NettyServerConnection(serverHandler);
RPCServer rpcServer = new RPCServer(connection, port); RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind); rpcServer.bind();
try { while (!rpcServer.isActive()) {
TimeUnit.SECONDS.sleep(3); ThreadUtil.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
ChannelPoolHandler channelPoolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port.getPort());
NettyClientConnection clientConnection = new NettyClientConnection("localhost", port, channelPoolHandler); ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection); RPCClient rpcClient = new RPCClient(clientConnection);
InstanceServerLoader loader = NettyProxyCenter.getProxy(rpcClient, cls, "localhost", port);
InstanceServerLoader loader = NettyProxyCenter.createProxy(rpcClient, cls, address);
String name = loader.getName(); String name = loader.getName();
Assert.assertEquals("name", name); Assert.assertEquals("name", name);
rpcClient.close(); rpcClient.close();

@ -28,7 +28,7 @@ public class NettyClientPoolHandlerTest {
TestHandler handler = new TestHandler(); TestHandler handler = new TestHandler();
long order = 0; long order = 0;
String name = "Test"; String name = "Test";
AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(); NettyClientPoolHandler poolHandler = new NettyClientPoolHandler();
HandlerManager.HandlerEntity<ChannelHandler> entity = poolHandler.getHandlerEntity(order, handler, name); HandlerManager.HandlerEntity<ChannelHandler> entity = poolHandler.getHandlerEntity(order, handler, name);
Assert.assertEquals(entity.getName(), name); Assert.assertEquals(entity.getName(), name);
Assert.assertEquals(entity.getOrder(), order); Assert.assertEquals(entity.getOrder(), order);
@ -43,7 +43,7 @@ public class NettyClientPoolHandlerTest {
TestHandler handler1 = new TestHandler(); TestHandler handler1 = new TestHandler();
long order1 = 1; long order1 = 1;
String name1 = "Test1"; String name1 = "Test1";
AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(); NettyClientPoolHandler poolHandler = new NettyClientPoolHandler();
HandlerManager.HandlerEntity<ChannelHandler> entity = poolHandler.getHandlerEntity(order, handler, name); HandlerManager.HandlerEntity<ChannelHandler> entity = poolHandler.getHandlerEntity(order, handler, name);
HandlerManager.HandlerEntity<ChannelHandler> entity1 = poolHandler.getHandlerEntity(order1, handler1, name1); HandlerManager.HandlerEntity<ChannelHandler> entity1 = poolHandler.getHandlerEntity(order1, handler1, name1);
int compare = entity.compareTo(entity1); int compare = entity.compareTo(entity1);
@ -52,7 +52,7 @@ public class NettyClientPoolHandlerTest {
@Test @Test
public void addLast() { public void addLast() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(); NettyClientPoolHandler handler = new NettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty()); Assert.assertTrue(handler.isEmpty());
handler.addLast(new TestHandler()); handler.addLast(new TestHandler());
Assert.assertFalse(handler.isEmpty()); Assert.assertFalse(handler.isEmpty());
@ -60,7 +60,7 @@ public class NettyClientPoolHandlerTest {
@Test @Test
public void addFirst() { public void addFirst() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(); NettyClientPoolHandler handler = new NettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty()); Assert.assertTrue(handler.isEmpty());
handler.addFirst(new TestHandler()); handler.addFirst(new TestHandler());
Assert.assertFalse(handler.isEmpty()); Assert.assertFalse(handler.isEmpty());
@ -68,7 +68,7 @@ public class NettyClientPoolHandlerTest {
@Test @Test
public void testAddLast() { public void testAddLast() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(); NettyClientPoolHandler handler = new NettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty()); Assert.assertTrue(handler.isEmpty());
handler.addLast("Test", new TestHandler()); handler.addLast("Test", new TestHandler());
Assert.assertFalse(handler.isEmpty()); Assert.assertFalse(handler.isEmpty());
@ -76,7 +76,7 @@ public class NettyClientPoolHandlerTest {
@Test @Test
public void testAddFirst() { public void testAddFirst() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(); NettyClientPoolHandler handler = new NettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty()); Assert.assertTrue(handler.isEmpty());
handler.addFirst("Test", new TestHandler()); handler.addFirst("Test", new TestHandler());
Assert.assertFalse(handler.isEmpty()); Assert.assertFalse(handler.isEmpty());

@ -25,7 +25,7 @@ public class NettyServerConnectionTest {
@Test @Test
public void addLast() { public void addLast() {
AbstractNettyServerConnection connection = new AbstractNettyServerConnection(); NettyServerConnection connection = new NettyServerConnection();
Assert.assertTrue(connection.isEmpty()); Assert.assertTrue(connection.isEmpty());
connection.addLast(new TestHandler()); connection.addLast(new TestHandler());
Assert.assertFalse(connection.isEmpty()); Assert.assertFalse(connection.isEmpty());
@ -33,7 +33,7 @@ public class NettyServerConnectionTest {
@Test @Test
public void addFirst() { public void addFirst() {
AbstractNettyServerConnection connection = new AbstractNettyServerConnection(); NettyServerConnection connection = new NettyServerConnection();
Assert.assertTrue(connection.isEmpty()); Assert.assertTrue(connection.isEmpty());
connection.addFirst(new TestHandler()); connection.addFirst(new TestHandler());
Assert.assertFalse(connection.isEmpty()); Assert.assertFalse(connection.isEmpty());
@ -41,7 +41,7 @@ public class NettyServerConnectionTest {
@Test @Test
public void testAddLast() { public void testAddLast() {
AbstractNettyServerConnection connection = new AbstractNettyServerConnection(); NettyServerConnection connection = new NettyServerConnection();
Assert.assertTrue(connection.isEmpty()); Assert.assertTrue(connection.isEmpty());
connection.addLast("Test", new TestHandler()); connection.addLast("Test", new TestHandler());
Assert.assertFalse(connection.isEmpty()); Assert.assertFalse(connection.isEmpty());
@ -49,7 +49,7 @@ public class NettyServerConnectionTest {
@Test @Test
public void testAddFirst() { public void testAddFirst() {
AbstractNettyServerConnection connection = new AbstractNettyServerConnection(); NettyServerConnection connection = new NettyServerConnection();
Assert.assertTrue(connection.isEmpty()); Assert.assertTrue(connection.isEmpty());
connection.addFirst("Test", new TestHandler()); connection.addFirst("Test", new TestHandler());
Assert.assertFalse(connection.isEmpty()); Assert.assertFalse(connection.isEmpty());

@ -17,34 +17,32 @@
package cn.hippo4j.rpc.server; package cn.hippo4j.rpc.server;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.discovery.DefaultInstance; import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance; import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class RPCServerTest { public class RPCServerTest {
public static ServerPort port = new TestServerPort(); public static ServerPort port = new TestServerPort();
public static ServerPort portTest = new ServerPortTest();
@Test @Test
public void bind() throws IOException { public void bind() throws IOException {
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new AbstractNettyServerConnection(handler); ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port); RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind); rpcServer.bind();
try { while (!rpcServer.isActive()) {
TimeUnit.SECONDS.sleep(3); ThreadUtil.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
boolean active = rpcServer.isActive(); boolean active = rpcServer.isActive();
Assert.assertTrue(active); Assert.assertTrue(active);
@ -59,22 +57,32 @@ public class RPCServerTest {
EventLoopGroup leader = new NioEventLoopGroup(); EventLoopGroup leader = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new AbstractNettyServerConnection(leader, worker, handler); ServerConnection connection = new NettyServerConnection(leader, worker, handler);
RPCServer rpcServer = new RPCServer(connection, port); RPCServer rpcServer = new RPCServer(connection, portTest);
CompletableFuture.runAsync(rpcServer::bind); rpcServer.bind();
try { while (!rpcServer.isActive()) {
TimeUnit.SECONDS.sleep(3); ThreadUtil.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
boolean active = rpcServer.isActive();
Assert.assertTrue(active);
rpcServer.close(); rpcServer.close();
boolean serverActive = rpcServer.isActive();
Assert.assertFalse(serverActive);
} }
static class TestServerPort implements ServerPort { static class TestServerPort implements ServerPort {
@Override @Override
public int getPort() { public int getPort() {
return 8888; return 8893;
}
}
static class ServerPortTest implements ServerPort {
@Override
public int getPort() {
return 8894;
} }
} }
} }

@ -18,7 +18,7 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler; import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
@ -27,6 +27,8 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.net.InetSocketAddress;
public class NettyConnectPoolHolderTest { public class NettyConnectPoolHolderTest {
String host = "127.0.0.1"; String host = "127.0.0.1";
@ -38,34 +40,37 @@ public class NettyConnectPoolHolderTest {
@Test @Test
public void createPool() { public void createPool() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, handler); InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port); NettyConnectPool pool = new NettyConnectPool(address, maxCount, timeout, group, cls, handler);
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(address);
Assert.assertEquals(pool, connectPool); Assert.assertEquals(pool, connectPool);
NettyConnectPoolHolder.clear(); NettyConnectPoolHolder.clear();
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port); NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(address);
Assert.assertNull(connectPool1); Assert.assertNull(connectPool1);
} }
@Test @Test
public void testGetPool() { public void testGetPool() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler); InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port); NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(address, timeout, group, handler);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(address);
Assert.assertEquals(connectPool1, connectPool); Assert.assertEquals(connectPool1, connectPool);
NettyConnectPoolHolder.clear(); NettyConnectPoolHolder.clear();
NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(host, port); NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(address);
Assert.assertNull(connectPool2); Assert.assertNull(connectPool2);
} }
@Test @Test
public void remove() { public void remove() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler); InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port); NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(address, timeout, group, handler);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(address);
Assert.assertEquals(connectPool1, connectPool); Assert.assertEquals(connectPool1, connectPool);
NettyConnectPoolHolder.remove(host, port); NettyConnectPoolHolder.remove(address);
NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(host, port); NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(address);
Assert.assertNull(connectPool2); Assert.assertNull(connectPool2);
} }
@ -73,7 +78,7 @@ public class NettyConnectPoolHolderTest {
@Override @Override
public int getPort() { public int getPort() {
return 8888; return 8895;
} }
} }
} }

@ -17,13 +17,14 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.discovery.DefaultInstance; import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance; import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler; import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler; import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.server.AbstractNettyServerConnection; import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer; import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection; import cn.hippo4j.rpc.server.ServerConnection;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -35,8 +36,7 @@ import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
public class NettyConnectPoolTest { public class NettyConnectPoolTest {
@ -52,17 +52,16 @@ public class NettyConnectPoolTest {
// The mode connection was denied when the server was started on the specified port // The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new AbstractNettyServerConnection(handler); ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port); RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind); rpcServer.bind();
// Given the delay in starting the server, wait here // Given the delay in starting the server, wait here
try { while (!rpcServer.isActive()) {
TimeUnit.SECONDS.sleep(3); ThreadUtil.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler); NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(address, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout); Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire); Assert.assertNotNull(acquire);
pool.release(acquire); pool.release(acquire);
@ -74,17 +73,16 @@ public class NettyConnectPoolTest {
// The mode connection was denied when the server was started on the specified port // The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new AbstractNettyServerConnection(handler); ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port); RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind); rpcServer.bind();
// Given the delay in starting the server, wait here // Given the delay in starting the server, wait here
try { while (!rpcServer.isActive()) {
TimeUnit.SECONDS.sleep(3); ThreadUtil.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler); NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(address, maxCount, timeout, group, cls, poolHandler);
Future<Channel> acquire = pool.acquire(); Future<Channel> acquire = pool.acquire();
Assert.assertNotNull(acquire); Assert.assertNotNull(acquire);
rpcServer.close(); rpcServer.close();
@ -95,18 +93,16 @@ public class NettyConnectPoolTest {
// The mode connection was denied when the server was started on the specified port // The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new AbstractNettyServerConnection(handler); ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port); RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind); rpcServer.bind();
// Given the delay in starting the server, wait here // Given the delay in starting the server, wait here
try { while (!rpcServer.isActive()) {
TimeUnit.SECONDS.sleep(3); ThreadUtil.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler); NettyConnectPool pool = new NettyConnectPool(address, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout); Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire); Assert.assertNotNull(acquire);
pool.release(acquire); pool.release(acquire);
@ -118,7 +114,7 @@ public class NettyConnectPoolTest {
@Override @Override
public int getPort() { public int getPort() {
return 8888; return 8890;
} }
} }
} }

@ -19,26 +19,36 @@ package cn.hippo4j.rpc.support;
import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler; import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.net.InetSocketAddress;
public class NettyProxyCenterTest { public class NettyProxyCenterTest {
ServerPort port = new TestServerPort(); ServerPort port = new TestServerPort();
@Test @Test
public void getProxy() { public void getProxy() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port.getPort());
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", port, handler); NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, address, handler);
Assert.assertNotNull(localhost);
}
@Test
public void createProxy() {
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost:8894");
Assert.assertNotNull(localhost); Assert.assertNotNull(localhost);
} }
@Test(expected = IllegalException.class) @Test(expected = IllegalException.class)
public void getProxyTest() { public void getProxyTest() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port.getPort());
ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", port, handler); NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, address, handler);
Assert.assertNotNull(localhost); Assert.assertNotNull(localhost);
} }
@ -55,7 +65,7 @@ public class NettyProxyCenterTest {
@Override @Override
public int getPort() { public int getPort() {
return 8888; return 8894;
} }
} }
} }

@ -17,24 +17,21 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.discovery.InstanceServerLoader; import cn.hippo4j.rpc.discovery.InstanceServerLoader;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class NettyServerSupportTest { public class NettyServerSupportTest {
@Test @Test
public void bind() throws IOException { public void bind() throws IOException {
NettyServerSupport support = new NettyServerSupport(() -> 8890, InstanceServerLoader.class); NettyServerSupport support = new NettyServerSupport(() -> 8891, InstanceServerLoader.class);
CompletableFuture.runAsync(support::bind); support.bind();
try { while (!support.isActive()) {
TimeUnit.SECONDS.sleep(3); ThreadUtil.sleep(100L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
Assert.assertTrue(support.isActive()); Assert.assertTrue(support.isActive());
support.close(); support.close();

@ -1 +1,18 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
cn.hippo4j.rpc.discovery.InstanceServerLoaderImpl cn.hippo4j.rpc.discovery.InstanceServerLoaderImpl
Loading…
Cancel
Save