fix : add server support for better use server

pull/945/head
pizihao 3 years ago
parent b3b27204a7
commit 433df608e2

@ -18,13 +18,15 @@
package cn.hippo4j.rpc.discovery; package cn.hippo4j.rpc.discovery;
/** /**
* * Gets the top-level interface of the instance port
*/ */
@FunctionalInterface
public interface ServerPort { public interface ServerPort {
/** /**
* Gets the listening or exposed port
* *
* @return * @return port
*/ */
int getPort(); int getPort();

@ -33,40 +33,40 @@ import java.util.List;
* Processing by the client connection pool handler to clean the buffer and define new connection properties * Processing by the client connection pool handler to clean the buffer and define new connection properties
*/ */
@Slf4j @Slf4j
public class NettyClientPoolHandler extends NettyHandlerManager implements ChannelPoolHandler { public class AbstractNettyClientPoolHandler extends AbstractNettyHandlerManager implements ChannelPoolHandler {
public NettyClientPoolHandler(List<ChannelHandler> handlers) { public AbstractNettyClientPoolHandler(List<ChannelHandler> handlers) {
super(handlers); super(handlers);
} }
public NettyClientPoolHandler(ChannelHandler... handlers) { public AbstractNettyClientPoolHandler(ChannelHandler... handlers) {
super(handlers); super(handlers);
} }
public NettyClientPoolHandler() { public AbstractNettyClientPoolHandler() {
super(); super();
} }
@Override @Override
public NettyClientPoolHandler addLast(String name, ChannelHandler handler) { public AbstractNettyClientPoolHandler addLast(String name, ChannelHandler handler) {
super.addLast(name, handler); super.addLast(name, handler);
return this; return this;
} }
@Override @Override
public NettyClientPoolHandler addFirst(String name, ChannelHandler handler) { public AbstractNettyClientPoolHandler addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler); super.addFirst(name, handler);
return this; return this;
} }
@Override @Override
public NettyClientPoolHandler addLast(ChannelHandler handler) { public AbstractNettyClientPoolHandler addLast(ChannelHandler handler) {
super.addLast(handler); super.addLast(handler);
return this; return this;
} }
@Override @Override
public NettyClientPoolHandler addFirst(ChannelHandler handler) { public AbstractNettyClientPoolHandler addFirst(ChannelHandler handler) {
super.addFirst(handler); super.addFirst(handler);
return this; return this;
} }

@ -27,7 +27,7 @@ import java.util.stream.Collectors;
/** /**
* Processor manager for ChannelHandler in netty * Processor manager for ChannelHandler in netty
*/ */
public abstract class NettyHandlerManager implements HandlerManager<ChannelHandler> { public abstract class AbstractNettyHandlerManager implements HandlerManager<ChannelHandler> {
protected final List<HandlerEntity<ChannelHandler>> handlerEntities; protected final List<HandlerEntity<ChannelHandler>> handlerEntities;
@ -35,7 +35,7 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
AtomicLong lastIndex = new AtomicLong(0); AtomicLong lastIndex = new AtomicLong(0);
protected NettyHandlerManager(List<ChannelHandler> handlerEntities) { protected AbstractNettyHandlerManager(List<ChannelHandler> handlerEntities) {
Assert.notNull(handlerEntities); Assert.notNull(handlerEntities);
this.handlerEntities = handlerEntities.stream() this.handlerEntities = handlerEntities.stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
@ -43,11 +43,11 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
protected NettyHandlerManager(ChannelHandler... handlerEntities) { protected AbstractNettyHandlerManager(ChannelHandler... handlerEntities) {
this(handlerEntities != null ? Arrays.asList(handlerEntities) : Collections.emptyList()); this(handlerEntities != null ? Arrays.asList(handlerEntities) : Collections.emptyList());
} }
protected NettyHandlerManager() { protected AbstractNettyHandlerManager() {
this.handlerEntities = new LinkedList<>(); this.handlerEntities = new LinkedList<>();
} }
@ -63,7 +63,7 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
* @param handler handler * @param handler handler
* @return NettyHandlerManager * @return NettyHandlerManager
*/ */
public NettyHandlerManager addLast(String name, ChannelHandler handler) { public AbstractNettyHandlerManager addLast(String name, ChannelHandler handler) {
Assert.notNull(handler); Assert.notNull(handler);
this.handlerEntities.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, name)); this.handlerEntities.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, name));
return this; return this;
@ -76,7 +76,7 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
* @param handler handler * @param handler handler
* @return NettyHandlerManager * @return NettyHandlerManager
*/ */
public NettyHandlerManager addFirst(String name, ChannelHandler handler) { public AbstractNettyHandlerManager addFirst(String name, ChannelHandler handler) {
Assert.notNull(handler); Assert.notNull(handler);
this.handlerEntities.add(getHandlerEntity(firstIndex.getAndIncrement(), handler, name)); this.handlerEntities.add(getHandlerEntity(firstIndex.getAndIncrement(), handler, name));
return this; return this;
@ -88,7 +88,7 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
* @param handler handler * @param handler handler
* @return NettyHandlerManager * @return NettyHandlerManager
*/ */
public NettyHandlerManager addLast(ChannelHandler handler) { public AbstractNettyHandlerManager addLast(ChannelHandler handler) {
Assert.notNull(handler); Assert.notNull(handler);
this.handlerEntities.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, null)); this.handlerEntities.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, null));
return this; return this;
@ -100,7 +100,7 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
* @param handler handler * @param handler handler
* @return NettyHandlerManager * @return NettyHandlerManager
*/ */
public NettyHandlerManager addFirst(ChannelHandler handler) { public AbstractNettyHandlerManager addFirst(ChannelHandler handler) {
Assert.notNull(handler); Assert.notNull(handler);
this.handlerEntities.add(getHandlerEntity(firstIndex.getAndDecrement(), handler, null)); this.handlerEntities.add(getHandlerEntity(firstIndex.getAndDecrement(), handler, null));
return this; return this;

@ -117,6 +117,8 @@ public class DefaultResponse implements Response {
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject(); s.defaultReadObject();
// Deserialization obj // Deserialization obj
if (!isErr()){
this.obj = s.readObject(); this.obj = s.readObject();
} }
}
} }

@ -50,7 +50,8 @@ public interface Response extends Serializable {
String getErrMsg(); String getErrMsg();
/** /**
* Whether the current request has an error * Whether the current request has an error, <br>
* If it is true then it cannot be retrieved from obj
*/ */
boolean isErr(); boolean isErr();

@ -21,7 +21,7 @@ import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.rpc.coder.NettyDecoder; import cn.hippo4j.rpc.coder.NettyDecoder;
import cn.hippo4j.rpc.coder.NettyEncoder; import cn.hippo4j.rpc.coder.NettyEncoder;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyHandlerManager; import cn.hippo4j.rpc.handler.AbstractNettyHandlerManager;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*; import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
@ -38,7 +38,7 @@ import java.util.List;
* adapter to the netty server * adapter to the netty server
*/ */
@Slf4j @Slf4j
public class NettyServerConnection extends NettyHandlerManager implements ServerConnection { public class AbstractNettyServerConnection extends AbstractNettyHandlerManager implements ServerConnection {
ServerPort port; ServerPort port;
EventLoopGroup leader; EventLoopGroup leader;
@ -47,7 +47,7 @@ public class NettyServerConnection extends NettyHandlerManager implements Server
ChannelFuture future; ChannelFuture future;
Channel channel; Channel channel;
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ChannelHandler> handlers) { public AbstractNettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ChannelHandler> handlers) {
super(handlers); super(handlers);
Assert.notNull(leader); Assert.notNull(leader);
Assert.notNull(worker); Assert.notNull(worker);
@ -55,15 +55,15 @@ public class NettyServerConnection extends NettyHandlerManager implements Server
this.worker = worker; this.worker = worker;
} }
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, ChannelHandler... handlers) { public AbstractNettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, ChannelHandler... handlers) {
this(leader, worker, (handlers != null ? Arrays.asList(handlers) : Collections.emptyList())); this(leader, worker, (handlers != null ? Arrays.asList(handlers) : Collections.emptyList()));
} }
public NettyServerConnection(ChannelHandler... handlers) { public AbstractNettyServerConnection(ChannelHandler... handlers) {
this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList()); this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList());
} }
public NettyServerConnection(List<ChannelHandler> handlers) { public AbstractNettyServerConnection(List<ChannelHandler> handlers) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), handlers); this(new NioEventLoopGroup(), new NioEventLoopGroup(), handlers);
} }
@ -118,25 +118,25 @@ public class NettyServerConnection extends NettyHandlerManager implements Server
} }
@Override @Override
public NettyServerConnection addLast(String name, ChannelHandler handler) { public AbstractNettyServerConnection addLast(String name, ChannelHandler handler) {
super.addLast(name, handler); super.addLast(name, handler);
return this; return this;
} }
@Override @Override
public NettyServerConnection addFirst(String name, ChannelHandler handler) { public AbstractNettyServerConnection addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler); super.addFirst(name, handler);
return this; return this;
} }
@Override @Override
public NettyServerConnection addLast(ChannelHandler handler) { public AbstractNettyServerConnection addLast(ChannelHandler handler) {
super.addLast(handler); super.addLast(handler);
return this; return this;
} }
@Override @Override
public NettyServerConnection addFirst(ChannelHandler handler) { public AbstractNettyServerConnection addFirst(ChannelHandler handler) {
super.addFirst(handler); super.addFirst(handler);
return this; return this;
} }

