Adjust the directory structure and add the supporting classes on the server side (#946)

* fix : Adjust the directory structure and add the supporting classes on the server side

* fix : test for rpc handler and server side

* fix : update test for rpc
pull/950/head
pizihao 2 years ago committed by GitHub
parent 344f629c74
commit 31bbd1efa4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,8 +17,8 @@
package cn.hippo4j.rpc.client;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import java.io.Closeable;

@ -18,8 +18,8 @@
package cn.hippo4j.rpc.client;
import cn.hippo4j.rpc.handler.Connection;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
/**
* Applicable to client connections

@ -19,11 +19,12 @@ package cn.hippo4j.rpc.client;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.TimeOutException;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.process.ActivePostProcess;
import cn.hippo4j.rpc.process.ActiveProcessChain;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.support.NettyConnectPool;
import cn.hippo4j.rpc.support.NettyConnectPoolHolder;
import cn.hippo4j.rpc.support.ResultHolder;
@ -45,7 +46,7 @@ import java.util.concurrent.locks.LockSupport;
public class NettyClientConnection implements ClientConnection {
String host;
Integer port;
ServerPort port;
// Obtain the connection timeout period. The default value is 30s
long timeout = 30000L;
EventLoopGroup worker = new NioEventLoopGroup();
@ -54,7 +55,7 @@ public class NettyClientConnection implements ClientConnection {
ChannelFuture future;
Channel channel;
public NettyClientConnection(String host, int port,
public NettyClientConnection(String host, ServerPort port,
List<ActivePostProcess> activeProcesses,
ChannelPoolHandler handler) {
Assert.notNull(worker);
@ -64,7 +65,7 @@ public class NettyClientConnection implements ClientConnection {
this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker, handler);
}
public NettyClientConnection(String host, int port, ChannelPoolHandler handler) {
public NettyClientConnection(String host, ServerPort port, ChannelPoolHandler handler) {
this(host, port, new LinkedList<>(), handler);
}
@ -76,7 +77,7 @@ public class NettyClientConnection implements ClientConnection {
try {
String key = request.getKey();
this.future = channel.writeAndFlush(request);
log.info("Call successful, target address is {}:{}, request key is {}", host, port, key);
log.info("Call successful, target address is {}:{}, request key is {}", host, port.getPort(), key);
// Wait for execution to complete
ResultHolder.putThread(key, Thread.currentThread());
LockSupport.parkNanos(timeout() * 1000000);
@ -85,7 +86,7 @@ public class NettyClientConnection implements ClientConnection {
throw new TimeOutException("Timeout waiting for server-side response");
}
activeProcessChain.applyPostHandle(request, response);
log.info("The response from {}:{} was received successfully with the response key {}.", host, port, key);
log.info("The response from {}:{} was received successfully with the response key {}.", host, port.getPort(), key);
return response;
} catch (Exception ex) {
activeProcessChain.afterCompletion(request, response, ex);

@ -17,8 +17,8 @@
package cn.hippo4j.rpc.client;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import java.io.IOException;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
package cn.hippo4j.rpc.discovery;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@ -15,19 +15,28 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
package cn.hippo4j.rpc.discovery;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.web.exception.IllegalException;
import java.util.Iterator;
import java.util.ServiceLoader;
/**
* Simply creating an instance of a class by its name and its specific type,
* and then throwing an exception if it is an interface, is not elegant
* You simply create an instance of a class based on its name and specific type.
* Load through the ServiceLoader first. If the load fails, load directly through the instantiation.
* If it is an interface, throw an exception. This is not elegant implementation
*/
public class DefaultInstance implements Instance {
@Override
public Object getInstance(Class<?> cls) {
ServiceLoader<?> load = ServiceLoader.load(cls);
Iterator<?> iterator = load.iterator();
if (iterator.hasNext()) {
return iterator.next();
}
return ReflectUtil.createInstance(cls);
}

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
package cn.hippo4j.rpc.discovery;
/**
* Instance interface to get an instance

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
/**
* Gets the top-level interface of the instance port
*/
@FunctionalInterface
public interface ServerPort {
/**
* Gets the listening or exposed port
*
* @return port
*/
int getPort();
}

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
package cn.hippo4j.rpc.discovery;
import cn.hippo4j.common.config.ApplicationContextHolder;

@ -33,36 +33,40 @@ import java.util.List;
* Processing by the client connection pool handler to clean the buffer and define new connection properties
*/
@Slf4j
public class NettyClientPoolHandler extends NettyHandlerManager implements ChannelPoolHandler {
public class AbstractNettyClientPoolHandler extends AbstractNettyHandlerManager implements ChannelPoolHandler {
public NettyClientPoolHandler(List<ChannelHandler> handlers) {
public AbstractNettyClientPoolHandler(List<ChannelHandler> handlers) {
super(handlers);
}
public NettyClientPoolHandler(ChannelHandler... handlers) {
public AbstractNettyClientPoolHandler(ChannelHandler... handlers) {
super(handlers);
}
public NettyClientPoolHandler() {
public AbstractNettyClientPoolHandler() {
super();
}
public NettyClientPoolHandler addLast(String name, ChannelHandler handler) {
@Override
public AbstractNettyClientPoolHandler addLast(String name, ChannelHandler handler) {
super.addLast(name, handler);
return this;
}
public NettyClientPoolHandler addFirst(String name, ChannelHandler handler) {
@Override
public AbstractNettyClientPoolHandler addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler);
return this;
}
public NettyClientPoolHandler addLast(ChannelHandler handler) {
@Override
public AbstractNettyClientPoolHandler addLast(ChannelHandler handler) {
super.addLast(handler);
return this;
}
public NettyClientPoolHandler addFirst(ChannelHandler handler) {
@Override
public AbstractNettyClientPoolHandler addFirst(ChannelHandler handler) {
super.addFirst(handler);
return this;
}
@ -85,7 +89,7 @@ public class NettyClientPoolHandler extends NettyHandlerManager implements Chann
.setTcpNoDelay(false);
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettyEncoder());
this.handlers.stream()
this.handlerEntities.stream()
.sorted()
.forEach(h -> {
if (h.getName() == null) {

@ -27,27 +27,33 @@ import java.util.stream.Collectors;
/**
* Processor manager for ChannelHandler in netty
*/
public abstract class NettyHandlerManager implements HandlerManager<ChannelHandler> {
public abstract class AbstractNettyHandlerManager implements HandlerManager<ChannelHandler> {
protected final List<HandlerEntity<ChannelHandler>> handlers;
protected final List<HandlerEntity<ChannelHandler>> handlerEntities;
AtomicLong firstIndex = new AtomicLong(-1);
AtomicLong lastIndex = new AtomicLong(0);
protected NettyHandlerManager(List<ChannelHandler> handlers) {
this.handlers = handlers.stream()
protected AbstractNettyHandlerManager(List<ChannelHandler> handlerEntities) {
Assert.notNull(handlerEntities);
this.handlerEntities = handlerEntities.stream()
.filter(Objects::nonNull)
.map(c -> getHandlerEntity(lastIndex.getAndIncrement(), c, null))
.collect(Collectors.toList());
}
protected NettyHandlerManager(ChannelHandler... handlers) {
this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList());
protected AbstractNettyHandlerManager(ChannelHandler... handlerEntities) {
this(handlerEntities != null ? Arrays.asList(handlerEntities) : Collections.emptyList());
}
protected NettyHandlerManager() {
this.handlers = new LinkedList<>();
protected AbstractNettyHandlerManager() {
this.handlerEntities = new LinkedList<>();
}
@Override
public boolean isEmpty() {
return handlerEntities.isEmpty();
}
/**
@ -57,9 +63,9 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
* @param handler handler
* @return NettyHandlerManager
*/
public NettyHandlerManager addLast(String name, ChannelHandler handler) {
public AbstractNettyHandlerManager addLast(String name, ChannelHandler handler) {
Assert.notNull(handler);
this.handlers.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, name));
this.handlerEntities.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, name));
return this;
}
@ -70,9 +76,9 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
* @param handler handler
* @return NettyHandlerManager
*/
public NettyHandlerManager addFirst(String name, ChannelHandler handler) {
public AbstractNettyHandlerManager addFirst(String name, ChannelHandler handler) {
Assert.notNull(handler);
this.handlers.add(getHandlerEntity(firstIndex.getAndIncrement(), handler, name));
this.handlerEntities.add(getHandlerEntity(firstIndex.getAndIncrement(), handler, name));
return this;
}
@ -82,9 +88,9 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
* @param handler handler
* @return NettyHandlerManager
*/
public NettyHandlerManager addLast(ChannelHandler handler) {
public AbstractNettyHandlerManager addLast(ChannelHandler handler) {
Assert.notNull(handler);
this.handlers.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, null));
this.handlerEntities.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, null));
return this;
}
@ -94,9 +100,9 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
* @param handler handler
* @return NettyHandlerManager
*/
public NettyHandlerManager addFirst(ChannelHandler handler) {
public AbstractNettyHandlerManager addFirst(ChannelHandler handler) {
Assert.notNull(handler);
this.handlers.add(getHandlerEntity(firstIndex.getAndDecrement(), handler, null));
this.handlerEntities.add(getHandlerEntity(firstIndex.getAndDecrement(), handler, null));
return this;
}
}

@ -18,7 +18,7 @@
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.support.ResultHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;

@ -17,8 +17,8 @@
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
/**
* The handler in each connection, where the specific behavior of the connection

@ -18,7 +18,7 @@
package cn.hippo4j.rpc.handler;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
/**
* Manage the Handler used in the processing.<br>
@ -56,6 +56,13 @@ public interface HandlerManager<T> {
*/
HandlerManager<T> addFirst(T handler);
/**
* Whether handler exists
*
* @return Whether handler exists
*/
boolean isEmpty();
/**
* Create a handler
*
@ -68,7 +75,7 @@ public interface HandlerManager<T> {
return new HandlerEntity<>(order, handler, name);
}
@Data
@Getter
@AllArgsConstructor
class HandlerEntity<T> implements Comparable<HandlerEntity<T>> {

@ -18,7 +18,7 @@
package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.model.Response;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;

@ -21,11 +21,11 @@ import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.rpc.process.ActivePostProcess;
import cn.hippo4j.rpc.process.ActiveProcessChain;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.DefaultResponse;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.support.ClassRegistry;
import cn.hippo4j.rpc.support.Instance;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.DefaultResponse;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.Instance;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.request;
package cn.hippo4j.rpc.model;
import java.io.IOException;
import java.io.ObjectInputStream;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.response;
package cn.hippo4j.rpc.model;
import java.io.IOException;
import java.io.ObjectInputStream;
@ -117,6 +117,8 @@ public class DefaultResponse implements Response {
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
// Deserialization obj
this.obj = s.readObject();
if (!isErr()) {
this.obj = s.readObject();
}
}
}

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.request;
package cn.hippo4j.rpc.model;
import java.io.Serializable;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.response;
package cn.hippo4j.rpc.model;
import java.io.Serializable;
@ -50,7 +50,8 @@ public interface Response extends Serializable {
String getErrMsg();
/**
* Whether the current request has an error
* Whether the current request has an error, <br>
* If it is true then it cannot be retrieved from obj
*/
boolean isErr();

@ -17,8 +17,8 @@
package cn.hippo4j.rpc.process;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
/**
* Callback while the connection is in progress

@ -17,8 +17,8 @@
package cn.hippo4j.rpc.process;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;

@ -20,7 +20,8 @@ package cn.hippo4j.rpc.server;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.rpc.coder.NettyDecoder;
import cn.hippo4j.rpc.coder.NettyEncoder;
import cn.hippo4j.rpc.handler.NettyHandlerManager;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.AbstractNettyHandlerManager;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
@ -37,38 +38,37 @@ import java.util.List;
* adapter to the netty server
*/
@Slf4j
public class NettyServerConnection extends NettyHandlerManager implements ServerConnection {
public class AbstractNettyServerConnection extends AbstractNettyHandlerManager implements ServerConnection {
Integer port;
ServerPort port;
EventLoopGroup leader;
EventLoopGroup worker;
Class<? extends ServerChannel> socketChannelCls = NioServerSocketChannel.class;
ChannelFuture future;
Channel channel;
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ChannelHandler> handlers) {
public AbstractNettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ChannelHandler> handlers) {
super(handlers);
Assert.notNull(handlers);
Assert.notNull(leader);
Assert.notNull(worker);
this.leader = leader;
this.worker = worker;
}
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, ChannelHandler... handlers) {
public AbstractNettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, ChannelHandler... handlers) {
this(leader, worker, (handlers != null ? Arrays.asList(handlers) : Collections.emptyList()));
}
public NettyServerConnection(ChannelHandler... handlers) {
public AbstractNettyServerConnection(ChannelHandler... handlers) {
this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList());
}
public NettyServerConnection(List<ChannelHandler> handlers) {
public AbstractNettyServerConnection(List<ChannelHandler> handlers) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), handlers);
}
@Override
public void bind(int port) {
public void bind(ServerPort port) {
ServerBootstrap server = new ServerBootstrap();
server.group(leader, worker)
.channel(socketChannelCls)
@ -79,7 +79,7 @@ public class NettyServerConnection extends NettyHandlerManager implements Server
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettyEncoder());
handlers.stream()
handlerEntities.stream()
.sorted()
.forEach(h -> {
if (h.getName() == null) {
@ -91,7 +91,7 @@ public class NettyServerConnection extends NettyHandlerManager implements Server
}
});
try {
this.future = server.bind(port);
this.future = server.bind(port.getPort());
this.channel = this.future.channel();
log.info("The server is started and can receive requests. The listening port is {}", port);
this.port = port;
@ -117,22 +117,26 @@ public class NettyServerConnection extends NettyHandlerManager implements Server
return channel.isActive();
}
public NettyServerConnection addLast(String name, ChannelHandler handler) {
@Override
public AbstractNettyServerConnection addLast(String name, ChannelHandler handler) {
super.addLast(name, handler);
return this;
}
public NettyServerConnection addFirst(String name, ChannelHandler handler) {
@Override
public AbstractNettyServerConnection addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler);
return this;
}
public NettyServerConnection addLast(ChannelHandler handler) {
@Override
public AbstractNettyServerConnection addLast(ChannelHandler handler) {
super.addLast(handler);
return this;
}
public NettyServerConnection addFirst(ChannelHandler handler) {
@Override
public AbstractNettyServerConnection addFirst(ChannelHandler handler) {
super.addFirst(handler);
return this;
}

@ -17,6 +17,8 @@
package cn.hippo4j.rpc.server;
import cn.hippo4j.rpc.discovery.ServerPort;
import java.io.IOException;
/**
@ -24,10 +26,10 @@ import java.io.IOException;
*/
public class RPCServer implements Server {
int port;
ServerPort port;
ServerConnection serverConnection;
public RPCServer(int port, ServerConnection serverConnection) {
public RPCServer(ServerConnection serverConnection, ServerPort port) {
this.port = port;
this.serverConnection = serverConnection;
}

@ -17,6 +17,7 @@
package cn.hippo4j.rpc.server;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.Connection;
/**
@ -27,6 +28,6 @@ public interface ServerConnection extends Connection {
/**
* Bind ports and process them
*/
void bind(int port);
void bind(ServerPort port);
}

@ -17,6 +17,7 @@
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.ConnectionException;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@ -43,13 +44,13 @@ public class NettyConnectPool {
ChannelPoolHandler handler;
ChannelPool pool;
String host;
int port;
ServerPort port;
public NettyConnectPool(String host, int port, int maxConnect,
public NettyConnectPool(String host, ServerPort port, int maxConnect,
long timeout, EventLoopGroup worker,
Class<? extends Channel> socketChannelCls,
ChannelPoolHandler handler) {
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port);
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port.getPort());
Bootstrap bootstrap = new Bootstrap()
.group(worker)
.channel(socketChannelCls)
@ -59,7 +60,7 @@ public class NettyConnectPool {
this.handler = handler;
this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction,
timeout, maxConnect, maxPendingAcquires, true, true);
log.info("The connection pool is established with the connection target {}:{}", host, port);
log.info("The connection pool is established with the connection target {}:{}", host, port.getPort());
NettyConnectPoolHolder.createPool(host, port, this);
}

@ -17,6 +17,7 @@
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.discovery.ServerPort;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler;
@ -38,7 +39,7 @@ public class NettyConnectPoolHolder {
static Map<String, NettyConnectPool> connectPoolMap = new ConcurrentHashMap<>();
private static NettyConnectPool initPool(String host, int port,
private static NettyConnectPool initPool(String host, ServerPort port,
long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) {
return new NettyConnectPool(
@ -48,8 +49,8 @@ public class NettyConnectPoolHolder {
handler);
}
private static String getKey(String host, int port) {
return host + ":" + port;
private static String getKey(String host, ServerPort port) {
return host + ":" + port.getPort();
}
/**
@ -60,7 +61,7 @@ public class NettyConnectPoolHolder {
* @param port the port
* @param pool This parameter applies only to the connection pool of netty
*/
public static void createPool(String host, int port, NettyConnectPool pool) {
public static void createPool(String host, ServerPort port, NettyConnectPool pool) {
connectPoolMap.put(getKey(host, port), pool);
}
@ -71,7 +72,7 @@ public class NettyConnectPoolHolder {
* @param port the port
* @return Map to the connection pool
*/
public static NettyConnectPool getPool(String host, int port) {
public static NettyConnectPool getPool(String host, ServerPort port) {
return connectPoolMap.get(getKey(host, port));
}
@ -86,7 +87,7 @@ public class NettyConnectPoolHolder {
* @param handler the chandler for netty
* @return Map to the connection pool
*/
public static synchronized NettyConnectPool getPool(String host, int port,
public static synchronized NettyConnectPool getPool(String host, ServerPort port,
long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) {
/*
@ -102,7 +103,7 @@ public class NettyConnectPoolHolder {
* @param host host
* @param port port
*/
public static void remove(String host, int port) {
public static void remove(String host, ServerPort port) {
connectPoolMap.remove(getKey(host, port));
}

@ -19,10 +19,13 @@ package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.IdUtil;
import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.client.Client;
import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.request.DefaultRequest;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import io.netty.channel.pool.ChannelPoolHandler;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@ -51,13 +54,14 @@ public class NettyProxyCenter {
* @param handler the pool handler for netty
* @return Proxy objects
*/
public static <T> T getProxy(Class<T> cls, String host, int port, ChannelPoolHandler handler) {
public static <T> T getProxy(Class<T> cls, String host, ServerPort port, ChannelPoolHandler handler) {
NettyClientConnection connection = new NettyClientConnection(host, port, handler);
return getProxy(connection, cls, host, port);
Client rpcClient = new RPCClient(connection);
return getProxy(rpcClient, cls, host, port);
}
@SuppressWarnings("unchecked")
public static <T> T getProxy(NettyClientConnection connection, Class<T> cls, String host, int port) {
public static <T> T getProxy(Client client, Class<T> cls, String host, ServerPort port) {
boolean b = cls.isInterface();
if (!b) {
throw new IllegalException(cls.getName() + "is not a Interface");
@ -75,7 +79,7 @@ public class NettyProxyCenter {
String key = host + port + clsName + methodName + IdUtil.simpleUUID();
Class<?>[] parameterTypes = method.getParameterTypes();
Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, args);
Response response = connection.connect(request);
Response response = client.connection(request);
if (response == null) {
return null;
}

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.HandlerManager;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.server.AbstractNettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.Server;
import io.netty.channel.ChannelHandler;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* This is a server-side build class that allows you to quickly prepare data on the server side and start the server side.<br>
* <p>
* The composite pattern is adopted, which means that it is itself a server-side implementation, so it is stateless.
*/
public class NettyServerSupport implements Server {
/**
* The interface that the server side can call,
* All the methods in the interface are brokered during initialization
*/
List<Class<?>> classes;
/**
* Extract the port number of the web container,
* which is the port information exposed by the server
*/
ServerPort serverPort;
/**
* ChannelHandler
*/
HandlerManager<ChannelHandler> handlerManager;
Server server;
public NettyServerSupport(ServerPort serverPort, Class<?>... classes) {
this(serverPort, new AbstractNettyServerConnection(), classes);
}
public NettyServerSupport(ServerPort serverPort, List<Class<?>> classes) {
this(serverPort, new AbstractNettyServerConnection(), classes);
}
public NettyServerSupport(ServerPort serverPort, HandlerManager<ChannelHandler> handlerManager, Class<?>... classes) {
this(serverPort, handlerManager, classes != null ? Arrays.asList(classes) : Collections.emptyList());
}
public NettyServerSupport(ServerPort serverPort, HandlerManager<ChannelHandler> handlerManager, List<Class<?>> classes) {
this.classes = classes;
this.serverPort = serverPort;
this.handlerManager = handlerManager;
initServer();
}
/**
* Initializes the entire server side, which includes interface registration, processors, and ports.<br>
* Only interfaces are registered during registration. Classes and abstract classes are not registered.
* If no processor is available, a default processor is provided
*/
private void initServer() {
// Register the interface that can be invoked
classes.stream().filter(Class::isInterface)
.forEach(cls -> ClassRegistry.put(cls.getName(), cls));
AbstractNettyServerConnection connection = (handlerManager instanceof AbstractNettyServerConnection)
? (AbstractNettyServerConnection) handlerManager
: new AbstractNettyServerConnection();
// Assign a default handler if no handler exists
if (connection.isEmpty()) {
connection.addLast(new NettyServerTakeHandler(new DefaultInstance()));
}
server = new RPCServer(connection, serverPort);
}
@Override
public void bind() {
server.bind();
}
@Override
public boolean isActive() {
return server.isActive();
}
@Override
public void close() throws IOException {
server.close();
}
}

@ -17,18 +17,19 @@
package cn.hippo4j.rpc.client;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.request.DefaultRequest;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.server.AbstractNettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection;
import cn.hippo4j.rpc.support.ClassRegistry;
import cn.hippo4j.rpc.support.DefaultInstance;
import cn.hippo4j.rpc.support.Instance;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert;
import org.junit.Test;
@ -40,8 +41,8 @@ import java.util.concurrent.TimeUnit;
public class RPCClientTest {
String host = "localhost";
int port = 8888;
int portTest = 8889;
ServerPort port = new TestServerPort();
ServerPort portTest = new TestPortServerPort();
@Test
public void connection() throws IOException {
@ -51,15 +52,15 @@ public class RPCClientTest {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
ServerConnection connection = new AbstractNettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ChannelPoolHandler channelPoolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(host, port, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
@ -84,15 +85,15 @@ public class RPCClientTest {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(portTest, connection);
ServerConnection connection = new AbstractNettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, portTest);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ChannelPoolHandler channelPoolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(host, portTest, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
Class<?>[] classes = new Class<?>[2];
@ -109,4 +110,20 @@ public class RPCClientTest {
rpcClient.close();
rpcServer.close();
}
static class TestServerPort implements ServerPort {
@Override
public int getPort() {
return 8888;
}
}
static class TestPortServerPort implements ServerPort {
@Override
public int getPort() {
return 8889;
}
}
}

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
package cn.hippo4j.rpc.discovery;
import org.junit.Assert;
import org.junit.Test;

@ -15,12 +15,9 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
package cn.hippo4j.rpc.discovery;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import cn.hippo4j.common.web.exception.IllegalException;
import org.junit.Assert;
import org.junit.Test;
@ -38,18 +35,26 @@ public class DefaultInstanceTest {
@Test
public void testGetInstance() {
String className = "cn.hippo4j.rpc.support.DefaultInstanceTest$InstanceModel";
String className = "cn.hippo4j.rpc.discovery.InstanceModel";
Object instanceInstance = instance.getInstance(className);
Assert.assertNotNull(instanceInstance);
Assert.assertEquals(className, instanceInstance.getClass().getName());
}
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
public static class InstanceModel {
@Test(expected = IllegalException.class)
public void testGetInstanceTest() {
String className = "cn.hippo4j.rpc.discovery.InstanceModelTest";
Object instanceInstance = instance.getInstance(className);
Assert.assertNotNull(instanceInstance);
Assert.assertEquals(className, instanceInstance.getClass().getName());
}
String name;
@Test
public void getInstanceTest() {
Class<InstanceServerLoader> cls = InstanceServerLoader.class;
Object instanceInstance = instance.getInstance(cls);
Assert.assertNotNull(instanceInstance);
Assert.assertEquals(InstanceServerLoaderImpl.class, instanceInstance.getClass());
}
}

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.boot.test.context.TestComponent;
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@TestComponent
public class InstanceModel {
String name;
}

@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
public interface InstanceServerLoader {
String getName();
}

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class InstanceServerLoaderImpl implements InstanceServerLoader {
String name = "name";
}

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
import cn.hippo4j.common.config.ApplicationContextHolder;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {InstanceModel.class, ApplicationContextHolder.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class SpringContextInstanceTest {
Instance instance = new SpringContextInstance();
@Test
public void getInstance() {
Object obj = instance.getInstance(InstanceModel.class);
Assert.assertNotNull(obj);
Assert.assertEquals(obj.getClass(), InstanceModel.class);
}
@Test
public void testGetInstance() {
Object obj = instance.getInstance("instanceModel");
Assert.assertNotNull(obj);
Assert.assertEquals(obj.getClass(), InstanceModel.class);
}
}

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.discovery.*;
import cn.hippo4j.rpc.server.AbstractNettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.support.NettyProxyCenter;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class ConnectHandlerTest {
@Test
public void handlerTest() throws IOException {
// server
Class<InstanceServerLoader> cls = InstanceServerLoader.class;
ClassRegistry.put(cls.getName(), cls);
ServerPort port = () -> 8891;
Instance instance = new DefaultInstance();
NettyServerTakeHandler serverHandler = new NettyServerTakeHandler(instance);
AbstractNettyServerConnection connection = new AbstractNettyServerConnection(serverHandler);
RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ChannelPoolHandler channelPoolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection("localhost", port, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
InstanceServerLoader loader = NettyProxyCenter.getProxy(rpcClient, cls, "localhost", port);
String name = loader.getName();
Assert.assertEquals("name", name);
rpcClient.close();
rpcServer.close();
}
}

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import io.netty.channel.ChannelHandler;
import org.junit.Assert;
import org.junit.Test;
public class NettyClientPoolHandlerTest {
@Test
public void testGetHandlerEntity() {
TestHandler handler = new TestHandler();
long order = 0;
String name = "Test";
AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler();
HandlerManager.HandlerEntity<ChannelHandler> entity = poolHandler.getHandlerEntity(order, handler, name);
Assert.assertEquals(entity.getName(), name);
Assert.assertEquals(entity.getOrder(), order);
Assert.assertEquals(entity.getHandler(), handler);
}
@Test
public void testCompareTo() {
TestHandler handler = new TestHandler();
long order = 0;
String name = "Test";
TestHandler handler1 = new TestHandler();
long order1 = 1;
String name1 = "Test1";
AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler();
HandlerManager.HandlerEntity<ChannelHandler> entity = poolHandler.getHandlerEntity(order, handler, name);
HandlerManager.HandlerEntity<ChannelHandler> entity1 = poolHandler.getHandlerEntity(order1, handler1, name1);
int compare = entity.compareTo(entity1);
Assert.assertTrue(compare < 0);
}
@Test
public void addLast() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addLast(new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test
public void addFirst() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addFirst(new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test
public void testAddLast() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addLast("Test", new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test
public void testAddFirst() {
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addFirst("Test", new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
}

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
public class TestHandler implements ChannelHandler {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
}
}

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.model;
import cn.hippo4j.rpc.discovery.InstanceServerLoaderImpl;
import org.junit.Assert;
import org.junit.Test;
import java.io.*;
import java.lang.reflect.Method;
public class DefaultRequestTest {
@Test
public void testReadObject() throws IOException, ClassNotFoundException, NoSuchMethodException {
String key = "name";
String clsName = InstanceServerLoaderImpl.class.getName();
Method method = InstanceServerLoaderImpl.class.getMethod("setName", String.class);
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Object[] parameters = new Object[1];
parameters[0] = "hippo4j";
Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, parameters);
byte[] bytes;
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
outputStream.writeObject(request);
outputStream.flush();
bytes = byteArrayOutputStream.toByteArray();
}
Request request1;
try (
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
request1 = (Request) objectInputStream.readObject();
}
Assert.assertEquals(request1.hashCode(), request1.hashCode());
Assert.assertEquals(key, request1.getKey());
Assert.assertEquals(clsName, request1.getClassName());
Assert.assertEquals(methodName, request1.getMethodName());
Assert.assertArrayEquals(parameterTypes, request1.getParameterTypes());
Assert.assertArrayEquals(parameters, request1.getParameters());
Assert.assertEquals(request1, request);
}
}

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.model;
import cn.hippo4j.common.web.exception.IllegalException;
import org.junit.Assert;
import org.junit.Test;
import java.io.*;
public class DefaultResponseTest {
@Test
public void testReadObject() throws IOException, ClassNotFoundException {
String key = "name";
Object o = "obj";
Class<?> cls = String.class;
Response response = new DefaultResponse(key, cls, o);
byte[] bytes;
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
outputStream.writeObject(response);
outputStream.flush();
bytes = byteArrayOutputStream.toByteArray();
}
Response response1;
try (
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
response1 = (Response) objectInputStream.readObject();
}
Assert.assertEquals(response1.hashCode(), response.hashCode());
Assert.assertEquals(key, response1.getKey());
Assert.assertEquals(o, response1.getObj());
Assert.assertEquals(cls, response1.getCls());
Assert.assertEquals(response1, response);
Assert.assertFalse(response1.isErr());
}
@Test
public void testWriteObject() throws IOException, ClassNotFoundException {
String key = "name";
Throwable throwable = new IllegalException("test throwable");
String errMsg = "test throwable";
Response response = new DefaultResponse(key, throwable, errMsg);
byte[] bytes;
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ObjectOutputStream outputStream = new ObjectOutputStream(byteArrayOutputStream)) {
outputStream.writeObject(response);
outputStream.flush();
bytes = byteArrayOutputStream.toByteArray();
}
Response response1;
try (
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
response1 = (Response) objectInputStream.readObject();
}
Assert.assertEquals(key, response1.getKey());
Assert.assertThrows(IllegalException.class, () -> {
throw response1.getThrowable();
});
Assert.assertEquals(response1.hashCode(), response.hashCode());
Assert.assertEquals(errMsg, response1.getErrMsg());
Assert.assertEquals(response1, response);
Assert.assertTrue(response1.isErr());
}
}

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.server;
import cn.hippo4j.rpc.handler.TestHandler;
import org.junit.Assert;
import org.junit.Test;
public class NettyServerConnectionTest {
@Test
public void addLast() {
AbstractNettyServerConnection connection = new AbstractNettyServerConnection();
Assert.assertTrue(connection.isEmpty());
connection.addLast(new TestHandler());
Assert.assertFalse(connection.isEmpty());
}
@Test
public void addFirst() {
AbstractNettyServerConnection connection = new AbstractNettyServerConnection();
Assert.assertTrue(connection.isEmpty());
connection.addFirst(new TestHandler());
Assert.assertFalse(connection.isEmpty());
}
@Test
public void testAddLast() {
AbstractNettyServerConnection connection = new AbstractNettyServerConnection();
Assert.assertTrue(connection.isEmpty());
connection.addLast("Test", new TestHandler());
Assert.assertFalse(connection.isEmpty());
}
@Test
public void testAddFirst() {
AbstractNettyServerConnection connection = new AbstractNettyServerConnection();
Assert.assertTrue(connection.isEmpty());
connection.addFirst("Test", new TestHandler());
Assert.assertFalse(connection.isEmpty());
}
}

@ -17,9 +17,10 @@
package cn.hippo4j.rpc.server;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.support.DefaultInstance;
import cn.hippo4j.rpc.support.Instance;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.junit.Assert;
@ -31,14 +32,14 @@ import java.util.concurrent.TimeUnit;
public class RPCServerTest {
public static int port = 8888;
public static ServerPort port = new TestServerPort();
@Test
public void bind() throws IOException {
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
ServerConnection connection = new AbstractNettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
@ -58,8 +59,8 @@ public class RPCServerTest {
EventLoopGroup leader = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(leader, worker, handler);
RPCServer rpcServer = new RPCServer(port, connection);
ServerConnection connection = new AbstractNettyServerConnection(leader, worker, handler);
RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
@ -69,4 +70,11 @@ public class RPCServerTest {
rpcServer.close();
}
static class TestServerPort implements ServerPort {
@Override
public int getPort() {
return 8888;
}
}
}

@ -17,7 +17,8 @@
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
@ -29,7 +30,7 @@ import org.junit.Test;
public class NettyConnectPoolHolderTest {
String host = "127.0.0.1";
int port = 8888;
ServerPort port = new TestServerPort();
int maxCount = 8;
int timeout = 5000;
EventLoopGroup group = new NioEventLoopGroup();
@ -37,7 +38,7 @@ public class NettyConnectPoolHolderTest {
@Test
public void createPool() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, handler);
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(pool, connectPool);
@ -48,7 +49,7 @@ public class NettyConnectPoolHolderTest {
@Test
public void testGetPool() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(connectPool1, connectPool);
@ -59,7 +60,7 @@ public class NettyConnectPoolHolderTest {
@Test
public void remove() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(connectPool1, connectPool);
@ -67,4 +68,12 @@ public class NettyConnectPoolHolderTest {
NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertNull(connectPool2);
}
static class TestServerPort implements ServerPort {
@Override
public int getPort() {
return 8888;
}
}
}

@ -17,10 +17,13 @@
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.AbstractNettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection;
import io.netty.channel.Channel;
@ -38,7 +41,7 @@ import java.util.concurrent.TimeUnit;
public class NettyConnectPoolTest {
String host = "127.0.0.1";
int port = 8888;
ServerPort port = new TestServerPort();
int maxCount = 64;
int timeout = 5000;
EventLoopGroup group = new NioEventLoopGroup();
@ -49,8 +52,8 @@ public class NettyConnectPoolTest {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
ServerConnection connection = new AbstractNettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here
try {
@ -58,7 +61,7 @@ public class NettyConnectPoolTest {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire);
@ -71,8 +74,8 @@ public class NettyConnectPoolTest {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
ServerConnection connection = new AbstractNettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here
try {
@ -80,7 +83,7 @@ public class NettyConnectPoolTest {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler);
Future<Channel> acquire = pool.acquire();
Assert.assertNotNull(acquire);
@ -92,8 +95,8 @@ public class NettyConnectPoolTest {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
ServerConnection connection = new AbstractNettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here
try {
@ -102,7 +105,7 @@ public class NettyConnectPoolTest {
throw new RuntimeException(e);
}
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire);
@ -110,4 +113,12 @@ public class NettyConnectPoolTest {
pool.close();
rpcServer.close();
}
static class TestServerPort implements ServerPort {
@Override
public int getPort() {
return 8888;
}
}
}

@ -18,33 +18,27 @@
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import org.junit.Assert;
import org.junit.Test;
public class NettyProxyCenterTest {
ServerPort port = new TestServerPort();
@Test
public void getProxy() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888, handler);
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", port, handler);
Assert.assertNotNull(localhost);
}
@Test(expected = IllegalException.class)
public void getProxyTest() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", 8888, handler);
Assert.assertNotNull(localhost);
}
@Test(expected = ConnectionException.class)
public void getProxyTestCall() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888, handler);
localhost.hello();
AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler());
ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", port, handler);
Assert.assertNotNull(localhost);
}
@ -56,4 +50,12 @@ public class NettyProxyCenterTest {
static class ProxyClass {
}
static class TestServerPort implements ServerPort {
@Override
public int getPort() {
return 8888;
}
}
}

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.discovery.InstanceServerLoader;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class NettyServerSupportTest {
@Test
public void bind() throws IOException {
NettyServerSupport support = new NettyServerSupport(() -> 8890, InstanceServerLoader.class);
CompletableFuture.runAsync(support::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Assert.assertTrue(support.isActive());
support.close();
Assert.assertFalse(support.isActive());
}
}
Loading…
Cancel
Save