Rpc simplify (#1212)

* fix : Add annotations to classes of rpc module Add version: @since 1.5.1 (#1187) (#812)

* fix : Simplify the rpc module, remove unnecessary classes and methods (#1187) (#812)

* fix : Adjust unit tests against simplified code(#1187) (#812)

* fix : Adjust unit tests against simplified code(#1187) (#812)

* fix : Add new unit tests(#1187) (#812)
pull/1219/head
pizihao 2 years ago committed by GitHub
parent f1321052c1
commit 9a1089f640
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -23,12 +23,37 @@ import cn.hippo4j.rpc.model.Response;
import java.io.Closeable; import java.io.Closeable;
/** /**
* the client for RPC, Explain the role of the client in the request * <h3>CLIENT</h3>
* The highest level interface for the client, it does not care how to communicate with the server,
* nor can it know the specific connection information. The client plays the role of message producer
* in the whole connection.By sending a Request to the server ({@link Request}), the client will be
* able to communicate with the server. Wait for the server's Response ({@link Response})
* <h3>METHOD</h3>
* <ul>
* <li>{@link #connection(Request)}</li>
* <li>{@link #isActive()}</li>
* <li>{@link #close()}</li>
* </ul>
* You can usually use the client in this way:
* <pre>
* Request request = new Request();
* try(Client client = new Client()){
* Response response = client.connection(request);
* }
* </pre>
*
* <b>The client implements Closeable and supports automatic shutdown, However, you can manually
* disable it when you want to use it</b>
*
* @since 1.5.1
*/ */
public interface Client extends Closeable { public interface Client extends Closeable {
/** /**
* Start the client and try to send and receive data * Start the client and try to send and receive data
*
* @param request Request information, Requested methods and parameters
* @return response Response from server side
*/ */
Response connection(Request request); Response connection(Request request);

@ -22,7 +22,9 @@ import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response; import cn.hippo4j.rpc.model.Response;
/** /**
* Applicable to client connections * Applicable to client connections<br>
*
* @since 1.5.1
*/ */
public interface ClientConnection extends Connection { public interface ClientConnection extends Connection {

@ -22,8 +22,6 @@ import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.exception.TimeOutException; import cn.hippo4j.rpc.exception.TimeOutException;
import cn.hippo4j.rpc.model.Request; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response; import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.process.ActivePostProcess;
import cn.hippo4j.rpc.process.ActiveProcessChain;
import cn.hippo4j.rpc.support.NettyConnectPool; import cn.hippo4j.rpc.support.NettyConnectPool;
import cn.hippo4j.rpc.support.NettyConnectPoolHolder; import cn.hippo4j.rpc.support.NettyConnectPoolHolder;
import cn.hippo4j.rpc.support.ResultHolder; import cn.hippo4j.rpc.support.ResultHolder;
@ -35,12 +33,13 @@ import io.netty.channel.pool.ChannelPoolHandler;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.LinkedList; import java.util.Optional;
import java.util.List;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;
/** /**
* Client implemented using netty * Client implemented using netty
*
* @since 1.5.1
*/ */
@Slf4j @Slf4j
public class NettyClientConnection implements ClientConnection { public class NettyClientConnection implements ClientConnection {
@ -51,33 +50,28 @@ public class NettyClientConnection implements ClientConnection {
*/ */
long timeout = 30000L; long timeout = 30000L;
EventLoopGroup worker = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup();
ActiveProcessChain activeProcessChain;
NettyConnectPool connectionPool; NettyConnectPool connectionPool;
ChannelFuture future; ChannelFuture future;
Channel channel; Channel channel;
public NettyClientConnection(InetSocketAddress address, public NettyClientConnection(InetSocketAddress address,
List<ActivePostProcess> activeProcesses,
ChannelPoolHandler handler) { ChannelPoolHandler handler) {
Assert.notNull(worker); Assert.notNull(worker);
this.address = address; this.address = address;
this.activeProcessChain = new ActiveProcessChain(activeProcesses);
this.connectionPool = NettyConnectPoolHolder.getPool(address, timeout, worker, handler); this.connectionPool = NettyConnectPoolHolder.getPool(address, timeout, worker, handler);
} }
public NettyClientConnection(InetSocketAddress address, ChannelPoolHandler handler) {
this(address, new LinkedList<>(), handler);
}
@Override @Override
public Response connect(Request request) { public Response connect(Request request) {
activeProcessChain.applyPreHandle(request);
this.channel = connectionPool.acquire(timeout); this.channel = connectionPool.acquire(timeout);
Response response = null; boolean debugEnabled = log.isDebugEnabled();
Response response;
try { try {
String key = request.getKey(); String key = request.getKey();
this.future = channel.writeAndFlush(request); this.future = channel.writeAndFlush(request);
log.info("Call successful, target address is {}:{}, request key is {}", address.getHostName(), address.getPort(), key); if (debugEnabled) {
log.debug("Call successful, target address is {}:{}, request key is {}", address.getHostName(), address.getPort(), key);
}
// Wait for execution to complete // Wait for execution to complete
ResultHolder.putThread(key, Thread.currentThread()); ResultHolder.putThread(key, Thread.currentThread());
LockSupport.parkNanos(timeout() * 1000000); LockSupport.parkNanos(timeout() * 1000000);
@ -85,14 +79,13 @@ public class NettyClientConnection implements ClientConnection {
if (response == null) { if (response == null) {
throw new TimeOutException("Timeout waiting for server-side response"); throw new TimeOutException("Timeout waiting for server-side response");
} }
activeProcessChain.applyPostHandle(request, response); if (debugEnabled) {
log.info("The response from {}:{} was received successfully with the response key {}.", address.getHostName(), address.getPort(), key); log.debug("The response from {}:{} was received successfully with the response key {}.", address.getHostName(), address.getPort(), key);
}
return response; return response;
} catch (Exception ex) { } catch (Exception ex) {
activeProcessChain.afterCompletion(request, response, ex);
throw new IllegalException(ex); throw new IllegalException(ex);
} finally { } finally {
activeProcessChain.afterCompletion(request, response, null);
connectionPool.release(this.channel); connectionPool.release(this.channel);
} }
} }
@ -108,13 +101,13 @@ public class NettyClientConnection implements ClientConnection {
} }
@Override @Override
public synchronized void close() { public void close() {
if (this.channel == null) { Optional.ofNullable(this.channel)
return; .ifPresent(c -> {
}
worker.shutdownGracefully(); worker.shutdownGracefully();
this.future.channel().close(); this.future.channel().close();
this.channel.close(); this.channel.close();
});
} }
@Override @Override

@ -23,7 +23,10 @@ import cn.hippo4j.rpc.model.Response;
import java.io.IOException; import java.io.IOException;
/** /**
* The client, which provides a closing mechanism, maintains a persistent connection if not closed * The client, which provides a closing mechanism, maintains a persistent connection if not closed<br>
* Delegate the method to the {@link ClientConnection} for implementation
*
* @since 1.5.1
*/ */
public class RPCClient implements Client { public class RPCClient implements Client {

@ -24,6 +24,8 @@ import java.io.OutputStream;
/** /**
* object OutputStream * object OutputStream
*
* @since 1.5.1
*/ */
public class CompactObjectOutputStream extends ObjectOutputStream { public class CompactObjectOutputStream extends ObjectOutputStream {

@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.coder;
import cn.hippo4j.rpc.exception.CoderException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
/**
* According to the decoder for java objects implemented by ObjectDecoder,
* it is necessary to ensure that the transmitted objects can be serialized
*/
public class NettyDecoder extends ObjectDecoder {
public NettyDecoder(ClassResolver classResolver) {
super(classResolver);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) {
ByteBuf byteBuf = in.retainedDuplicate();
try {
Object o = super.decode(ctx, in);
if (o == null) {
return byteBuf;
} else {
return o;
}
} catch (Exception e) {
throw new CoderException("The encoding is abnormal, which may be caused by the failure of the transfer object to be deserialized");
}
}
}

@ -29,6 +29,8 @@ import java.io.Serializable;
/** /**
* this is a encoder, For custom gluing and unpacking<br> * this is a encoder, For custom gluing and unpacking<br>
* {@link io.netty.handler.codec.serialization.ObjectEncoder} * {@link io.netty.handler.codec.serialization.ObjectEncoder}
*
* @since 1.5.1
*/ */
public class NettyEncoder extends MessageToByteEncoder<Serializable> { public class NettyEncoder extends MessageToByteEncoder<Serializable> {

@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
/** /**
* the registration center for Client and Server * the registration center for Client and Server
*
* @since 1.5.1
*/ */
@NoArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ClassRegistry { public class ClassRegistry {

@ -27,6 +27,8 @@ import java.util.ServiceLoader;
* You simply create an instance of a class based on its name and specific type. * You simply create an instance of a class based on its name and specific type.
* Load through the ServiceLoader first. If the load fails, load directly through the instantiation. * Load through the ServiceLoader first. If the load fails, load directly through the instantiation.
* If it is an interface, throw an exception. This is not elegant implementation * If it is an interface, throw an exception. This is not elegant implementation
*
* @since 1.5.1
*/ */
public class DefaultInstance implements Instance { public class DefaultInstance implements Instance {

@ -22,6 +22,8 @@ import java.net.InetSocketAddress;
/** /**
* The adaptation layer of different service centers is used to know * The adaptation layer of different service centers is used to know
* the host of different services through the registration center * the host of different services through the registration center
*
* @since 1.5.1
*/ */
public interface DiscoveryAdapter { public interface DiscoveryAdapter {

@ -19,6 +19,8 @@ package cn.hippo4j.rpc.discovery;
/** /**
* Instance interface to get an instance * Instance interface to get an instance
*
* @since 1.5.1
*/ */
public interface Instance { public interface Instance {

@ -19,6 +19,8 @@ package cn.hippo4j.rpc.discovery;
/** /**
* Gets the top-level interface of the instance port * Gets the top-level interface of the instance port
*
* @since 1.5.1
*/ */
@FunctionalInterface @FunctionalInterface
public interface ServerPort { public interface ServerPort {

@ -21,6 +21,8 @@ import cn.hippo4j.common.config.ApplicationContextHolder;
/** /**
* Adapter Spring, The requested object is managed by spring * Adapter Spring, The requested object is managed by spring
*
* @since 1.5.1
*/ */
public class SpringContextInstance implements Instance { public class SpringContextInstance implements Instance {

@ -20,6 +20,8 @@ package cn.hippo4j.rpc.exception;
/** /**
* During decoding and encoding, if an exception occurs, an exception of type {@link CoderException} is thrown, * During decoding and encoding, if an exception occurs, an exception of type {@link CoderException} is thrown,
* which is not different from a {@link RuntimeException}, but is more explicit about the type of exception * which is not different from a {@link RuntimeException}, but is more explicit about the type of exception
*
* @since 1.5.1
*/ */
public class CoderException extends RuntimeException { public class CoderException extends RuntimeException {

@ -21,6 +21,8 @@ package cn.hippo4j.rpc.exception;
* If an exception occurs during the connection between the server and the client, an exception of type * If an exception occurs during the connection between the server and the client, an exception of type
* {@link ConnectionException} is thrown, which is not different from {@link RuntimeException}, but is more explicit * {@link ConnectionException} is thrown, which is not different from {@link RuntimeException}, but is more explicit
* about the type of exception * about the type of exception
*
* @since 1.5.1
*/ */
public class ConnectionException extends RuntimeException { public class ConnectionException extends RuntimeException {

@ -20,6 +20,8 @@ package cn.hippo4j.rpc.exception;
/** /**
* If there is a timeout between the server and the client, you will get a {@link TimeOutException}, * If there is a timeout between the server and the client, you will get a {@link TimeOutException},
* which is not different from {@link RuntimeException}, but it will be more explicit about the type of exception, right * which is not different from {@link RuntimeException}, but it will be more explicit about the type of exception, right
*
* @since 1.5.1
*/ */
public class TimeOutException extends RuntimeException { public class TimeOutException extends RuntimeException {

@ -26,6 +26,8 @@ import java.util.stream.Collectors;
/** /**
* Processor manager for ChannelHandler in netty * Processor manager for ChannelHandler in netty
*
* @since 1.5.1
*/ */
public abstract class AbstractNettyHandlerManager implements HandlerManager<ChannelHandler> { public abstract class AbstractNettyHandlerManager implements HandlerManager<ChannelHandler> {
@ -81,28 +83,4 @@ public abstract class AbstractNettyHandlerManager implements HandlerManager<Chan
this.handlerEntities.add(getHandlerEntity(firstIndex.getAndIncrement(), handler, name)); this.handlerEntities.add(getHandlerEntity(firstIndex.getAndIncrement(), handler, name));
return this; return this;
} }
/**
* {@inheritDoc}
*
* @param handler handler
* @return NettyHandlerManager
*/
public AbstractNettyHandlerManager addLast(ChannelHandler handler) {
Assert.notNull(handler);
this.handlerEntities.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, null));
return this;
}
/**
* {@inheritDoc}
*
* @param handler handler
* @return NettyHandlerManager
*/
public AbstractNettyHandlerManager addFirst(ChannelHandler handler) {
Assert.notNull(handler);
this.handlerEntities.add(getHandlerEntity(firstIndex.getAndDecrement(), handler, null));
return this;
}
} }

@ -24,8 +24,12 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Optional;
/** /**
* the abstract base of {@link ConnectHandler} and {@link ChannelInboundHandlerAdapter} * the abstract base of {@link ConnectHandler} and {@link ChannelInboundHandlerAdapter}
*
* @since 1.5.1
*/ */
public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler { public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler {
@ -42,9 +46,10 @@ public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdap
if (channel.isActive()) { if (channel.isActive()) {
ctx.close(); ctx.close();
} }
if (cause != null) { Optional.ofNullable(cause)
.ifPresent(t -> {
throw new ConnectionException(cause); throw new ConnectionException(cause);
} });
} }
/** /**

@ -25,6 +25,7 @@ import cn.hippo4j.rpc.model.Response;
* must be specified, such as serialization and parsing, requesting and receiving * must be specified, such as serialization and parsing, requesting and receiving
* requests, and so on<br> * requests, and so on<br>
* *
* @since 1.5.1
*/ */
public interface ConnectHandler { public interface ConnectHandler {

@ -20,7 +20,14 @@ package cn.hippo4j.rpc.handler;
import java.io.Closeable; import java.io.Closeable;
/** /**
* Represents a network request connection and provides IO layer support * Represents a network request connection and provides IO layer support<br>
* <p>
* This is not a strict and stateless Connection interface, it contains the necessary
* operations that should be done in the connection. It is more like integrating the
* connection and the connection channel together, so creating {@link Connection} is
* very resource intensive, for which caching is recommended
*
* @since 1.5.1
*/ */
public interface Connection extends Closeable { public interface Connection extends Closeable {

@ -17,12 +17,16 @@
package cn.hippo4j.rpc.handler; package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.web.exception.IllegalException;
import io.netty.channel.ChannelHandler;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
/** /**
* Manage the Handler used in the processing.<br> * Manage the Handler used in the processing.<br>
* The Handler must be able to exist multiple times and be invoked once in a single execution * The Handler must be able to exist multiple times and be invoked once in a single execution
*
* @since 1.5.1
*/ */
public interface HandlerManager<T> { public interface HandlerManager<T> {
@ -42,20 +46,6 @@ public interface HandlerManager<T> {
*/ */
HandlerManager<T> addFirst(String name, T handler); HandlerManager<T> addFirst(String name, T handler);
/**
* Add handler to the end of the Handler chain, without specifying a name
*
* @param handler handler
*/
HandlerManager<T> addLast(T handler);
/**
* Adds handler to the head of the Handler chain, without specifying a name
*
* @param handler handler
*/
HandlerManager<T> addFirst(T handler);
/** /**
* Whether handler exists * Whether handler exists
* *
@ -72,6 +62,14 @@ public interface HandlerManager<T> {
* @return HandlerEntity * @return HandlerEntity
*/ */
default HandlerEntity<T> getHandlerEntity(long order, T handler, String name) { default HandlerEntity<T> getHandlerEntity(long order, T handler, String name) {
Class<?> cls = handler.getClass();
boolean b = cls.isAnnotationPresent(ChannelHandler.Sharable.class)
|| HandlerManager.class.isAssignableFrom(cls);
if (!b) {
throw new IllegalException("Join the execution of the handler must add io.netty.channel.ChannelHandler." +
"Sharable annotations, Please for the handler class " + cls.getName() + " add io.netty.channel." +
"ChannelHandler.Sharable annotation");
}
return new HandlerEntity<>(order, handler, name); return new HandlerEntity<>(order, handler, name);
} }

@ -17,7 +17,6 @@
package cn.hippo4j.rpc.handler; package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.coder.NettyDecoder;
import cn.hippo4j.rpc.coder.NettyEncoder; import cn.hippo4j.rpc.coder.NettyEncoder;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -26,12 +25,15 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.List; import java.util.List;
/** /**
* Processing by the client connection pool handler to clean the buffer and define new connection properties * Processing by the client connection pool handler to clean the buffer and define new connection properties
*
* @since 1.5.1
*/ */
@Slf4j @Slf4j
public class NettyClientPoolHandler extends AbstractNettyHandlerManager implements ChannelPoolHandler { public class NettyClientPoolHandler extends AbstractNettyHandlerManager implements ChannelPoolHandler {
@ -60,22 +62,12 @@ public class NettyClientPoolHandler extends AbstractNettyHandlerManager implemen
return this; return this;
} }
@Override
public NettyClientPoolHandler addLast(ChannelHandler handler) {
super.addLast(handler);
return this;
}
@Override
public NettyClientPoolHandler addFirst(ChannelHandler handler) {
super.addFirst(handler);
return this;
}
@Override @Override
public void channelReleased(Channel ch) { public void channelReleased(Channel ch) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER); ch.writeAndFlush(Unpooled.EMPTY_BUFFER);
log.info("The connection buffer has been emptied of data"); if (log.isDebugEnabled()) {
log.debug("The connection buffer has been emptied of data");
}
} }
@Override @Override
@ -90,7 +82,7 @@ public class NettyClientPoolHandler extends AbstractNettyHandlerManager implemen
.setTcpNoDelay(false); .setTcpNoDelay(false);
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyEncoder()); pipeline.addLast(new NettyEncoder());
pipeline.addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
this.handlerEntities.stream() this.handlerEntities.stream()
.sorted() .sorted()
.forEach(h -> { .forEach(h -> {

@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext;
/** /**
* Interconnect with the netty mediation layer * Interconnect with the netty mediation layer
*
* @since 1.5.1
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class NettyClientTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler { public class NettyClientTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler {

@ -19,36 +19,28 @@ package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.ReflectUtil; import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.rpc.process.ActivePostProcess;
import cn.hippo4j.rpc.process.ActiveProcessChain;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.DefaultResponse;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.discovery.ClassRegistry; import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.Instance; import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.model.DefaultResponse;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
/** /**
* netty adaptation layer * netty adaptation layer
*
* @since 1.5.1
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class NettyServerTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler { public class NettyServerTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler {
ActiveProcessChain activeProcessChain;
Instance instance; Instance instance;
public NettyServerTakeHandler(List<ActivePostProcess> processes, Instance instance) {
this.activeProcessChain = new ActiveProcessChain(processes);
this.instance = instance;
}
public NettyServerTakeHandler(Instance instance) { public NettyServerTakeHandler(Instance instance) {
this(new LinkedList<>(), instance); this.instance = instance;
} }
@Override @Override
@ -63,24 +55,17 @@ public class NettyServerTakeHandler extends AbstractNettyTakeHandler implements
@Override @Override
public Response sendHandler(Request request) { public Response sendHandler(Request request) {
if (!activeProcessChain.applyPreHandle(request)) { Response response;
return null;
}
Response response = null;
try { try {
Class<?> cls = ClassRegistry.get(request.getClassName()); Class<?> cls = ClassRegistry.get(request.getClassName());
Method method = ReflectUtil.getMethodByName(cls, request.getMethodName(), request.getParameterTypes()); Method method = ReflectUtil.getMethodByName(cls, request.getMethodName(), request.getParameterTypes());
Assert.notNull(method); Assert.notNull(method);
Object invoke = ReflectUtil.invoke(instance.getInstance(cls), method, request.getParameters()); Object invoke = ReflectUtil.invoke(instance.getInstance(cls), method, request.getParameters());
response = new DefaultResponse(request.getKey(), invoke.getClass(), invoke); response = new DefaultResponse(request.getKey(), invoke.getClass(), invoke);
activeProcessChain.applyPostHandle(request, response);
return response; return response;
} catch (Exception e) { } catch (Exception e) {
response = new DefaultResponse(request.getKey(), e, e.getMessage()); response = new DefaultResponse(request.getKey(), e, e.getMessage());
activeProcessChain.afterCompletion(request, response, e);
return response; return response;
} finally {
activeProcessChain.afterCompletion(request, response, null);
} }
} }

@ -25,6 +25,8 @@ import java.util.Objects;
/** /**
* default request<br> * default request<br>
* Use the fully qualified name key of the interface and override equals and hashCode * Use the fully qualified name key of the interface and override equals and hashCode
*
* @since 1.5.1
*/ */
public final class DefaultRequest implements Request { public final class DefaultRequest implements Request {

@ -25,6 +25,8 @@ import java.util.Objects;
/** /**
* default request<br> * default request<br>
* Use the fully qualified name key of the interface and override equals and hashCode * Use the fully qualified name key of the interface and override equals and hashCode
*
* @since 1.5.1
*/ */
public class DefaultResponse implements Response { public class DefaultResponse implements Response {

@ -21,6 +21,8 @@ import java.io.Serializable;
/** /**
* request * request
*
* @since 1.5.1
*/ */
public interface Request extends Serializable { public interface Request extends Serializable {

@ -21,6 +21,8 @@ import java.io.Serializable;
/** /**
* Response * Response
*
* @since 1.5.1
*/ */
public interface Response extends Serializable { public interface Response extends Serializable {

@ -1,60 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.process;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
/**
* Callback while the connection is in progress
*/
public interface ActivePostProcess {
/**
* Client: After establishing a connection and before passing parameters<br>
* Server: Receives parameters and performs pre-call operations<br>
*
* @param request request
* @return Whether to continue the execution. If it is a client, the returned value does not affect subsequent execution
*/
default boolean preHandler(Request request) {
return true;
}
/**
* Client: Action after receiving a response<br>
* Server: performs the operation after the call<br>
*
* @param request request
* @param response response
*/
default void postHandler(Request request, Response response) {
// NO SOMETHING
}
/**
* Called when an exception or resource is cleaned
*
* @param request request
* @param response response
* @param e Exception
*/
default void afterCompletion(Request request, Response response, Exception e) {
// NO SOMETHING
}
}

@ -1,102 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.process;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Processor chain for easier processing of processors in different scenarios<br>
* reference resources: spring HandlerExecutionChain
*
* @see ActivePostProcess
*/
@Slf4j
public final class ActiveProcessChain {
/**
* A collection of processors that will be applied to their assigned programs.
* Processors will perform different actions on different occasions for both the server and the client,
* but the execution period of that action must be the same
*/
List<ActivePostProcess> processes;
/**
* index <br>
* that identifies where the {@link ActivePostProcess#preHandler(Request)} processing is performed<br>
* This allows for the fact that some processors will add shutable operations to the class<br>
* eg: {@link java.io.Closeable}, The {@link ActivePostProcess#afterCompletion(Request, Response, Exception)}
* operation is not performed after an exception if the preprocessor is not executed
*/
int index = -1;
public ActiveProcessChain(List<ActivePostProcess> processes) {
this.processes = processes;
}
public ActiveProcessChain(ActivePostProcess... processes) {
this((processes != null ? Arrays.asList(processes) : Collections.emptyList()));
}
/**
* Apply postHandle methods of registered processes.
*/
public boolean applyPreHandle(Request request) {
for (int i = 0; i < this.processes.size(); i++) {
ActivePostProcess handle = processes.get(i);
if (!handle.preHandler(request)) {
afterCompletion(request, null, null);
return false;
}
this.index = i;
}
return true;
}
/**
* Apply postHandle methods of registered processes.
*/
public void applyPostHandle(Request request, Response response) {
for (int i = processes.size() - 1; i >= 0; i--) {
ActivePostProcess handle = processes.get(i);
handle.postHandler(request, response);
}
}
/**
* Trigger afterCompletion callbacks on the mapped ActivePostProcess.
* Will just invoke afterCompletion for all interceptors whose preHandle invocation
* has successfully completed and returned true.
*/
public void afterCompletion(Request request, Response response, Exception ex) {
for (int i = this.index; i >= 0; i--) {
ActivePostProcess handle = processes.get(i);
try {
handle.afterCompletion(request, response, ex);
} catch (Throwable e) {
log.error("HandlerInterceptor.afterCompletion threw exception", e);
}
}
}
}

@ -18,7 +18,6 @@
package cn.hippo4j.rpc.server; package cn.hippo4j.rpc.server;
import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.rpc.coder.NettyDecoder;
import cn.hippo4j.rpc.coder.NettyEncoder; import cn.hippo4j.rpc.coder.NettyEncoder;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.ConnectionException; import cn.hippo4j.rpc.exception.ConnectionException;
@ -29,6 +28,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.Arrays; import java.util.Arrays;
@ -37,6 +37,8 @@ import java.util.List;
/** /**
* adapter to the netty server * adapter to the netty server
*
* @since 1.5.1
*/ */
@Slf4j @Slf4j
public class NettyServerConnection extends AbstractNettyHandlerManager implements ServerConnection { public class NettyServerConnection extends AbstractNettyHandlerManager implements ServerConnection {
@ -70,6 +72,10 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
@Override @Override
public void bind(ServerPort port) { public void bind(ServerPort port) {
int serverPort = port.getPort();
if (serverPort < 0 || serverPort > 65535) {
throw new ConnectionException("The port number " + serverPort + " is outside 0~65535, which is not a legal port number");
}
ServerBootstrap server = new ServerBootstrap(); ServerBootstrap server = new ServerBootstrap();
server.group(leader, worker) server.group(leader, worker)
.channel(socketChannelCls) .channel(socketChannelCls)
@ -77,10 +83,10 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyEncoder()); pipeline.addLast(new NettyEncoder());
pipeline.addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
handlerEntities.stream() handlerEntities.stream()
.sorted() .sorted()
.forEach(h -> { .forEach(h -> {
@ -93,9 +99,11 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
} }
}); });
try { try {
this.future = server.bind(port.getPort()).sync(); this.future = server.bind(serverPort).sync();
this.channel = this.future.channel(); this.channel = this.future.channel();
log.info("The server is started and can receive requests. The listening port is {}", port.getPort()); if (log.isDebugEnabled()) {
log.debug("The server is started and can receive requests. The listening port is {}", serverPort);
}
this.port = port; this.port = port;
this.future.channel().closeFuture().sync(); this.future.channel().closeFuture().sync();
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
@ -113,7 +121,9 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
this.worker.shutdownGracefully(); this.worker.shutdownGracefully();
this.channel.close(); this.channel.close();
this.future.channel().close(); this.future.channel().close();
log.info("The server is shut down and no more requests are received. The release port is {}", port.getPort()); if (log.isDebugEnabled()) {
log.debug("The server is shut down and no more requests are received. The release port is {}", port.getPort());
}
} }
@Override @Override
@ -136,16 +146,4 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
return this; return this;
} }
@Override
public NettyServerConnection addLast(ChannelHandler handler) {
super.addLast(handler);
return this;
}
@Override
public NettyServerConnection addFirst(ChannelHandler handler) {
super.addFirst(handler);
return this;
}
} }

@ -24,6 +24,8 @@ import java.io.IOException;
/** /**
* Server Implementation * Server Implementation
*
* @since 1.5.1
*/ */
public class RPCServer implements Server { public class RPCServer implements Server {

@ -21,6 +21,8 @@ import java.io.Closeable;
/** /**
* the service for RPC, Explain the role of the service in the request * the service for RPC, Explain the role of the service in the request
*
* @since 1.5.1
*/ */
public interface Server extends Closeable { public interface Server extends Closeable {

@ -22,6 +22,8 @@ import cn.hippo4j.rpc.handler.Connection;
/** /**
* This applies to server-side connections * This applies to server-side connections
*
* @since 1.5.1
*/ */
public interface ServerConnection extends Connection { public interface ServerConnection extends Connection {

@ -17,6 +17,7 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.rpc.client.Client; import cn.hippo4j.rpc.client.Client;
import cn.hippo4j.rpc.discovery.DiscoveryAdapter; import cn.hippo4j.rpc.discovery.DiscoveryAdapter;
import cn.hippo4j.rpc.exception.ConnectionException; import cn.hippo4j.rpc.exception.ConnectionException;
@ -30,11 +31,30 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationContextAware;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Optional;
/** /**
* A FactoryBean that builds interfaces to invoke proxy objects * A FactoryBean that builds interfaces to invoke proxy objects
* is responsible for managing the entire life cycle of the proxy objects<br> * is responsible for managing the entire life cycle of the proxy objects<br>
* <h3>APPLICATION START</h3>
* When the application is started, the request initiator needs to complete the proxy of the calling interface,
* which ensures that the method can be requested to the server side when the method is called, rather than simply
* request an interface that cannot be instantiated. The classes involved in adding proxy to the interface are:
* <ul>
* <li>{@link NettyClientSupport}</li>
* <li>{@link NettyProxyCenter}</li>
* <li>{@link NettyClientPoolHandler}</li>
* </ul>
* <h3>AND SPRING</h3>
* In order to fully integrate {@link ClientFactoryBean} into the life cycle of spring beans,
* {@link ClientFactoryBean} also needs to implement the following interfaces:
* <ul>
* <li>{@link InitializingBean}</li>
* <li>{@link ApplicationContextAware}</li>
* <li>{@link DisposableBean}</li>
* </ul>
* *
* @since 1.5.1
* @deprecated With {@link cn.hippo4j.config.service.ThreadPoolAdapterService} structure, FactoryBean is not the best choice * @deprecated With {@link cn.hippo4j.config.service.ThreadPoolAdapterService} structure, FactoryBean is not the best choice
*/ */
@Deprecated @Deprecated
@ -52,10 +72,16 @@ public class ClientFactoryBean implements FactoryBean<Object>, InitializingBean,
*/ */
private String discoveryAdapterName; private String discoveryAdapterName;
/**
* The adaptation interface for obtaining ip information in the registry is used together with
* {@link #discoveryAdapterName}, so that the adapter implementation can be obtained in the container
* during the initialization phase
*/
private DiscoveryAdapter discoveryAdapter; private DiscoveryAdapter discoveryAdapter;
/** /**
* the channel handler * the channel handler, To ensure the security and reliability of netty calls,
* {@link ChannelHandler} must be identified by {@link ChannelHandler.Sharable}
*/ */
private ChannelHandler[] handlers; private ChannelHandler[] handlers;
@ -70,11 +96,13 @@ public class ClientFactoryBean implements FactoryBean<Object>, InitializingBean,
private ApplicationContext applicationContext; private ApplicationContext applicationContext;
/** /**
* InetSocketAddress * InetSocketAddress, It is usually converted from {@link #applicationName} and {@link #discoveryAdapter}
*/ */
InetSocketAddress address; InetSocketAddress address;
public ClientFactoryBean(String applicationName, String discoveryAdapterName, Class<?> cls) { public ClientFactoryBean(String applicationName, String discoveryAdapterName, Class<?> cls) {
Assert.notNull(applicationName);
Assert.notNull(cls);
this.applicationName = applicationName; this.applicationName = applicationName;
this.discoveryAdapterName = discoveryAdapterName; this.discoveryAdapterName = discoveryAdapterName;
this.cls = cls; this.cls = cls;
@ -82,16 +110,17 @@ public class ClientFactoryBean implements FactoryBean<Object>, InitializingBean,
@Override @Override
public Object getObject() throws Exception { public Object getObject() throws Exception {
this.address = discoveryAdapter.getSocketAddress(applicationName); this.address = Optional.ofNullable(applicationName)
if (this.address == null) { .map(a -> discoveryAdapter.getSocketAddress(a))
.map(a -> {
String[] addressStr = applicationName.split(":"); String[] addressStr = applicationName.split(":");
if (addressStr.length < 2) { if (addressStr.length < 2) {
throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure"); throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure");
} }
this.address = InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1])); return InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1]));
} })
NettyClientPoolHandler handler = new NettyClientPoolHandler(handlers); .orElseThrow(() -> new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure"));
Client client = NettyClientSupport.getClient(this.address, handler); Client client = NettyClientSupport.getClient(this.address, new NettyClientPoolHandler(handlers));
return NettyProxyCenter.createProxy(client, cls, this.address); return NettyProxyCenter.createProxy(client, cls, this.address);
} }
@ -102,15 +131,15 @@ public class ClientFactoryBean implements FactoryBean<Object>, InitializingBean,
@Override @Override
public void afterPropertiesSet() throws Exception { public void afterPropertiesSet() throws Exception {
this.discoveryAdapter = (DiscoveryAdapter) applicationContext.getBean(discoveryAdapterName); this.discoveryAdapter = Optional.ofNullable(discoveryAdapterName)
.map(s -> (DiscoveryAdapter) applicationContext.getBean(discoveryAdapterName))
.orElse(null);
} }
@Override @Override
public void destroy() throws Exception { public void destroy() throws Exception {
if (this.address == null) { Optional.ofNullable(this.address)
return; .ifPresent(a -> NettyClientSupport.closeClient(this.address));
}
NettyClientSupport.closeClient(this.address);
} }
@Override @Override

@ -33,6 +33,7 @@ import java.io.IOException;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
@ -51,6 +52,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @see cn.hippo4j.rpc.client.NettyClientConnection * @see cn.hippo4j.rpc.client.NettyClientConnection
* @see NettyServerSupport * @see NettyServerSupport
* @see ClientFactoryBean * @see ClientFactoryBean
* @since 1.5.1
*/ */
@NoArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class NettyClientSupport { public final class NettyClientSupport {
@ -73,7 +75,7 @@ public final class NettyClientSupport {
? (NettyClientPoolHandler) handlerManager ? (NettyClientPoolHandler) handlerManager
: new NettyClientPoolHandler(); : new NettyClientPoolHandler();
if (handler.isEmpty()) { if (handler.isEmpty()) {
handler.addFirst(new NettyClientTakeHandler()); handler.addFirst(null, new NettyClientTakeHandler());
} }
NettyClientConnection connection = new NettyClientConnection(address, handler); NettyClientConnection connection = new NettyClientConnection(address, handler);
return new RPCClient(connection); return new RPCClient(connection);
@ -97,12 +99,13 @@ public final class NettyClientSupport {
*/ */
public static void closeClient(InetSocketAddress address) { public static void closeClient(InetSocketAddress address) {
Client client = clientMap.remove(address); Client client = clientMap.remove(address);
Optional.ofNullable(client)
.ifPresent(c -> {
try { try {
if (client != null) { c.close();
client.close();
}
} catch (IOException e) { } catch (IOException e) {
throw new IllegalException(e); throw new IllegalException(e);
} }
});
} }
} }

@ -30,10 +30,13 @@ import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* This parameter applies only to the connection pool of netty * This parameter applies only to the connection pool of netty
*
* @since 1.5.1
*/ */
@Slf4j @Slf4j
public class NettyConnectPool { public class NettyConnectPool {
@ -58,7 +61,9 @@ public class NettyConnectPool {
this.handler = handler; this.handler = handler;
this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction, this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction,
timeout, maxConnect, maxPendingAcquires, true, true); timeout, maxConnect, maxPendingAcquires, true, true);
if (log.isDebugEnabled()) {
log.info("The connection pool is established with the connection target {}:{}", address.getHostName(), address.getPort()); log.info("The connection pool is established with the connection target {}:{}", address.getHostName(), address.getPort());
}
NettyConnectPoolHolder.createPool(address, this); NettyConnectPoolHolder.createPool(address, this);
} }
@ -82,14 +87,15 @@ public class NettyConnectPool {
} }
public void release(Channel channel) { public void release(Channel channel) {
Optional.ofNullable(channel)
.ifPresent(c -> {
try { try {
if (channel != null) {
pool.release(channel); pool.release(channel);
}
} catch (Exception e) { } catch (Exception e) {
NettyClientSupport.closeClient(address); NettyClientSupport.closeClient(address);
throw new ConnectionException("Failed to release the connection", e); throw new ConnectionException("Failed to release the connection", e);
} }
});
} }
public void close() { public void close() {

@ -31,6 +31,8 @@ import java.util.concurrent.ConcurrentHashMap;
/** /**
* To avoid creating multiple connection pools for the same host:port, save all connection pools of the client * To avoid creating multiple connection pools for the same host:port, save all connection pools of the client
*
* @since 1.5.1
*/ */
@NoArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NettyConnectPoolHolder { public class NettyConnectPoolHolder {

@ -36,6 +36,8 @@ import java.util.concurrent.ConcurrentHashMap;
/** /**
* Add a proxy for the request, {@link Proxy} and {@link InvocationHandler} * Add a proxy for the request, {@link Proxy} and {@link InvocationHandler}
*
* @since 1.5.1
*/ */
@NoArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NettyProxyCenter { public class NettyProxyCenter {

@ -40,6 +40,7 @@ import java.util.List;
* @see RPCServer * @see RPCServer
* @see NettyServerConnection * @see NettyServerConnection
* @see NettyClientSupport * @see NettyClientSupport
* @since 1.5.1
*/ */
public class NettyServerSupport implements Server { public class NettyServerSupport implements Server {
@ -95,7 +96,7 @@ public class NettyServerSupport implements Server {
: new NettyServerConnection(); : new NettyServerConnection();
// Assign a default handler if no handler exists // Assign a default handler if no handler exists
if (connection.isEmpty()) { if (connection.isEmpty()) {
connection.addFirst(new NettyServerTakeHandler(new DefaultInstance())); connection.addFirst(null, new NettyServerTakeHandler(new DefaultInstance()));
} }
server = new RPCServer(connection, serverPort); server = new RPCServer(connection, serverPort);
} }

@ -31,6 +31,8 @@ import java.util.concurrent.locks.LockSupport;
* The unique remote call can be determined by the key of request and * The unique remote call can be determined by the key of request and
* response, and the result of the call is stored in the secondary cache, * response, and the result of the call is stored in the secondary cache,
* which is convenient for the client to use at any time. * which is convenient for the client to use at any time.
*
* @since 1.5.1
*/ */
@Slf4j @Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE) @NoArgsConstructor(access = AccessLevel.PRIVATE)

@ -17,6 +17,8 @@
package cn.hippo4j.rpc.client; package cn.hippo4j.rpc.client;
import cn.hippo4j.common.toolkit.ThreadUtil;
public class CallManager { public class CallManager {
public int call() { public int call() {
@ -27,4 +29,10 @@ public class CallManager {
return a + b; return a + b;
} }
public int callTestTimeout() {
// thread sleep for 10 seconds
ThreadUtil.sleep(10000);
return 1;
}
} }

@ -22,9 +22,7 @@ import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance; import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance; import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import cn.hippo4j.rpc.handler.*;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.model.DefaultRequest; import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response; import cn.hippo4j.rpc.model.Response;
@ -109,19 +107,53 @@ public class RPCClientTest {
rpcServer.close(); rpcServer.close();
} }
@Test(expected = Exception.class)
public void responseNullExceptionTest() throws IOException {
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
clientConnection.setTimeout(300L);
try (RPCClient rpcClient = new RPCClient(clientConnection)) {
Request request = new DefaultRequest("127.0.0.18888", className, "callTestTimeout", null, null);
Response response = rpcClient.connection(request);
Assert.assertNotNull(response.getErrMsg());
Assert.assertNotNull(response.getThrowable());
} catch (IOException e) {
// no something
} finally {
rpcServer.close();
}
}
static class TestServerPort implements ServerPort { static class TestServerPort implements ServerPort {
int port = RandomPort.getSafeRandomPort();
@Override @Override
public int getPort() { public int getPort() {
return 8888; return port;
} }
} }
static class TestPortServerPort implements ServerPort { static class TestPortServerPort implements ServerPort {
int port = RandomPort.getSafeRandomPort();
@Override @Override
public int getPort() { public int getPort() {
return 8889; return port;
} }
} }
} }

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.client;
import java.io.IOException;
import java.net.ServerSocket;
public class RandomPort {
public static int getSafeRandomPort() {
try (ServerSocket socket = new ServerSocket(0)) {
return socket.getLocalPort();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

@ -20,7 +20,12 @@ package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.client.NettyClientConnection; import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.client.RPCClient; import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.client.RandomPort;
import cn.hippo4j.rpc.discovery.*; import cn.hippo4j.rpc.discovery.*;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.DefaultResponse;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.server.NettyServerConnection; import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer; import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.support.NettyProxyCenter; import cn.hippo4j.rpc.support.NettyProxyCenter;
@ -38,7 +43,7 @@ public class ConnectHandlerTest {
// server // server
Class<InstanceServerLoader> cls = InstanceServerLoader.class; Class<InstanceServerLoader> cls = InstanceServerLoader.class;
ClassRegistry.put(cls.getName(), cls); ClassRegistry.put(cls.getName(), cls);
ServerPort port = () -> 8892; ServerPort port = new TestServerPort();
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler serverHandler = new NettyServerTakeHandler(instance); NettyServerTakeHandler serverHandler = new NettyServerTakeHandler(instance);
NettyServerConnection connection = new NettyServerConnection(serverHandler); NettyServerConnection connection = new NettyServerConnection(serverHandler);
@ -59,4 +64,36 @@ public class ConnectHandlerTest {
rpcServer.close(); rpcServer.close();
} }
@Test
public void testConnectHandlerDefault() {
ConnectHandler handler = new TestConnectHandler();
Request request = new DefaultRequest("key", "className", "methodName", new Class[0], new Object[0]);
Response response = handler.sendHandler(request);
Assert.assertNull(response);
Response response1 = new DefaultResponse("key", this.getClass(), handler);
String key = response1.getKey();
Class<?> cls = response1.getCls();
Object obj = response1.getObj();
handler.handler(response1);
Assert.assertEquals(key, response1.getKey());
Assert.assertEquals(cls, response1.getCls());
Assert.assertEquals(obj, response1.getObj());
}
static class TestConnectHandler implements ConnectHandler {
}
static class TestServerPort implements ServerPort {
int port = RandomPort.getSafeRandomPort();
@Override
public int getPort() {
return port;
}
}
} }

@ -17,10 +17,28 @@
package cn.hippo4j.rpc.handler; package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.client.*;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
public class NettyClientPoolHandlerTest { public class NettyClientPoolHandlerTest {
@Test @Test
@ -54,7 +72,7 @@ public class NettyClientPoolHandlerTest {
public void addLast() { public void addLast() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(); NettyClientPoolHandler handler = new NettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty()); Assert.assertTrue(handler.isEmpty());
handler.addLast(new TestHandler()); handler.addLast(null, new TestHandler());
Assert.assertFalse(handler.isEmpty()); Assert.assertFalse(handler.isEmpty());
} }
@ -62,7 +80,7 @@ public class NettyClientPoolHandlerTest {
public void addFirst() { public void addFirst() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(); NettyClientPoolHandler handler = new NettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty()); Assert.assertTrue(handler.isEmpty());
handler.addFirst(new TestHandler()); handler.addFirst(null, new TestHandler());
Assert.assertFalse(handler.isEmpty()); Assert.assertFalse(handler.isEmpty());
} }
@ -81,4 +99,54 @@ public class NettyClientPoolHandlerTest {
handler.addFirst("Test", new TestHandler()); handler.addFirst("Test", new TestHandler());
Assert.assertFalse(handler.isEmpty()); Assert.assertFalse(handler.isEmpty());
} }
@Test(expected = IllegalException.class)
public void testGetHandlerEntityFalse() {
TestFalseHandler handler = new TestFalseHandler();
long order = 0;
String name = "Test";
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler();
poolHandler.getHandlerEntity(order, handler, name);
}
@Test
public void connectionTest() throws IOException {
ServerPort port = new ServerPort() {
final int a = RandomPort.getSafeRandomPort();
@Override
public int getPort() {
return a;
}
};
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port.getPort());
List<ChannelHandler> handlers = new ArrayList<>();
handlers.add(new NettyClientTakeHandler());
NettyClientPoolHandler channelPoolHandler = new NettyClientPoolHandler(handlers);
channelPoolHandler.addLast("test", new TestHandler());
ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
for (int i = 0; i < 50; i++) {
Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive();
Assert.assertTrue(active);
Assert.assertEquals(response.getObj(), 1);
}
rpcClient.close();
rpcServer.close();
}
} }

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
public class TestFalseHandler implements ChannelHandler {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
}

@ -19,7 +19,7 @@ package cn.hippo4j.rpc.handler;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ChannelHandler.Sharable
public class TestHandler implements ChannelHandler { public class TestHandler implements ChannelHandler {
@Override @Override

@ -59,4 +59,18 @@ public class DefaultRequestTest {
Assert.assertEquals(request1, request); Assert.assertEquals(request1, request);
} }
@Test
public void testEquals() throws NoSuchMethodException {
String key = "name";
String clsName = InstanceServerLoaderImpl.class.getName();
Method method = InstanceServerLoaderImpl.class.getMethod("setName", String.class);
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Object[] parameters = new Object[1];
parameters[0] = "hippo4j";
Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, parameters);
Assert.assertTrue(request.equals(request));
Assert.assertFalse(request.equals(null));
}
} }

@ -83,4 +83,13 @@ public class DefaultResponseTest {
Assert.assertTrue(response1.isErr()); Assert.assertTrue(response1.isErr());
} }
@Test
public void testEquals() throws NoSuchMethodException {
String key = "name";
Object o = "obj";
Class<?> cls = String.class;
Response response = new DefaultResponse(key, cls, o);
Assert.assertTrue(response.equals(response));
Assert.assertFalse(response.equals(null));
}
} }

@ -27,7 +27,7 @@ public class NettyServerConnectionTest {
public void addLast() { public void addLast() {
NettyServerConnection connection = new NettyServerConnection(); NettyServerConnection connection = new NettyServerConnection();
Assert.assertTrue(connection.isEmpty()); Assert.assertTrue(connection.isEmpty());
connection.addLast(new TestHandler()); connection.addLast(null, new TestHandler());
Assert.assertFalse(connection.isEmpty()); Assert.assertFalse(connection.isEmpty());
} }
@ -35,7 +35,7 @@ public class NettyServerConnectionTest {
public void addFirst() { public void addFirst() {
NettyServerConnection connection = new NettyServerConnection(); NettyServerConnection connection = new NettyServerConnection();
Assert.assertTrue(connection.isEmpty()); Assert.assertTrue(connection.isEmpty());
connection.addFirst(new TestHandler()); connection.addFirst(null, new TestHandler());
Assert.assertFalse(connection.isEmpty()); Assert.assertFalse(connection.isEmpty());
} }

@ -18,15 +18,26 @@
package cn.hippo4j.rpc.server; package cn.hippo4j.rpc.server;
import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.client.*;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance; import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance; import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler; import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.handler.TestHandler;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
public class RPCServerTest { public class RPCServerTest {
@ -35,7 +46,7 @@ public class RPCServerTest {
Instance instance = new DefaultInstance(); Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler); ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, () -> 8893); RPCServer rpcServer = new RPCServer(connection, RandomPort::getSafeRandomPort);
rpcServer.bind(); rpcServer.bind();
while (!rpcServer.isActive()) { while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L); ThreadUtil.sleep(100L);
@ -52,7 +63,7 @@ public class RPCServerTest {
EventLoopGroup worker = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(leader, worker, handler); ServerConnection connection = new NettyServerConnection(leader, worker, handler);
RPCServer rpcServer = new RPCServer(connection, () -> 8894); RPCServer rpcServer = new RPCServer(connection, RandomPort::getSafeRandomPort);
rpcServer.bind(); rpcServer.bind();
while (!rpcServer.isActive()) { while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L); ThreadUtil.sleep(100L);
@ -61,4 +72,57 @@ public class RPCServerTest {
Assert.assertTrue(active); Assert.assertTrue(active);
rpcServer.close(); rpcServer.close();
} }
@Test
public void bindPipelineTest() throws IOException {
ServerPort serverPort = new ServerPort() {
final int port = RandomPort.getSafeRandomPort();
@Override
public int getPort() {
return port;
}
};
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
NettyServerConnection connection = new NettyServerConnection(handler);
connection.addLast("Test", new TestHandler());
RPCServer rpcServer = new RPCServer(connection, serverPort);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", serverPort.getPort());
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
for (int i = 0; i < 50; i++) {
Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive();
Assert.assertTrue(active);
Assert.assertEquals(response.getObj(), 1);
}
rpcClient.close();
rpcServer.close();
}
@Test
public void bindNegativeTest() {
ServerPort serverPort = () -> -1;
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
NettyServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, serverPort);
rpcServer.bind();
}
} }

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.client.*;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.handler.TestHandler;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
public class NettyClientSupportTest {
@Test
public void closeTest() throws IOException {
int port = RandomPort.getSafeRandomPort();
ServerPort serverPort = () -> port;
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
NettyServerConnection connection = new NettyServerConnection(handler);
connection.addLast("Test", new TestHandler());
RPCServer rpcServer = new RPCServer(connection, serverPort);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port);
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
NettyClientSupport.closeClient(new InetSocketAddress("localhost", port));
rpcClient.close();
rpcServer.close();
}
}

@ -17,6 +17,7 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.client.RandomPort;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
@ -78,7 +79,7 @@ public class NettyConnectPoolHolderTest {
@Override @Override
public int getPort() { public int getPort() {
return 8895; return RandomPort.getSafeRandomPort();
} }
} }
} }

@ -18,6 +18,7 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.client.RandomPort;
import cn.hippo4j.rpc.discovery.DefaultInstance; import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance; import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
@ -112,9 +113,10 @@ public class NettyConnectPoolTest {
static class TestServerPort implements ServerPort { static class TestServerPort implements ServerPort {
int port = RandomPort.getSafeRandomPort();
@Override @Override
public int getPort() { public int getPort() {
return 8890; return port;
} }
} }
} }

@ -17,13 +17,19 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.client.*;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import cn.hippo4j.rpc.discovery.*;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.handler.*;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
public class NettyProxyCenterTest { public class NettyProxyCenterTest {
@ -42,6 +48,27 @@ public class NettyProxyCenterTest {
public void createProxy() { public void createProxy() {
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost:8894"); ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost:8894");
Assert.assertNotNull(localhost); Assert.assertNotNull(localhost);
NettyProxyCenter.getProxy(ProxyInterface.class, "localhost:8894");
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved("localhost", 8894);
Client client = NettyClientSupport.getClient(socketAddress);
ProxyInterface proxy = NettyProxyCenter.createProxy(client, ProxyInterface.class, socketAddress);
Assert.assertNotNull(proxy);
}
@Test(expected = ConnectionException.class)
public void createProxyException() {
NettyProxyCenter.getProxy(ProxyInterface.class, "localhost8894");
}
@Test
public void removeProxy() {
NettyProxyCenter.getProxy(ProxyInterface.class, "localhost:8894");
NettyProxyCenter.removeProxy(ProxyInterface.class, "localhost:8894");
}
@Test(expected = ConnectionException.class)
public void removeProxyException() {
NettyProxyCenter.removeProxy(ProxyInterface.class, "localhost8894");
} }
@Test(expected = IllegalException.class) @Test(expected = IllegalException.class)
@ -52,6 +79,32 @@ public class NettyProxyCenterTest {
Assert.assertNotNull(localhost); Assert.assertNotNull(localhost);
} }
@Test
public void bindPipelineTest() throws IOException {
// server
Class<InstanceServerLoader> cls = InstanceServerLoader.class;
ClassRegistry.put(cls.getName(), cls);
ServerPort port = new TestServerPort();
Instance instance = new DefaultInstance();
NettyServerTakeHandler serverHandler = new NettyServerTakeHandler(instance);
NettyServerConnection connection = new NettyServerConnection(serverHandler);
RPCServer rpcServer = new RPCServer(connection, port);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port.getPort());
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
InstanceServerLoader loader = NettyProxyCenter.createProxy(rpcClient, cls, address);
String name = loader.getName();
Assert.assertEquals("name", name);
rpcClient.close();
rpcServer.close();
}
interface ProxyInterface { interface ProxyInterface {
void hello(); void hello();
@ -63,9 +116,11 @@ public class NettyProxyCenterTest {
static class TestServerPort implements ServerPort { static class TestServerPort implements ServerPort {
int port = RandomPort.getSafeRandomPort();
@Override @Override
public int getPort() { public int getPort() {
return 8894; return port;
} }
} }
} }

@ -18,17 +18,44 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.discovery.InstanceServerLoader; import cn.hippo4j.rpc.client.*;
import cn.hippo4j.rpc.discovery.*;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
public class NettyServerSupportTest { public class NettyServerSupportTest {
@Test @Test
public synchronized void bind() throws IOException { public void bind() throws IOException {
NettyServerSupport support = new NettyServerSupport(() -> 8891, InstanceServerLoader.class); NettyServerSupport support = new NettyServerSupport(RandomPort::getSafeRandomPort, InstanceServerLoader.class);
support.bind();
while (!support.isActive()) {
ThreadUtil.sleep(100L);
}
Assert.assertTrue(support.isActive());
support.close();
}
@Test
public void bindTest() throws IOException {
List<Class<?>> classes = new ArrayList<>();
classes.add(InstanceServerLoader.class);
NettyServerSupport support = new NettyServerSupport(RandomPort::getSafeRandomPort, classes);
support.bind(); support.bind();
while (!support.isActive()) { while (!support.isActive()) {
ThreadUtil.sleep(100L); ThreadUtil.sleep(100L);

Loading…
Cancel
Save