@ -29,7 +29,7 @@ public class RPCServer implements Server {
ServerPort port; ServerPort port;
ServerConnection serverConnection; ServerConnection serverConnection;
public RPCServer(ServerPort port, ServerConnection serverConnection) { public RPCServer(ServerConnection serverConnection, ServerPort port) {
this.port = port; this.port = port;
this.serverConnection = serverConnection; this.serverConnection = serverConnection;
} }

@ -19,7 +19,9 @@ package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.IdUtil; import cn.hippo4j.common.toolkit.IdUtil;
import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.client.Client;
import cn.hippo4j.rpc.client.NettyClientConnection; import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.model.DefaultRequest; import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request; import cn.hippo4j.rpc.model.Request;
@ -54,11 +56,12 @@ public class NettyProxyCenter {
*/ */
public static <T> T getProxy(Class<T> cls, String host, ServerPort port, ChannelPoolHandler handler) { public static <T> T getProxy(Class<T> cls, String host, ServerPort port, ChannelPoolHandler handler) {
NettyClientConnection connection = new NettyClientConnection(host, port, handler); NettyClientConnection connection = new NettyClientConnection(host, port, handler);
return getProxy(connection, cls, host, port); Client rpcClient = new RPCClient(connection);
return getProxy(rpcClient, cls, host, port);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T> T getProxy(NettyClientConnection connection, Class<T> cls, String host, ServerPort port) { public static <T> T getProxy(Client client, Class<T> cls, String host, ServerPort port) {
boolean b = cls.isInterface(); boolean b = cls.isInterface();
if (!b) { if (!b) {
throw new IllegalException(cls.getName() + "is not a Interface"); throw new IllegalException(cls.getName() + "is not a Interface");
@ -76,7 +79,7 @@ public class NettyProxyCenter {
String key = host + port + clsName + methodName + IdUtil.simpleUUID(); String key = host + port + clsName + methodName + IdUtil.simpleUUID();
Class<?>[] parameterTypes = method.getParameterTypes(); Class<?>[] parameterTypes = method.getParameterTypes();
Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, args); Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, args);
Response response = connection.connect(request); Response response = client.connection(request);
if (response == null) { if (response == null) {
return null; return null;
} }

@ -22,7 +22,7 @@ import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.HandlerManager; import cn.hippo4j.rpc.handler.HandlerManager;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler; import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.server.NettyServerConnection; import cn.hippo4j.rpc.server.AbstractNettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer; import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.Server; import cn.hippo4j.rpc.server.Server;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
@ -59,11 +59,11 @@ public class NettyServerSupport implements Server {
Server server; Server server;
public NettyServerSupport(ServerPort serverPort, Class<?>... classes) { public NettyServerSupport(ServerPort serverPort, Class<?>... classes) {
this(serverPort, new NettyServerConnection(), classes); this(serverPort, new AbstractNettyServerConnection(), classes);
} }
public NettyServerSupport(ServerPort serverPort, List<Class<?>> classes) { public NettyServerSupport(ServerPort serverPort, List<Class<?>> classes) {
this(serverPort, new NettyServerConnection(), classes); this(serverPort, new AbstractNettyServerConnection(), classes);
} }
public NettyServerSupport(ServerPort serverPort, HandlerManager<ChannelHandler> handlerManager, Class<?>... classes) { public NettyServerSupport(ServerPort serverPort, HandlerManager<ChannelHandler> handlerManager, Class<?>... classes) {
@ -86,14 +86,14 @@ public class NettyServerSupport implements Server {
// Register the interface that can be invoked // Register the interface that can be invoked
classes.stream().filter(Class::isInterface) classes.stream().filter(Class::isInterface)
.forEach(cls -> ClassRegistry.put(cls.getName(), cls)); .forEach(cls -> ClassRegistry.put(cls.getName(), cls));
NettyServerConnection connection = (handlerManager instanceof NettyServerConnection) AbstractNettyServerConnection connection = (handlerManager instanceof AbstractNettyServerConnection)
? (NettyServerConnection) handlerManager ? (AbstractNettyServerConnection) handlerManager
: new NettyServerConnection(); : new AbstractNettyServerConnection();
// Assign a default handler if no handler exists // Assign a default handler if no handler exists
if (connection.isEmpty()) { if (connection.isEmpty()) {
connection.addLast(new NettyServerTakeHandler(new DefaultInstance())); connection.addLast(new NettyServerTakeHandler(new DefaultInstance()));
} }
server = new RPCServer(serverPort, connection); server = new RPCServer(connection, serverPort);
} }
@Override @Override

Loading…
Cancel
Save