Refactor the rpc module (#1309)

pull/1297/head
pizihao 1 year ago committed by GitHub
parent 53e1de5eee
commit 630e8402cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -11,18 +11,14 @@
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-infra-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-infra-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>

@ -30,8 +30,7 @@ import java.io.Closeable;
* able to communicate with the server. Wait for the server's Response ({@link Response})
* <h3>METHOD</h3>
* <ul>
* <li>{@link #connection(Request)}</li>
* <li>{@link #isActive()}</li>
* <li>{@link #connect(Request)}</li>
* <li>{@link #close()}</li>
* </ul>
* You can usually use the client in this way:
@ -55,13 +54,6 @@ public interface Client extends Closeable {
* @param request Request information, Requested methods and parameters
* @return response Response from server side
*/
Response connection(Request request);
/**
* Check whether the client is active
*
* @return Whether active
*/
boolean isActive();
<R> R connect(Request request);
}

@ -15,15 +15,19 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
package cn.hippo4j.rpc.client;
import cn.hippo4j.rpc.client.Client;
import cn.hippo4j.rpc.client.ClientConnection;
import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.connection.ClientConnection;
import cn.hippo4j.rpc.connection.SimpleClientConnection;
import cn.hippo4j.rpc.exception.OperationException;
import cn.hippo4j.rpc.handler.ErrorClientHandler;
import cn.hippo4j.rpc.handler.HandlerManager;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.ClientPoolHandler;
import cn.hippo4j.rpc.handler.ClientTakeHandler;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.support.AddressUtil;
import cn.hippo4j.rpc.server.ServerSupport;
import io.netty.channel.ChannelHandler;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@ -33,6 +37,7 @@ import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -48,12 +53,12 @@ import java.util.concurrent.ConcurrentHashMap;
* by the container
*
* @see cn.hippo4j.rpc.client.RPCClient
* @see cn.hippo4j.rpc.client.NettyClientConnection
* @see NettyServerSupport
* @see SimpleClientConnection
* @see ServerSupport
* @since 2.0.0
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class NettyClientSupport {
public final class ClientSupport {
/**
* the cache for client
@ -69,13 +74,14 @@ public final class NettyClientSupport {
*/
public static Client getClient(InetSocketAddress address, HandlerManager<ChannelHandler> handlerManager) {
return CLIENT_MAP.computeIfAbsent(address, a -> {
NettyClientPoolHandler handler = (handlerManager instanceof NettyClientPoolHandler)
? (NettyClientPoolHandler) handlerManager
: new NettyClientPoolHandler();
ClientPoolHandler handler = (handlerManager instanceof ClientPoolHandler)
? (ClientPoolHandler) handlerManager
: new ClientPoolHandler();
if (handler.isEmpty()) {
handler.addFirst(null, new NettyClientTakeHandler());
handler.addFirst(null, new ClientTakeHandler());
}
NettyClientConnection connection = new NettyClientConnection(address, handler);
handler.addLast(null, new ErrorClientHandler());
SimpleClientConnection connection = new SimpleClientConnection(address, handler);
return new RPCClient(connection);
});
}
@ -87,7 +93,44 @@ public final class NettyClientSupport {
* @return Client
*/
public static Client getClient(InetSocketAddress address) {
return getClient(address, new NettyClientPoolHandler());
return getClient(address, new ClientPoolHandler());
}
/**
* Find a suitable client and send a request to the server
*
* @param address address
* @param handlerName The handler that can handle this request
* @param param parameter
* @return result
*/
public static <R> R clientSend(String address, String handlerName, Object[] param) {
InetSocketAddress socketAddress = AddressUtil.getInetAddress(address);
Client client = getClient(socketAddress);
Request request = new DefaultRequest(UUID.randomUUID().toString(), handlerName, param);
return client.connect(request);
}
public static <R> R clientSend(String address, String handlerName, Object param) {
Object[] params = {param};
InetSocketAddress socketAddress = AddressUtil.getInetAddress(address);
Client client = getClient(socketAddress);
Request request = new DefaultRequest(UUID.randomUUID().toString(), handlerName, params);
return client.connect(request);
}
/**
* Find a suitable client and send a request to the server
*
* @param address address
* @param handlerName The handler that can handle this request
* @return result
*/
public static <R> R clientSend(String address, String handlerName) {
InetSocketAddress socketAddress = AddressUtil.getInetAddress(address);
Client client = getClient(socketAddress);
Request request = new DefaultRequest(UUID.randomUUID().toString(), handlerName);
return client.connect(request);
}
/**
@ -102,7 +145,7 @@ public final class NettyClientSupport {
try {
c.close();
} catch (IOException e) {
throw new RuntimeException(e);
throw new OperationException(e);
}
});
}

@ -1,118 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.client;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.rpc.exception.TimeOutException;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.support.NettyConnectPool;
import cn.hippo4j.rpc.support.NettyConnectPoolHolder;
import cn.hippo4j.rpc.support.ResultHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.locks.LockSupport;
/**
* Client implemented using netty
*
* @since 2.0.0
*/
@Slf4j
public class NettyClientConnection implements ClientConnection {
InetSocketAddress address;
/**
* Obtain the connection timeout period. The default value is 30s
*/
long timeout = 30000L;
final int nanosPerMilliSecond = 1000000;
EventLoopGroup worker = new NioEventLoopGroup();
NettyConnectPool connectionPool;
ChannelFuture future;
Channel channel;
public NettyClientConnection(InetSocketAddress address,
ChannelPoolHandler handler) {
Assert.notNull(worker);
this.address = address;
this.connectionPool = NettyConnectPoolHolder.getPool(address, timeout, worker, handler);
}
@Override
public Response connect(Request request) {
this.channel = connectionPool.acquire(timeout);
boolean debugEnabled = log.isDebugEnabled();
Response response;
try {
String key = request.getKey();
this.future = channel.writeAndFlush(request);
if (debugEnabled) {
log.debug("Call successful, target address is {}:{}, request key is {}", address.getHostName(), address.getPort(), key);
}
// Wait for execution to complete
ResultHolder.putThread(key, Thread.currentThread());
LockSupport.parkNanos(timeout() * nanosPerMilliSecond);
response = ResultHolder.get(key);
if (response == null) {
throw new TimeOutException("Timeout waiting for server-side response");
}
if (debugEnabled) {
log.debug("The response from {}:{} was received successfully with the response key {}.", address.getHostName(), address.getPort(), key);
}
return response;
} catch (Exception ex) {
throw ex;
} finally {
connectionPool.release(this.channel);
}
}
@Override
public long timeout() {
return timeout;
}
@Override
public void setTimeout(long timeout) {
this.timeout = timeout;
}
@Override
public void close() {
Optional.ofNullable(this.channel)
.ifPresent(c -> {
worker.shutdownGracefully();
this.future.channel().close();
this.channel.close();
});
}
@Override
public boolean isActive() {
return channel.isActive();
}
}

@ -17,8 +17,8 @@
package cn.hippo4j.rpc.client;
import cn.hippo4j.rpc.connection.ClientConnection;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import java.io.IOException;
@ -37,15 +37,10 @@ public class RPCClient implements Client {
}
@Override
public Response connection(Request request) {
public <R> R connect(Request request) {
return clientConnection.connect(request);
}
@Override
public boolean isActive() {
return clientConnection.isActive();
}
/**
* Close the client and release all connections.
*

@ -32,13 +32,13 @@ import java.io.Serializable;
*
* @since 2.0.0
*/
public class NettyEncoder extends MessageToByteEncoder<Serializable> {
public class ObjectEncoder extends MessageToByteEncoder<Serializable> {
private static final int BYTE_LENGTH = 4;
private static final byte[] BYTE = new byte[BYTE_LENGTH];
@Override
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) {
int startIndex = out.writerIndex();
try (ByteBufOutputStream outPut = new ByteBufOutputStream(out)) {
outPut.write(BYTE);

@ -15,25 +15,31 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.client;
package cn.hippo4j.rpc.connection;
import cn.hippo4j.rpc.handler.Connection;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import java.io.Closeable;
/**
* Applicable to client connections<br>
* Represents a network request connection and provides IO layer support<br>
* <p>
* This is not a strict and stateless Connection interface, it contains the necessary
* operations that should be done in the connection. It is more like integrating the
* connection and the connection channel together, so creating {@link ClientConnection} is
* very resource intensive, for which caching is recommended
*
* @since 2.0.0
*/
public interface ClientConnection extends Connection {
public interface ClientConnection extends Closeable {
/**
* Establish a connection and process
*
* @param request Request information
*/
Response connect(Request request);
<R> R connect(Request request);
/**
* Get timeout, ms

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
package cn.hippo4j.rpc.connection;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
@ -35,16 +35,16 @@ import java.util.concurrent.ConcurrentHashMap;
* @since 2.0.0
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NettyConnectPoolHolder {
public class ConnectPoolHolder {
static int maxConnect = 256;
static Map<String, NettyConnectPool> connectPoolMap = new ConcurrentHashMap<>();
static Map<String, SimpleConnectPool> connectPoolMap = new ConcurrentHashMap<>();
private static NettyConnectPool initPool(InetSocketAddress address,
long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) {
return new NettyConnectPool(address, maxConnect, timeout, worker, NioSocketChannel.class, handler);
private static SimpleConnectPool initPool(InetSocketAddress address,
long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) {
return new SimpleConnectPool(address, maxConnect, timeout, worker, NioSocketChannel.class, handler);
}
private static String getKey(InetSocketAddress address) {
@ -58,7 +58,7 @@ public class NettyConnectPoolHolder {
* @param address the InetSocketAddress
* @param pool This parameter applies only to the connection pool of netty
*/
public static void createPool(InetSocketAddress address, NettyConnectPool pool) {
public static void createPool(InetSocketAddress address, SimpleConnectPool pool) {
connectPoolMap.put(getKey(address), pool);
}
@ -68,7 +68,7 @@ public class NettyConnectPoolHolder {
* @param address the InetSocketAddress
* @return Map to the connection pool
*/
public static NettyConnectPool getPool(InetSocketAddress address) {
public static SimpleConnectPool getPool(InetSocketAddress address) {
return connectPoolMap.get(getKey(address));
}
@ -82,13 +82,13 @@ public class NettyConnectPoolHolder {
* @param handler the chandler for netty
* @return Map to the connection pool
*/
public static synchronized NettyConnectPool getPool(InetSocketAddress address,
long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) {
public static synchronized SimpleConnectPool getPool(InetSocketAddress address,
long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) {
/*
* this cannot use the computeIfAbsent method directly here because put is already used in init. Details refer to https://bugs.openjdk.java.net/browse/JDK-8062841
*/
NettyConnectPool pool = getPool(address);
SimpleConnectPool pool = getPool(address);
return pool == null ? initPool(address, timeout, worker, handler) : pool;
}

@ -15,28 +15,34 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
package cn.hippo4j.rpc.connection;
import cn.hippo4j.rpc.discovery.ServerPort;
import java.io.Closeable;
/**
* This applies to server-side connections
* <p>
* Represents a network request connection and provides IO layer support<br>
* <p>
* This is not a strict and stateless Connection interface, it contains the necessary
* operations that should be done in the connection. It is more like integrating the
* connection and the connection channel together, so creating {@link Connection} is
* connection and the connection channel together, so creating {@link ServerConnection} is
* very resource intensive, for which caching is recommended
*
* @since 2.0.0
*/
public interface Connection extends Closeable {
public interface ServerConnection extends Closeable {
/**
* Bind ports and process them
*/
void bind(ServerPort port);
/**
* Gets the state of the connection, which is interpreted differently by different programs<br>
* <p>
* Client: Active connection indicates that a connection is being maintained with the server.
* Inactive connection indicates that no connection is being established with the server<br>
* <p>
* Server: The active connection indicates that the server has been started, is receiving ports,
* and can obtain requests at any time. The inactive connection indicates that the server has been
* shut down and the ports have been released

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.connection;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.exception.TimeOutException;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.support.ResultHolder;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.concurrent.locks.LockSupport;
/**
* Client implemented using netty
*
* @since 2.0.0
*/
@Slf4j
public class SimpleClientConnection implements ClientConnection {
InetSocketAddress address;
/**
* Obtain the connection timeout period. The default value is 30s
*/
long timeout = 30000L;
EventLoopGroup worker = new NioEventLoopGroup();
SimpleConnectPool connectionPool;
static final String TIME_OUT_MSG = "Timeout waiting for server-side response";
public SimpleClientConnection(InetSocketAddress address,
ChannelPoolHandler handler) {
this.address = address;
this.connectionPool = ConnectPoolHolder.getPool(address, timeout, worker, handler);
}
@Override
public <R> R connect(Request request) {
Channel channel = connectionPool.acquire(timeout);
try {
channel.writeAndFlush(request);
return wait(request.getRID());
} finally {
connectionPool.release(channel);
}
}
/**
* wait the Response
*
* @param requestId RID
* @return Response
*/
@SuppressWarnings("unchecked")
public <R> R wait(String requestId) {
Response response;
if (log.isDebugEnabled()) {
log.debug("Call successful, target address is {}:{}, request key is {}", address.getHostName(), address.getPort(), requestId);
}
// Wait for execution to complete
ResultHolder.putThread(requestId, Thread.currentThread());
LockSupport.parkNanos(timeout() * 1000000);
response = ResultHolder.get(requestId);
if (response == null) {
throw new TimeOutException(TIME_OUT_MSG);
}
if (response.isErr()) {
throw new ConnectionException(response.getErrMsg());
}
if (log.isDebugEnabled()) {
log.debug("The response from {}:{} was received successfully with the response key {}.", address.getHostName(), address.getPort(), requestId);
}
return (R) response.getObj();
}
@Override
public long timeout() {
return timeout;
}
@Override
public void setTimeout(long timeout) {
this.timeout = timeout;
}
@Override
public void close() {
worker.shutdownGracefully();
connectionPool.close();
}
}

@ -15,8 +15,9 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
package cn.hippo4j.rpc.connection;
import cn.hippo4j.rpc.client.ClientSupport;
import cn.hippo4j.rpc.exception.ConnectionException;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
@ -39,7 +40,7 @@ import java.util.concurrent.TimeUnit;
* @since 2.0.0
*/
@Slf4j
public class NettyConnectPool {
public class SimpleConnectPool {
ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE;
FixedChannelPool.AcquireTimeoutAction acquireTimeoutAction = FixedChannelPool.AcquireTimeoutAction.NEW;
@ -48,10 +49,10 @@ public class NettyConnectPool {
ChannelPool pool;
InetSocketAddress address;
public NettyConnectPool(InetSocketAddress address, int maxConnect,
long timeout, EventLoopGroup worker,
Class<? extends Channel> socketChannelCls,
ChannelPoolHandler handler) {
public SimpleConnectPool(InetSocketAddress address, int maxConnect,
long timeout, EventLoopGroup worker,
Class<? extends Channel> socketChannelCls,
ChannelPoolHandler handler) {
Bootstrap bootstrap = new Bootstrap()
.group(worker)
.channel(socketChannelCls)
@ -64,7 +65,7 @@ public class NettyConnectPool {
if (log.isDebugEnabled()) {
log.info("The connection pool is established with the connection target {}:{}", address.getHostName(), address.getPort());
}
NettyConnectPoolHolder.createPool(address, this);
ConnectPoolHolder.createPool(address, this);
}
public Channel acquire(long timeoutMillis) {
@ -72,7 +73,7 @@ public class NettyConnectPool {
Future<Channel> fch = pool.acquire();
return fch.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) {
NettyClientSupport.closeClient(address);
ClientSupport.closeClient(address);
throw new ConnectionException("Failed to get the connection", e);
}
}
@ -81,7 +82,7 @@ public class NettyConnectPool {
try {
return pool.acquire();
} catch (Exception e) {
NettyClientSupport.closeClient(address);
ClientSupport.closeClient(address);
throw new ConnectionException("Failed to get the connection", e);
}
}
@ -92,7 +93,7 @@ public class NettyConnectPool {
try {
pool.release(channel);
} catch (Exception e) {
NettyClientSupport.closeClient(address);
ClientSupport.closeClient(address);
throw new ConnectionException("Failed to release the connection", e);
}
});
@ -101,9 +102,9 @@ public class NettyConnectPool {
public void close() {
try {
pool.close();
NettyConnectPoolHolder.remove(address);
ConnectPoolHolder.remove(address);
} catch (Exception e) {
NettyClientSupport.closeClient(address);
ClientSupport.closeClient(address);
throw new ConnectionException("Failed to close the connection pool", e);
}
}

@ -15,13 +15,12 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.server;
package cn.hippo4j.rpc.connection;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.rpc.coder.NettyEncoder;
import cn.hippo4j.rpc.coder.ObjectEncoder;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.handler.AbstractNettyHandlerManager;
import cn.hippo4j.rpc.handler.AbstractHandlerManager;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@ -48,7 +47,7 @@ import java.util.List;
* @since 2.0.0
*/
@Slf4j
public class NettyServerConnection extends AbstractNettyHandlerManager implements ServerConnection {
public class SimpleServerConnection extends AbstractHandlerManager implements ServerConnection {
ServerPort port;
EventLoopGroup leader;
@ -56,32 +55,30 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
Class<? extends ServerChannel> socketChannelCls = NioServerSocketChannel.class;
ChannelFuture future;
Channel channel;
private final int maxPortNum = 65535;
private static final int MAX_PORT_NUM = 65535;
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ChannelHandler> handlers) {
public SimpleServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ChannelHandler> handlers) {
super(handlers);
Assert.notNull(leader);
Assert.notNull(worker);
this.leader = leader;
this.worker = worker;
}
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, ChannelHandler... handlers) {
public SimpleServerConnection(EventLoopGroup leader, EventLoopGroup worker, ChannelHandler... handlers) {
this(leader, worker, (handlers != null ? Arrays.asList(handlers) : Collections.emptyList()));
}
public NettyServerConnection(ChannelHandler... handlers) {
public SimpleServerConnection(ChannelHandler... handlers) {
this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList());
}
public NettyServerConnection(List<ChannelHandler> handlers) {
public SimpleServerConnection(List<ChannelHandler> handlers) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), handlers);
}
@Override
public void bind(ServerPort port) {
int serverPort = port.getPort();
if (serverPort < 0 || serverPort > maxPortNum) {
if (serverPort < 0 || serverPort > MAX_PORT_NUM) {
throw new ConnectionException("The port number " + serverPort + " is outside 0~65535, which is not a legal port number");
}
ServerBootstrap server = new ServerBootstrap();
@ -93,7 +90,7 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyEncoder());
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
handlerEntities.stream()
.sorted()
@ -106,18 +103,12 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
});
}
});
try {
this.future = server.bind(serverPort).sync();
this.channel = this.future.channel();
if (log.isDebugEnabled()) {
log.debug("The server is started and can receive requests. The listening port is {}", serverPort);
}
this.port = port;
this.future.channel().closeFuture().sync();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new ConnectionException("Listening port failed, Please check whether the port is occupied", ex);
this.future = server.bind(serverPort);
this.channel = this.future.channel();
if (log.isDebugEnabled()) {
log.debug("The server is started and can receive requests. The listening port is {}", serverPort);
}
this.port = port;
}
@Override
@ -143,13 +134,13 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
}
@Override
public NettyServerConnection addLast(String name, ChannelHandler handler) {
public SimpleServerConnection addLast(String name, ChannelHandler handler) {
super.addLast(name, handler);
return this;
}
@Override
public NettyServerConnection addFirst(String name, ChannelHandler handler) {
public SimpleServerConnection addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler);
return this;
}

@ -1,76 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* the registration center for Client and Server
*
* @since 2.0.0
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ClassRegistry {
private static final Map<String, Class<?>> SERVER_REGISTER = new ConcurrentHashMap<>();
/**
* get a Obj in Registry center <br>
*
* @param s key
* @return t element
*/
public static Class<?> get(String s) {
return SERVER_REGISTER.get(s);
}
/**
* add the element to Registry Table <br>
* if the key already exists, failure, and return before the value of the key. <br>
* if success return the element
*
* @param s key
* @param cls element
* @return final mapped value
*/
public static Class<?> set(String s, Class<?> cls) {
return SERVER_REGISTER.putIfAbsent(s, cls);
}
/**
* add the element to Registry Table <br>
* if the key already exists, failure, replace it
*
* @param s key
* @param cls element
*/
public static Class<?> put(String s, Class<?> cls) {
return SERVER_REGISTER.put(s, cls);
}
/**
* clear
*/
public static void clear() {
SERVER_REGISTER.clear();
}
}

@ -1,53 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
import cn.hippo4j.common.toolkit.ReflectUtil;
import java.util.Iterator;
import java.util.ServiceLoader;
/**
* You simply create an instance of a class based on its name and specific type.
* Load through the ServiceLoader first. If the load fails, load directly through the instantiation.
* If it is an interface, throw an exception. This is not elegant implementation
*
* @since 2.0.0
*/
public class DefaultInstance implements Instance {
@Override
public Object getInstance(Class<?> cls) {
ServiceLoader<?> load = ServiceLoader.load(cls);
Iterator<?> iterator = load.iterator();
if (iterator.hasNext()) {
return iterator.next();
}
return ReflectUtil.createInstance(cls);
}
@Override
public Object getInstance(String name) {
try {
Class<?> cls = Class.forName(name);
return getInstance(cls);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.exception;
/**
* HandlerNotFoundException occurs when no executable handler can be found
*
* @since 2.0.0
*/
public class HandlerNotFoundException extends RuntimeException {
private static final long serialVersionUID = 8247610319171014183L;
public HandlerNotFoundException() {
super();
}
public HandlerNotFoundException(String message) {
super(message);
}
public HandlerNotFoundException(Throwable e) {
super(e.getMessage(), e);
}
public HandlerNotFoundException(String message, Throwable throwable) {
super(message, throwable);
}
public HandlerNotFoundException(String message, Throwable throwable, boolean enableSuppression, boolean writableStackTrace) {
super(message, throwable, enableSuppression, writableStackTrace);
}
}

@ -15,30 +15,35 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
package cn.hippo4j.rpc.exception;
/**
* Instance interface to get an instance
* a generic operational exception
*
* @since 2.0.0
*/
public interface Instance {
/**
* get a instance
*
* @param cls Class object
* @return Information about instances created or found
*/
Object getInstance(Class<?> cls);
/**
* Gets an instance of a class with a recognizable identity,
* which can be the fully qualified name of class. It can also be a unique name in a container
*
* @param name Identifying name
* @return Information about instances created or found
*/
Object getInstance(String name);
public class OperationException extends RuntimeException {
private static final long serialVersionUID = 8247610319171014183L;
public OperationException() {
super();
}
public OperationException(String message) {
super(message);
}
public OperationException(Throwable e) {
super(e.getMessage(), e);
}
public OperationException(String message, Throwable throwable) {
super(message, throwable);
}
public OperationException(String message, Throwable throwable, boolean enableSuppression, boolean writableStackTrace) {
super(message, throwable, enableSuppression, writableStackTrace);
}
}

@ -17,7 +17,6 @@
package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.toolkit.Assert;
import io.netty.channel.ChannelHandler;
import java.util.Arrays;
@ -33,7 +32,7 @@ import java.util.stream.Collectors;
*
* @since 2.0.0
*/
public abstract class AbstractNettyHandlerManager implements HandlerManager<ChannelHandler> {
public abstract class AbstractHandlerManager implements HandlerManager<ChannelHandler> {
protected final List<HandlerEntity<ChannelHandler>> handlerEntities;
@ -41,19 +40,18 @@ public abstract class AbstractNettyHandlerManager implements HandlerManager<Chan
AtomicLong lastIndex = new AtomicLong(0);
protected AbstractNettyHandlerManager(List<ChannelHandler> handlerEntities) {
Assert.notNull(handlerEntities);
protected AbstractHandlerManager(List<ChannelHandler> handlerEntities) {
this.handlerEntities = handlerEntities.stream()
.filter(Objects::nonNull)
.map(c -> getHandlerEntity(lastIndex.getAndIncrement(), c, null))
.collect(Collectors.toList());
}
protected AbstractNettyHandlerManager(ChannelHandler... handlerEntities) {
protected AbstractHandlerManager(ChannelHandler... handlerEntities) {
this(handlerEntities != null ? Arrays.asList(handlerEntities) : Collections.emptyList());
}
protected AbstractNettyHandlerManager() {
protected AbstractHandlerManager() {
this.handlerEntities = new LinkedList<>();
}
@ -69,8 +67,7 @@ public abstract class AbstractNettyHandlerManager implements HandlerManager<Chan
* @param handler handler
* @return NettyHandlerManager
*/
public AbstractNettyHandlerManager addLast(String name, ChannelHandler handler) {
Assert.notNull(handler);
public AbstractHandlerManager addLast(String name, ChannelHandler handler) {
this.handlerEntities.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, name));
return this;
}
@ -82,8 +79,7 @@ public abstract class AbstractNettyHandlerManager implements HandlerManager<Chan
* @param handler handler
* @return NettyHandlerManager
*/
public AbstractNettyHandlerManager addFirst(String name, ChannelHandler handler) {
Assert.notNull(handler);
public AbstractHandlerManager addFirst(String name, ChannelHandler handler) {
this.handlerEntities.add(getHandlerEntity(firstIndex.getAndIncrement(), handler, name));
return this;
}

@ -17,12 +17,12 @@
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.support.ResultHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
@ -31,7 +31,8 @@ import java.util.Optional;
*
* @since 2.0.0
*/
public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler {
@Slf4j
abstract class AbstractTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler {
/**
* Manual disconnection is used here in case the server and client are disconnected due to a sudden exception
@ -40,15 +41,16 @@ public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdap
* @param cause the throwable
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Channel channel = ctx.channel();
if (channel.isActive()) {
ctx.close();
}
Optional.ofNullable(cause)
.ifPresent(t -> {
throw new ConnectionException(cause);
if (log.isWarnEnabled()) {
log.warn(cause.getMessage());
}
});
}
@ -59,8 +61,8 @@ public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdap
*/
@Override
public void handler(Response response) {
ResultHolder.put(response.getKey(), response);
ResultHolder.wake(response.getKey());
ResultHolder.put(response.getRID(), response);
ResultHolder.wake(response.getRID());
}
}

@ -17,7 +17,7 @@
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.coder.NettyEncoder;
import cn.hippo4j.rpc.coder.ObjectEncoder;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
@ -36,28 +36,28 @@ import java.util.List;
* @since 2.0.0
*/
@Slf4j
public class NettyClientPoolHandler extends AbstractNettyHandlerManager implements ChannelPoolHandler {
public class ClientPoolHandler extends AbstractHandlerManager implements ChannelPoolHandler {
public NettyClientPoolHandler(List<ChannelHandler> handlers) {
public ClientPoolHandler(List<ChannelHandler> handlers) {
super(handlers);
}
public NettyClientPoolHandler(ChannelHandler... handlers) {
public ClientPoolHandler(ChannelHandler... handlers) {
super(handlers);
}
public NettyClientPoolHandler() {
public ClientPoolHandler() {
super();
}
@Override
public NettyClientPoolHandler addLast(String name, ChannelHandler handler) {
public ClientPoolHandler addLast(String name, ChannelHandler handler) {
super.addLast(name, handler);
return this;
}
@Override
public NettyClientPoolHandler addFirst(String name, ChannelHandler handler) {
public ClientPoolHandler addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler);
return this;
}
@ -81,7 +81,7 @@ public class NettyClientPoolHandler extends AbstractNettyHandlerManager implemen
channel.config()
.setTcpNoDelay(false);
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyEncoder());
pipeline.addLast(new ObjectEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
this.handlerEntities.stream()
.sorted()

@ -17,6 +17,7 @@
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.model.Response;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@ -27,17 +28,20 @@ import io.netty.channel.ChannelHandlerContext;
* @since 2.0.0
*/
@ChannelHandler.Sharable
public class NettyClientTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler {
public class ClientTakeHandler extends AbstractTakeHandler implements ConnectHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
Response response = (Response) msg;
handler(response);
ctx.flush();
if (msg instanceof Response) {
Response response = (Response) msg;
handler(response);
ctx.flush();
} else {
ctx.fireChannelRead(msg);
}
} catch (Exception e) {
ctx.close();
throw e;
throw new ConnectionException(e);
}
}

@ -15,25 +15,29 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.exception.HandlerNotFoundException;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
/**
* Adapter Spring, The requested object is managed by spring
* The final handler, which returned an exception because no usable handler could be found
*
* @since 2.0.0
*/
public class SpringContextInstance implements Instance {
@ChannelHandler.Sharable
public class ErrorClientHandler extends AbstractTakeHandler implements ConnectHandler {
@Override
public Object getInstance(Class<?> cls) {
// return ApplicationContextHolder.getBean(cls);
return null;
}
private static final String ERR_MSG = "no handler found that matches the request";
@Override
public Object getInstance(String name) {
// return ApplicationContextHolder.getInstance().getBean(name);
return null;
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
throw new HandlerNotFoundException(ERR_MSG);
} finally {
ctx.close();
}
}
}
}

@ -17,56 +17,33 @@
package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.exception.HandlerNotFoundException;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.DefaultResponse;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Method;
/**
* netty adaptation layer
* The final handler, which returned an exception because no usable handler could be found
*
* @since 2.0.0
*/
@ChannelHandler.Sharable
public class NettyServerTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler {
Instance instance;
public class ErrorServerHandler extends AbstractTakeHandler {
public NettyServerTakeHandler(Instance instance) {
this.instance = instance;
}
private static final String ERR_MSG = "no handler found that matches the request";
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (!(msg instanceof Request)) {
return;
HandlerNotFoundException exception = new HandlerNotFoundException(ERR_MSG);
if (!(msg instanceof DefaultRequest)) {
ctx.close();
throw exception;
}
Request request = (Request) msg;
Response response = sendHandler(request);
Response response = new DefaultResponse(request.getRID(), ERR_MSG);
ctx.writeAndFlush(response);
}
@Override
public Response sendHandler(Request request) {
Response response;
try {
Class<?> cls = ClassRegistry.get(request.getClassName());
Method method = ReflectUtil.getMethodByName(cls, request.getMethodName(), request.getParameterTypes());
Assert.notNull(method);
Object invoke = ReflectUtil.invoke(instance.getInstance(cls), method, request.getParameters());
response = new DefaultResponse(request.getKey(), invoke.getClass(), invoke);
return response;
} catch (Exception e) {
response = new DefaultResponse(request.getKey(), e, e.getMessage());
return response;
}
}
}

@ -17,6 +17,7 @@
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.exception.OperationException;
import io.netty.channel.ChannelHandler;
import lombok.AllArgsConstructor;
import lombok.Getter;
@ -65,7 +66,7 @@ public interface HandlerManager<T> {
boolean b = cls.isAnnotationPresent(ChannelHandler.Sharable.class)
|| HandlerManager.class.isAssignableFrom(cls);
if (!b) {
throw new RuntimeException("Join the execution of the handler must add io.netty.channel.ChannelHandler."
throw new OperationException("Join the execution of the handler must add io.netty.channel.ChannelHandler."
+ "Sharable annotations, Please for the handler class " + cls.getName() + " add io.netty.channel."
+ "ChannelHandler.Sharable annotation");
}

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.exception.HandlerNotFoundException;
import cn.hippo4j.rpc.model.DefaultResponse;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import io.netty.channel.ChannelHandler;
import lombok.RequiredArgsConstructor;
import java.util.function.Supplier;
/**
* This is a processor that does not support parameters but can get the return value. <br>
* Even if the parameters passed by the user will not be recognized, it will even become an error
*
* @since 2.0.0
*/
@RequiredArgsConstructor
@ChannelHandler.Sharable
public class ServerBareTakeHandler<T> extends ServerHandler {
final String name;
final Supplier<T> fun;
@Override
String getName() {
return name;
}
@Override
public Response sendHandler(Request request) {
try {
Object[] parameters = request.getParameters();
if (parameters.length != 0) {
throw new HandlerNotFoundException("no handler found that matches the pair " + name + " and function");
}
T t = fun.get();
return new DefaultResponse(request.getRID(), t);
} catch (Exception e) {
return new DefaultResponse(request.getRID(), e.getMessage());
}
}
}

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.exception.HandlerNotFoundException;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.DefaultResponse;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import io.netty.channel.ChannelHandler;
import lombok.RequiredArgsConstructor;
import java.util.function.BiFunction;
/**
* netty adaptation layer about {@link DefaultRequest}<br><br>
* Parse the parameters in the request to execute the corresponding method. <br>
* This is a relatively flexible processor at present, but there are still great defects. <br>
* <br>For example:<br>
* <ul>
* <li>This handler only supports requests with two parameters, it will not work if the number of parameters does not match</li>
* <li>If you want to pass multiple parameters please wrap them, or customize the processor</li>
* <li>This processor does not consider whether the types match when parsing parameters, and an error occurs if the conversion fails</li>
* </ul>
*
* @since 2.0.0
*/
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class ServerBiTakeHandler<T, P, K> extends ServerHandler {
final String name;
final BiFunction<T, P, K> fun;
@Override
String getName() {
return name;
}
@Override
@SuppressWarnings("unchecked")
public Response sendHandler(Request request) {
try {
Object[] parameters = request.getParameters();
if (parameters.length != 2) {
throw new HandlerNotFoundException("no handler found that matches the pair " + name + " and function");
}
T t = (T) parameters[0];
P p = (P) parameters[1];
K r = fun.apply(t, p);
return new DefaultResponse(request.getRID(), r);
} catch (Exception e) {
return new DefaultResponse(request.getRID(), e.getMessage());
}
}
}

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import io.netty.channel.ChannelHandlerContext;
import java.util.Objects;
/**
* The handler located on the server side provides unified operations for the server side
*
* @since 2.0.0
*/
abstract class ServerHandler extends AbstractTakeHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (!(msg instanceof DefaultRequest)) {
ctx.fireChannelRead(msg);
return;
}
Request request = (Request) msg;
if (!Objects.equals(request.getKey(), getName())) {
ctx.fireChannelRead(msg);
return;
}
Response response = sendHandler(request);
ctx.writeAndFlush(response);
}
/**
* Get the name of the current handler
*
* @return name
*/
abstract String getName();
}

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.exception.HandlerNotFoundException;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.DefaultResponse;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import io.netty.channel.ChannelHandler;
import lombok.RequiredArgsConstructor;
import java.util.function.Function;
/**
* netty adaptation layer about {@link DefaultRequest}<br><br>
* Parse the parameters in the request to execute the corresponding method. <br>
* This is a relatively flexible processor at present, but there are still great defects. <br>
* <br>For example:<br>
* <ul>
* <li>This handler only supports requests with one parameters, it will not work if the number of parameters does not match</li>
* <li>If you want to pass multiple parameters please wrap them, or customize the processor</li>
* <li>This processor does not consider whether the types match when parsing parameters, and an error occurs if the conversion fails</li>
* </ul>
*
* @since 2.0.0
*/
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class ServerTakeHandler<T, R> extends ServerHandler {
final String name;
final Function<T, R> fun;
@Override
String getName() {
return name;
}
@Override
@SuppressWarnings("unchecked")
public Response sendHandler(Request request) {
try {
Object[] parameters = request.getParameters();
if (parameters.length != 1) {
throw new HandlerNotFoundException("no handler found that matches the pair " + name + " and function");
}
T t = (T) parameters[0];
R r = fun.apply(t);
return new DefaultResponse(request.getRID(), r);
} catch (Exception e) {
return new DefaultResponse(request.getRID(), e.getMessage());
}
}
}

@ -30,38 +30,30 @@ import java.util.Objects;
*/
public final class DefaultRequest implements Request {
String RID;
String key;
String className;
String methodName;
Class<?>[] parameterTypes;
int length;
transient Object[] parameters;
public DefaultRequest(String key, String className, String methodName, Class<?>[] parameterTypes, Object[] parameters) {
public DefaultRequest(String RID, String key, Object[] parameters) {
this.RID = RID;
this.key = key;
this.className = className;
this.methodName = methodName;
this.parameterTypes = parameterTypes;
this.parameters = parameters;
this.length = parameters.length;
}
@Override
public String getKey() {
return key;
}
@Override
public String getClassName() {
return className;
public DefaultRequest(String RID, String key) {
this(RID, key, new Object[]{});
}
@Override
public String getMethodName() {
return methodName;
public String getRID() {
return RID;
}
@Override
public Class<?>[] getParameterTypes() {
return parameterTypes;
public String getKey() {
return key;
}
@Override
@ -78,14 +70,12 @@ public final class DefaultRequest implements Request {
return false;
}
DefaultRequest that = (DefaultRequest) o;
return Objects.equals(key, that.key)
&& Objects.equals(className, that.className)
&& Objects.equals(methodName, that.methodName);
return Objects.equals(key, that.key) && Objects.equals(RID, that.RID);
}
@Override
public int hashCode() {
return Objects.hash(key, className, methodName);
return Objects.hash(key, RID);
}
/**
@ -111,11 +101,7 @@ public final class DefaultRequest implements Request {
*/
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
if (parameterTypes == null) {
return;
}
// Deserialization parameters
int length = parameterTypes.length;
Object[] a = new Object[length];
for (int i = 0; i < length; i++) {
a[i] = s.readObject();

@ -30,36 +30,27 @@ import java.util.Objects;
*/
public class DefaultResponse implements Response {
String key;
Class<?> cls;
String RID;
transient Object obj;
Throwable throwable;
String errMsg;
public DefaultResponse(String key, Class<?> cls, Object obj, Throwable throwable, String errMsg) {
this.key = key;
this.cls = cls;
public DefaultResponse(String RID, Object obj, String errMsg) {
this.RID = RID;
this.obj = obj;
this.throwable = throwable;
this.errMsg = errMsg;
}
public DefaultResponse(String key, Throwable throwable, String errMsg) {
this(key, null, null, throwable, errMsg);
public DefaultResponse(String RID, String errMsg) {
this(RID, null, errMsg);
}
public DefaultResponse(String key, Class<?> cls, Object obj) {
this(key, cls, obj, null, null);
public DefaultResponse(String RID, Object obj) {
this(RID, obj, null);
}
@Override
public String getKey() {
return key;
}
@Override
public Class<?> getCls() {
return cls;
public String getRID() {
return RID;
}
@Override
@ -67,11 +58,6 @@ public class DefaultResponse implements Response {
return obj;
}
@Override
public Throwable getThrowable() {
return throwable;
}
@Override
public String getErrMsg() {
return errMsg;
@ -79,7 +65,7 @@ public class DefaultResponse implements Response {
@Override
public boolean isErr() {
return throwable != null || errMsg != null;
return errMsg != null;
}
@Override
@ -91,12 +77,12 @@ public class DefaultResponse implements Response {
return false;
}
DefaultResponse that = (DefaultResponse) o;
return Objects.equals(key, that.key) && Objects.equals(cls, that.cls);
return Objects.equals(RID, that.RID);
}
@Override
public int hashCode() {
return Objects.hash(key, cls);
return Objects.hash(RID);
}
/**

@ -27,24 +27,14 @@ import java.io.Serializable;
public interface Request extends Serializable {
/**
* The unique identity of the current request
*/
String getKey();
/**
* The Class name of the current request
* the request id, Used to specify an executable handler
*/
String getClassName();
String getRID();
/**
* The Method name of the current request
*/
String getMethodName();
/**
* The parameter type of the current request
* The unique identity of the current request
*/
Class<?>[] getParameterTypes();
String getKey();
/**
* The parameters of the current request

@ -29,23 +29,13 @@ public interface Response extends Serializable {
/**
* The unique identity of the current Response
*/
String getKey();
/**
* The class of the current Response, The target of deserialization
*/
Class<?> getCls();
String getRID();
/**
* The results of this request can be obtained, The source of deserialization
*/
Object getObj();
/**
* The Throwable of the current Response
*/
Throwable getThrowable();
/**
* the error message
*/

@ -17,7 +17,7 @@
package cn.hippo4j.rpc.server;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.connection.ServerConnection;
import cn.hippo4j.rpc.discovery.ServerPort;
import java.io.IOException;
@ -36,10 +36,7 @@ public class RPCServer implements Server {
public RPCServer(ServerConnection serverConnection, ServerPort port) {
this.port = port;
this.serverConnection = serverConnection;
this.thread = ThreadUtil.newThread(
() -> serverConnection.bind(port),
"hippo4j-rpc-" + port.getPort(),
false);
this.thread = new Thread(() -> serverConnection.bind(port), "hippo4j-rpc-" + port.getPort());
}
/**
@ -61,7 +58,7 @@ public class RPCServer implements Server {
*/
@Override
public void close() throws IOException {
thread = null;
thread.interrupt();
serverConnection.close();
}
}

@ -15,22 +15,17 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
package cn.hippo4j.rpc.server;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.client.ClientSupport;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.ErrorServerHandler;
import cn.hippo4j.rpc.handler.HandlerManager;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.Server;
import cn.hippo4j.rpc.connection.ServerConnection;
import cn.hippo4j.rpc.connection.SimpleServerConnection;
import io.netty.channel.ChannelHandler;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* This is a server-side build class that allows you to quickly prepare data on the server side and start the server side.<br>
@ -38,17 +33,11 @@ import java.util.List;
* The composite pattern is adopted, which means that it is itself a server-side implementation, so it is stateless.
*
* @see RPCServer
* @see NettyServerConnection
* @see NettyClientSupport
* @see ServerConnection
* @see ClientSupport
* @since 2.0.0
*/
public class NettyServerSupport implements Server {
/**
* The interface that the server side can call,
* All the methods in the interface are brokered during initialization
*/
protected List<Class<?>> classes;
public class ServerSupport implements Server {
/**
* Extract the port number of the web container,
@ -63,20 +52,11 @@ public class NettyServerSupport implements Server {
protected Server server;
public NettyServerSupport(ServerPort serverPort, Class<?>... classes) {
this(serverPort, new NettyServerConnection(), classes);
}
public NettyServerSupport(ServerPort serverPort, List<Class<?>> classes) {
this(serverPort, new NettyServerConnection(), classes);
}
public NettyServerSupport(ServerPort serverPort, HandlerManager<ChannelHandler> handlerManager, Class<?>... classes) {
this(serverPort, handlerManager, classes != null ? Arrays.asList(classes) : Collections.emptyList());
public ServerSupport(ServerPort serverPort) {
this(serverPort, new SimpleServerConnection());
}
public NettyServerSupport(ServerPort serverPort, HandlerManager<ChannelHandler> handlerManager, List<Class<?>> classes) {
this.classes = classes;
public ServerSupport(ServerPort serverPort, HandlerManager<ChannelHandler> handlerManager) {
this.serverPort = serverPort;
this.handlerManager = handlerManager;
initServer();
@ -88,16 +68,11 @@ public class NettyServerSupport implements Server {
* If no processor is available, a default processor is provided
*/
protected void initServer() {
// Register the interface that can be invoked
classes.stream().filter(Class::isInterface)
.forEach(cls -> ClassRegistry.put(cls.getName(), cls));
NettyServerConnection connection = (handlerManager instanceof NettyServerConnection)
? (NettyServerConnection) handlerManager
: new NettyServerConnection();
SimpleServerConnection connection = (handlerManager instanceof SimpleServerConnection)
? (SimpleServerConnection) handlerManager
: new SimpleServerConnection();
// Assign a default handler if no handler exists
if (connection.isEmpty()) {
connection.addFirst(null, new NettyServerTakeHandler(new DefaultInstance()));
}
connection.addLast(null, new ErrorServerHandler());
server = new RPCServer(connection, serverPort);
}

@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.exception.ConnectionException;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.net.InetSocketAddress;
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class AddressUtil {
private static final String HTTP = "http://";
private static final String HTTPS = "https://";
/**
* parsing hostname
*
* @param address address
* @return InetAddress
*/
public static InetSocketAddress getInetAddress(String address) {
if (address.startsWith(HTTP)) {
address = address.replaceFirst(HTTP, "");
}
if (address.startsWith(HTTPS)) {
address = address.replaceFirst(HTTPS, "");
}
String[] addressStr = address.split(":");
if (addressStr.length < 2) {
throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure");
}
return InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1]));
}
}

@ -1,140 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.IdUtil;
import cn.hippo4j.rpc.client.Client;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Add a proxy for the request, {@link Proxy} and {@link InvocationHandler}
*
* @since 2.0.0
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NettyProxyCenter {
// cache
static Map<String, Object> map = new ConcurrentHashMap<>();
/**
* A proxy object for PRC is obtained through an interface
*
* @param cls The interface type
* @param address address
* @param <T> Object type
* @param handler the pool handler for netty
* @return Proxy objects
*/
@SuppressWarnings("unchecked")
public static <T> T getProxy(Class<T> cls, InetSocketAddress address, NettyClientPoolHandler handler) {
Client client = NettyClientSupport.getClient(address, handler);
String s = address + cls.getName();
Object o = map.get(s);
if (o != null) {
return (T) o;
}
return createProxy(client, cls, address);
}
/**
* A proxy object for PRC is obtained through an interface
*
* @param cls The interface type
* @param address address String
* @param <T> Object type
* @return Proxy objects
*/
@SuppressWarnings("unchecked")
public static <T> T getProxy(Class<T> cls, String address) {
String[] addressStr = address.split(":");
if (addressStr.length < 2) {
throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure");
}
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1]));
String s = socketAddress + cls.getName();
Object o = map.get(s);
if (o != null) {
return (T) o;
}
Client client = NettyClientSupport.getClient(socketAddress);
return createProxy(client, cls, socketAddress);
}
/**
* remove proxy object
*
* @param cls the class
* @param address address String
*/
public static void removeProxy(Class<?> cls, String address) {
String[] addressStr = address.split(":");
if (addressStr.length < 2) {
throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure");
}
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1]));
String s = socketAddress + cls.getName();
NettyClientSupport.closeClient(socketAddress);
map.remove(s);
}
@SuppressWarnings("unchecked")
public static <T> T createProxy(Client client, Class<T> cls, InetSocketAddress address) {
boolean b = cls.isInterface();
if (!b) {
throw new RuntimeException(cls.getName() + "is not a Interface");
}
String s = address.toString() + cls.getName();
Object o = map.get(s);
if (o != null) {
return (T) o;
}
T obj = (T) Proxy.newProxyInstance(
cls.getClassLoader(),
new Class[]{cls},
(proxy, method, args) -> {
String clsName = cls.getName();
String methodName = method.getName();
String key = address + clsName + methodName + IdUtil.simpleUUID();
Class<?>[] parameterTypes = method.getParameterTypes();
Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, args);
Response response = client.connection(request);
if (response == null) {
return null;
}
if (response.isErr()) {
throw new RuntimeException(response.getErrMsg(), response.getThrowable());
}
return response.getObj();
});
map.put(s, obj);
return obj;
}
}

@ -17,7 +17,8 @@
package cn.hippo4j.rpc.client;
import cn.hippo4j.common.toolkit.ThreadUtil;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class CallManager {
@ -25,13 +26,17 @@ public class CallManager {
return 1;
}
public int callTest(Integer a, Integer b) {
public int call(Integer a) {
return a;
}
public int call(Integer a, Integer b) {
return a + b;
}
public int callTestTimeout() {
// thread sleep for 10 seconds
ThreadUtil.sleep(10000);
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10L));
return 1;
}

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.client;
import cn.hippo4j.rpc.connection.ClientConnection;
import cn.hippo4j.rpc.connection.ServerConnection;
import cn.hippo4j.rpc.connection.SimpleClientConnection;
import cn.hippo4j.rpc.connection.SimpleServerConnection;
import cn.hippo4j.rpc.handler.ServerBareTakeHandler;
import cn.hippo4j.rpc.handler.ServerBiTakeHandler;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.ClientPoolHandler;
import cn.hippo4j.rpc.handler.ClientTakeHandler;
import cn.hippo4j.rpc.handler.ServerTakeHandler;
import cn.hippo4j.rpc.server.RPCServer;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class ClientSupportTest {
static ServerPort port = new TestServerPort();
static final String addressStr = "localhost";
static final String take = "serverTake";
static final String biTake = "biTake";
static final String bareTake = "bareTake";
static final String timeout = "timeout";
static RPCServer rpcServer;
@BeforeClass
public static void startServer() {
CallManager manager = new CallManager();
ServerTakeHandler<Integer, Integer> takeHandler = new ServerTakeHandler<>(biTake, manager::call);
ServerBiTakeHandler<Integer, Integer, Integer> biTakeHandler = new ServerBiTakeHandler<>(take, manager::call);
ServerBareTakeHandler<Integer> bareTakeHandler = new ServerBareTakeHandler<>(bareTake, manager::call);
ServerBareTakeHandler<Integer> timeoutHandler = new ServerBareTakeHandler<>(timeout, manager::callTestTimeout);
ServerConnection connection = new SimpleServerConnection(takeHandler, bareTakeHandler, biTakeHandler, timeoutHandler);
rpcServer = new RPCServer(connection, port);
rpcServer.bind();
while (!rpcServer.isActive()) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
}
}
@AfterClass
public static void stopServer() throws IOException {
if (rpcServer.isActive()) {
rpcServer.close();
}
}
@Test
public void closeTest() throws IOException {
InetSocketAddress address = InetSocketAddress.createUnresolved(addressStr, port.getPort());
ChannelPoolHandler channelPoolHandler = new ClientPoolHandler(new ClientTakeHandler());
ClientConnection clientConnection = new SimpleClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
ClientSupport.closeClient(new InetSocketAddress(addressStr, port.getPort()));
rpcClient.close();
}
static class TestServerPort implements ServerPort {
int port = RandomPort.getSafeRandomPort();
@Override
public int getPort() {
return port;
}
}
}

@ -1,161 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.client;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
public class RPCClientTest {
String host = "localhost";
ServerPort port = new TestServerPort();
ServerPort portTest = new TestPortServerPort();
/**
* This test case can be overridden under the handler and coder packages
*/
@Test
public void connection() throws IOException {
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, portTest);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved(host, portTest.getPort());
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
Class<?>[] classes = new Class<?>[2];
classes[0] = Integer.class;
classes[1] = Integer.class;
Object[] objects = new Object[2];
objects[0] = 1;
objects[1] = 2;
Request request = new DefaultRequest("127.0.0.18889", className, "callTest", classes, objects);
Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive();
Assert.assertTrue(active);
Assert.assertEquals(response.getObj(), 3);
rpcClient.close();
rpcServer.close();
}
@Test
public void connectionTest() throws IOException {
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
for (int i = 0; i < 50; i++) {
Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive();
Assert.assertTrue(active);
Assert.assertEquals(response.getObj(), 1);
}
rpcClient.close();
rpcServer.close();
}
@Test(expected = Exception.class)
public void responseNullExceptionTest() throws IOException {
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
clientConnection.setTimeout(300L);
try (RPCClient rpcClient = new RPCClient(clientConnection)) {
Request request = new DefaultRequest("127.0.0.18888", className, "callTestTimeout", null, null);
Response response = rpcClient.connection(request);
Assert.assertNotNull(response.getErrMsg());
Assert.assertNotNull(response.getThrowable());
} catch (IOException e) {
// no something
} finally {
rpcServer.close();
}
}
static class TestServerPort implements ServerPort {
int port = RandomPort.getSafeRandomPort();
@Override
public int getPort() {
return port;
}
}
static class TestPortServerPort implements ServerPort {
int port = RandomPort.getSafeRandomPort();
@Override
public int getPort() {
return port;
}
}
}

@ -1,67 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
import org.junit.Assert;
import org.junit.Test;
public class ClassRegistryTest {
@Test
public void get() {
String getStr = "GetModel";
Class<?> cls = ClassRegistry.get(getStr);
Assert.assertNull(cls);
ClassRegistry.put(getStr, GetModel.class);
Class<?> aClass = ClassRegistry.get(getStr);
Assert.assertNotNull(aClass);
ClassRegistry.clear();
}
@Test
public void set() {
String getStr = "GetModel";
ClassRegistry.set(getStr, GetModel.class);
Class<?> aClass = ClassRegistry.get(getStr);
Assert.assertEquals(aClass, GetModel.class);
ClassRegistry.set(getStr, SetModel.class);
Class<?> aClass1 = ClassRegistry.get(getStr);
Assert.assertEquals(aClass1, GetModel.class);
ClassRegistry.clear();
}
@Test
public void put() {
String getStr = "GetModel";
ClassRegistry.put(getStr, GetModel.class);
Class<?> aClass = ClassRegistry.get(getStr);
Assert.assertEquals(aClass, GetModel.class);
ClassRegistry.put(getStr, SetModel.class);
Class<?> aClass1 = ClassRegistry.get(getStr);
Assert.assertEquals(aClass1, SetModel.class);
ClassRegistry.clear();
}
public static class GetModel {
}
public static class SetModel {
}
}

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
import org.junit.Assert;
import org.junit.Test;
public class DefaultInstanceTest {
Instance instance = new DefaultInstance();
@Test
public void getInstance() {
Class<InstanceModel> cls = InstanceModel.class;
Object instanceInstance = instance.getInstance(cls);
Assert.assertNotNull(instanceInstance);
Assert.assertEquals(cls, instanceInstance.getClass());
}
@Test
public void testGetInstance() {
String className = "cn.hippo4j.rpc.discovery.InstanceModel";
Object instanceInstance = instance.getInstance(className);
Assert.assertNotNull(instanceInstance);
Assert.assertEquals(className, instanceInstance.getClass().getName());
}
@Test(expected = RuntimeException.class)
public void testGetInstanceTest() {
String className = "cn.hippo4j.rpc.discovery.InstanceModelTest";
Object instanceInstance = instance.getInstance(className);
Assert.assertNotNull(instanceInstance);
Assert.assertEquals(className, instanceInstance.getClass().getName());
}
@Test
public void getInstanceTest() {
Class<InstanceServerLoader> cls = InstanceServerLoader.class;
Object instanceInstance = instance.getInstance(cls);
Assert.assertNotNull(instanceInstance);
Assert.assertEquals(InstanceServerLoaderImpl.class, instanceInstance.getClass());
}
}

@ -1,34 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.springframework.boot.test.context.TestComponent;
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
@TestComponent
public class InstanceModel {
String name;
}

@ -1,24 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
public interface InstanceServerLoader {
String getName();
}

@ -1,33 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class InstanceServerLoaderImpl implements InstanceServerLoader {
String name = "name";
}

@ -1,48 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* TODO Common module removes spring dependency leftovers
*/
// @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = {InstanceModel.class})
// @RunWith(SpringJUnit4ClassRunner.class)
public class SpringContextInstanceTest {
Instance instance = new SpringContextInstance();
// @Test
public void getInstance() {
Object obj = instance.getInstance(InstanceModel.class);
Assert.assertNotNull(obj);
Assert.assertEquals(obj.getClass(), InstanceModel.class);
}
// @Test
public void testGetInstance() {
Object obj = instance.getInstance("instanceModel");
Assert.assertNotNull(obj);
Assert.assertEquals(obj.getClass(), InstanceModel.class);
}
}

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.exception.OperationException;
import io.netty.channel.ChannelHandler;
import org.junit.Assert;
import org.junit.Test;
public class ClientPoolHandlerTest {
static final String test = "Test";
static final String test1 = "Test1";
@Test
public void testGetHandlerEntity() {
TestHandler handler = new TestHandler();
long order = 0;
String name = test;
ClientPoolHandler poolHandler = new ClientPoolHandler();
HandlerManager.HandlerEntity<ChannelHandler> entity = poolHandler.getHandlerEntity(order, handler, name);
Assert.assertEquals(entity.getName(), name);
Assert.assertEquals(entity.getOrder(), order);
Assert.assertEquals(entity.getHandler(), handler);
}
@Test
public void testCompareTo() {
TestHandler handler = new TestHandler();
long order = 0;
TestHandler handler1 = new TestHandler();
long order1 = 1;
ClientPoolHandler poolHandler = new ClientPoolHandler();
HandlerManager.HandlerEntity<ChannelHandler> entity = poolHandler.getHandlerEntity(order, handler, test);
HandlerManager.HandlerEntity<ChannelHandler> entity1 = poolHandler.getHandlerEntity(order1, handler1, test1);
int compare = entity.compareTo(entity1);
Assert.assertTrue(compare < 0);
}
@Test
public void addLast() {
ClientPoolHandler handler = new ClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addLast(null, new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test
public void addFirst() {
ClientPoolHandler handler = new ClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addFirst(null, new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test
public void testAddLast() {
ClientPoolHandler handler = new ClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addLast(test, new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test
public void testAddFirst() {
ClientPoolHandler handler = new ClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addFirst(test, new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test(expected = OperationException.class)
public void testGetHandlerEntityFalse() {
TestFalseHandler handler = new TestFalseHandler();
long order = 0;
ClientPoolHandler poolHandler = new ClientPoolHandler();
poolHandler.getHandlerEntity(order, handler, test);
}
}

@ -17,73 +17,70 @@
package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.client.CallManager;
import cn.hippo4j.rpc.connection.ServerConnection;
import cn.hippo4j.rpc.client.RandomPort;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.InstanceServerLoader;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.DefaultResponse;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.connection.SimpleServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.support.NettyProxyCenter;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class ConnectHandlerTest {
@Test
public void handlerTest() throws IOException {
// server
Class<InstanceServerLoader> cls = InstanceServerLoader.class;
ClassRegistry.put(cls.getName(), cls);
ServerPort port = new TestServerPort();
Instance instance = new DefaultInstance();
NettyServerTakeHandler serverHandler = new NettyServerTakeHandler(instance);
NettyServerConnection connection = new NettyServerConnection(serverHandler);
RPCServer rpcServer = new RPCServer(connection, port);
static final String take = "serverTake";
static final String biTake = "biTake";
static final String bareTake = "bareTake";
static final String timeout = "timeout";
static final String key = "key";
static final String test = "test";
static RPCServer rpcServer;
static ServerPort port = new TestServerPort();
@BeforeClass
public static void startServer() {
CallManager manager = new CallManager();
ServerTakeHandler<Integer, Integer> takeHandler = new ServerTakeHandler<>(biTake, manager::call);
ServerBiTakeHandler<Integer, Integer, Integer> biTakeHandler = new ServerBiTakeHandler<>(take, manager::call);
ServerBareTakeHandler<Integer> bareTakeHandler = new ServerBareTakeHandler<>(bareTake, manager::call);
ServerBareTakeHandler<Integer> timeoutHandler = new ServerBareTakeHandler<>(timeout, manager::callTestTimeout);
ServerConnection connection = new SimpleServerConnection(takeHandler, bareTakeHandler, biTakeHandler, timeoutHandler);
rpcServer = new RPCServer(connection, port);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
}
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port.getPort());
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
}
InstanceServerLoader loader = NettyProxyCenter.createProxy(rpcClient, cls, address);
String name = loader.getName();
Assert.assertEquals("name", name);
rpcClient.close();
rpcServer.close();
@AfterClass
public static void stopServer() throws IOException {
if (rpcServer.isActive()) {
rpcServer.close();
}
}
@Test
public void testConnectHandlerDefault() {
ConnectHandler handler = new TestConnectHandler();
Request request = new DefaultRequest("key", "className", "methodName", new Class[0], new Object[0]);
Request request = new DefaultRequest(key, take, new Object[0]);
Response response = handler.sendHandler(request);
Assert.assertNull(response);
Response response1 = new DefaultResponse("key", this.getClass(), handler);
String key = response1.getKey();
Class<?> cls = response1.getCls();
Response response1 = new DefaultResponse(key, test);
String key = response1.getRID();
Object obj = response1.getObj();
handler.handler(response1);
Assert.assertEquals(key, response1.getKey());
Assert.assertEquals(cls, response1.getCls());
Assert.assertEquals(key, response1.getRID());
Assert.assertEquals(obj, response1.getObj());
}
static class TestConnectHandler implements ConnectHandler {

@ -15,12 +15,12 @@
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.client.RandomPort;
import cn.hippo4j.rpc.connection.ConnectPoolHolder;
import cn.hippo4j.rpc.connection.SimpleConnectPool;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@ -30,7 +30,7 @@ import org.junit.Test;
import java.net.InetSocketAddress;
public class NettyConnectPoolHolderTest {
public class ConnectPoolHolderTest {
String host = "127.0.0.1";
ServerPort port = new TestServerPort();
@ -41,37 +41,37 @@ public class NettyConnectPoolHolderTest {
@Test
public void createPool() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ClientPoolHandler handler = new ClientPoolHandler(new ClientTakeHandler());
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
NettyConnectPool pool = new NettyConnectPool(address, maxCount, timeout, group, cls, handler);
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(address);
SimpleConnectPool pool = new SimpleConnectPool(address, maxCount, timeout, group, cls, handler);
SimpleConnectPool connectPool = ConnectPoolHolder.getPool(address);
Assert.assertEquals(pool, connectPool);
NettyConnectPoolHolder.clear();
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(address);
ConnectPoolHolder.clear();
SimpleConnectPool connectPool1 = ConnectPoolHolder.getPool(address);
Assert.assertNull(connectPool1);
}
@Test
public void testGetPool() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ClientPoolHandler handler = new ClientPoolHandler(new ClientTakeHandler());
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(address, timeout, group, handler);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(address);
SimpleConnectPool connectPool = ConnectPoolHolder.getPool(address, timeout, group, handler);
SimpleConnectPool connectPool1 = ConnectPoolHolder.getPool(address);
Assert.assertEquals(connectPool1, connectPool);
NettyConnectPoolHolder.clear();
NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(address);
ConnectPoolHolder.clear();
SimpleConnectPool connectPool2 = ConnectPoolHolder.getPool(address);
Assert.assertNull(connectPool2);
}
@Test
public void remove() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ClientPoolHandler handler = new ClientPoolHandler(new ClientTakeHandler());
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(address, timeout, group, handler);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(address);
SimpleConnectPool connectPool = ConnectPoolHolder.getPool(address, timeout, group, handler);
SimpleConnectPool connectPool1 = ConnectPoolHolder.getPool(address);
Assert.assertEquals(connectPool1, connectPool);
NettyConnectPoolHolder.remove(address);
NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(address);
ConnectPoolHolder.remove(address);
SimpleConnectPool connectPool2 = ConnectPoolHolder.getPool(address);
Assert.assertNull(connectPool2);
}

@ -1,155 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.client.CallManager;
import cn.hippo4j.rpc.client.ClientConnection;
import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.client.RandomPort;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection;
import io.netty.channel.ChannelHandler;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
public class NettyClientPoolHandlerTest {
@Test
public void testGetHandlerEntity() {
TestHandler handler = new TestHandler();
long order = 0;
String name = "Test";
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler();
HandlerManager.HandlerEntity<ChannelHandler> entity = poolHandler.getHandlerEntity(order, handler, name);
Assert.assertEquals(entity.getName(), name);
Assert.assertEquals(entity.getOrder(), order);
Assert.assertEquals(entity.getHandler(), handler);
}
@Test
public void testCompareTo() {
TestHandler handler = new TestHandler();
long order = 0;
String name = "Test";
TestHandler handler1 = new TestHandler();
long order1 = 1;
String name1 = "Test1";
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler();
HandlerManager.HandlerEntity<ChannelHandler> entity = poolHandler.getHandlerEntity(order, handler, name);
HandlerManager.HandlerEntity<ChannelHandler> entity1 = poolHandler.getHandlerEntity(order1, handler1, name1);
int compare = entity.compareTo(entity1);
Assert.assertTrue(compare < 0);
}
@Test
public void addLast() {
NettyClientPoolHandler handler = new NettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addLast(null, new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test
public void addFirst() {
NettyClientPoolHandler handler = new NettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addFirst(null, new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test
public void testAddLast() {
NettyClientPoolHandler handler = new NettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addLast("Test", new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test
public void testAddFirst() {
NettyClientPoolHandler handler = new NettyClientPoolHandler();
Assert.assertTrue(handler.isEmpty());
handler.addFirst("Test", new TestHandler());
Assert.assertFalse(handler.isEmpty());
}
@Test(expected = RuntimeException.class)
public void testGetHandlerEntityFalse() {
TestFalseHandler handler = new TestFalseHandler();
long order = 0;
String name = "Test";
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler();
poolHandler.getHandlerEntity(order, handler, name);
}
@Test
public void connectionTest() throws IOException {
ServerPort port = new ServerPort() {
final int a = RandomPort.getSafeRandomPort();
@Override
public int getPort() {
return a;
}
};
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port.getPort());
List<ChannelHandler> handlers = new ArrayList<>();
handlers.add(new NettyClientTakeHandler());
NettyClientPoolHandler channelPoolHandler = new NettyClientPoolHandler(handlers);
channelPoolHandler.addLast("test", new TestHandler());
ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
for (int i = 0; i < 50; i++) {
Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive();
Assert.assertTrue(active);
Assert.assertEquals(response.getObj(), 1);
}
rpcClient.close();
rpcServer.close();
}
}

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.client.CallManager;
import cn.hippo4j.rpc.client.ClientSupport;
import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.client.RandomPort;
import cn.hippo4j.rpc.connection.ServerConnection;
import cn.hippo4j.rpc.connection.SimpleClientConnection;
import cn.hippo4j.rpc.connection.SimpleServerConnection;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.support.AddressUtil;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class ServerHandlerTest {
static final String host = "localhost";
static ServerPort port = new TestServerPort();
static final String take = "serverTakeServer";
static final String biTake = "biTakeServer";
static final String bareTake = "bareTakeServer";
static final String timeout = "timeoutServer";
static final String error = "errorServer";
static RPCServer rpcServer;
@BeforeClass
public static void startServer() {
CallManager manager = new CallManager();
ServerTakeHandler<Integer, Integer> takeHandler = new ServerTakeHandler<>(take, manager::call);
ServerBiTakeHandler<Integer, Integer, Integer> biTakeHandler = new ServerBiTakeHandler<>(biTake, manager::call);
ServerBareTakeHandler<Integer> bareTakeHandler = new ServerBareTakeHandler<>(bareTake, manager::call);
ServerBareTakeHandler<Integer> timeoutHandler = new ServerBareTakeHandler<>(timeout, manager::callTestTimeout);
ErrorServerHandler error = new ErrorServerHandler();
ServerConnection connection = new SimpleServerConnection(takeHandler, bareTakeHandler, biTakeHandler, timeoutHandler, error);
rpcServer = new RPCServer(connection, port);
rpcServer.bind();
while (!rpcServer.isActive()) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
}
}
@AfterClass
public static void stopServer() throws IOException {
if (rpcServer.isActive()) {
rpcServer.close();
}
}
/**
* This test case can be overridden under the handler and coder packages
*/
@Test
public void connection() {
String s = host + ":" + port.getPort();
int send = ClientSupport.clientSend(s, take, 1);
Assert.assertEquals(send, 1);
InetSocketAddress socketAddress = AddressUtil.getInetAddress(s);
ClientSupport.closeClient(socketAddress);
}
@Test(expected = ConnectionException.class)
public void connectionError() {
String s = host + ":" + port.getPort();
int send = ClientSupport.clientSend(s, error, 1);
Assert.assertEquals(1, send);
InetSocketAddress socketAddress = AddressUtil.getInetAddress(s);
ClientSupport.closeClient(socketAddress);
}
@Test
public void connectionTest() {
Integer[] params = {1, 6};
String s = host + ":" + port.getPort();
int send = ClientSupport.clientSend(s, biTake, params);
Assert.assertEquals(7, send);
InetSocketAddress socketAddress = AddressUtil.getInetAddress(s);
ClientSupport.closeClient(socketAddress);
}
@Test
public void connectionTestBare() {
String s = host + ":" + port.getPort();
int send = ClientSupport.clientSend(s, bareTake);
Assert.assertEquals(1, send);
InetSocketAddress socketAddress = AddressUtil.getInetAddress(s);
ClientSupport.closeClient(socketAddress);
}
@Test(expected = Exception.class)
public void responseNullExceptionTest() {
String s = host + ":" + port.getPort();
ClientPoolHandler handler = new ClientPoolHandler(new ClientTakeHandler());
InetSocketAddress socketAddress = AddressUtil.getInetAddress(s);
SimpleClientConnection connection = new SimpleClientConnection(socketAddress, handler);
connection.setTimeout(1L);
RPCClient client = new RPCClient(connection);
Request request = new DefaultRequest(UUID.randomUUID().toString(), timeout);
client.connect(request);
}
static class TestServerPort implements ServerPort {
int port = RandomPort.getSafeRandomPort();
@Override
public int getPort() {
return port;
}
}
}

@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.client.CallManager;
import cn.hippo4j.rpc.client.RandomPort;
import cn.hippo4j.rpc.connection.SimpleConnectPool;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.connection.SimpleServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.connection.ServerConnection;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class SimpleConnectPoolTest {
String host = "127.0.0.1";
int maxCount = 64;
int timeout = 5000;
EventLoopGroup group = new NioEventLoopGroup();
Class<? extends Channel> cls = NioSocketChannel.class;
static ServerPort port = new TestServerPort();
static final String take = "serverTake";
static final String biTake = "biTake";
static final String bareTake = "bareTake";
static final String timeoutTake = "timeout";
static RPCServer rpcServer;
@BeforeClass
public static void startServer() {
CallManager manager = new CallManager();
ServerTakeHandler<Integer, Integer> takeHandler = new ServerTakeHandler<>(biTake, manager::call);
ServerBiTakeHandler<Integer, Integer, Integer> biTakeHandler = new ServerBiTakeHandler<>(take, manager::call);
ServerBareTakeHandler<Integer> bareTakeHandler = new ServerBareTakeHandler<>(bareTake, manager::call);
ServerBareTakeHandler<Integer> timeoutHandler = new ServerBareTakeHandler<>(timeoutTake, manager::callTestTimeout);
ServerConnection connection = new SimpleServerConnection(takeHandler, bareTakeHandler, biTakeHandler, timeoutHandler);
rpcServer = new RPCServer(connection, port);
rpcServer.bind();
while (!rpcServer.isActive()) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
}
}
@AfterClass
public static void stopServer() throws IOException {
if (rpcServer.isActive()) {
rpcServer.close();
}
}
@Test
public void acquire() {
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
ClientPoolHandler poolHandler = new ClientPoolHandler(new ClientTakeHandler());
SimpleConnectPool pool = new SimpleConnectPool(address, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire);
pool.release(acquire);
}
@Test
public void testAcquire() {
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
ClientPoolHandler poolHandler = new ClientPoolHandler(new ClientTakeHandler());
SimpleConnectPool pool = new SimpleConnectPool(address, maxCount, timeout, group, cls, poolHandler);
Future<Channel> acquire = pool.acquire();
Assert.assertNotNull(acquire);
}
@Test
public void close() {
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
ClientPoolHandler poolHandler = new ClientPoolHandler(new ClientTakeHandler());
SimpleConnectPool pool = new SimpleConnectPool(address, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire);
pool.release(acquire);
pool.close();
}
static class TestServerPort implements ServerPort {
int port = RandomPort.getSafeRandomPort();
@Override
public int getPort() {
return port;
}
}
}

@ -17,7 +17,6 @@
package cn.hippo4j.rpc.model;
import cn.hippo4j.rpc.discovery.InstanceServerLoaderImpl;
import org.junit.Assert;
import org.junit.Test;
@ -26,20 +25,18 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
public class DefaultRequestTest {
static final String name = "name";
static final String rid = "rid";
static final String hippo4j = "hippo4j";
@Test
public void testReadObject() throws IOException, ClassNotFoundException, NoSuchMethodException {
String key = "name";
String clsName = InstanceServerLoaderImpl.class.getName();
Method method = InstanceServerLoaderImpl.class.getMethod("setName", String.class);
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
public void testReadObject() throws IOException, ClassNotFoundException {
Object[] parameters = new Object[1];
parameters[0] = "hippo4j";
Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, parameters);
parameters[0] = hippo4j;
Request request = new DefaultRequest(rid, name, parameters);
byte[] bytes;
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@ -55,26 +52,17 @@ public class DefaultRequestTest {
request1 = (Request) objectInputStream.readObject();
}
Assert.assertEquals(request1.hashCode(), request1.hashCode());
Assert.assertEquals(key, request1.getKey());
Assert.assertEquals(clsName, request1.getClassName());
Assert.assertEquals(methodName, request1.getMethodName());
Assert.assertArrayEquals(parameterTypes, request1.getParameterTypes());
Assert.assertEquals(name, request1.getKey());
Assert.assertEquals(rid, request1.getRID());
Assert.assertArrayEquals(parameters, request1.getParameters());
Assert.assertEquals(request1, request);
}
@Test
public void testEquals() throws NoSuchMethodException {
String key = "name";
String clsName = InstanceServerLoaderImpl.class.getName();
Method method = InstanceServerLoaderImpl.class.getMethod("setName", String.class);
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
Object[] parameters = new Object[1];
parameters[0] = "hippo4j";
Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, parameters);
public void testEquals() {
Request request = new DefaultRequest(rid, name);
Assert.assertTrue(request.equals(request));
Assert.assertFalse(request.equals(null));
Assert.assertFalse(request == null);
}
}

@ -28,12 +28,13 @@ import java.io.ObjectOutputStream;
public class DefaultResponseTest {
static final String rid = "name";
static final Object o = "obj";
static final String errMsg = "test throwable";
@Test
public void testReadObject() throws IOException, ClassNotFoundException {
String key = "name";
Object o = "obj";
Class<?> cls = String.class;
Response response = new DefaultResponse(key, cls, o);
Response response = new DefaultResponse(rid, o);
byte[] bytes;
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@ -49,19 +50,15 @@ public class DefaultResponseTest {
response1 = (Response) objectInputStream.readObject();
}
Assert.assertEquals(response1.hashCode(), response.hashCode());
Assert.assertEquals(key, response1.getKey());
Assert.assertEquals(rid, response1.getRID());
Assert.assertEquals(o, response1.getObj());
Assert.assertEquals(cls, response1.getCls());
Assert.assertEquals(response1, response);
Assert.assertFalse(response1.isErr());
}
@Test
public void testWriteObject() throws IOException, ClassNotFoundException {
String key = "name";
Throwable throwable = new RuntimeException("test throwable");
String errMsg = "test throwable";
Response response = new DefaultResponse(key, throwable, errMsg);
Response response = new DefaultResponse(rid, errMsg);
byte[] bytes;
try (
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
@ -76,10 +73,7 @@ public class DefaultResponseTest {
ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)) {
response1 = (Response) objectInputStream.readObject();
}
Assert.assertEquals(key, response1.getKey());
Assert.assertThrows(RuntimeException.class, () -> {
throw response1.getThrowable();
});
Assert.assertEquals(rid, response1.getRID());
Assert.assertEquals(response1.hashCode(), response.hashCode());
Assert.assertEquals(errMsg, response1.getErrMsg());
Assert.assertEquals(response1, response);
@ -87,11 +81,8 @@ public class DefaultResponseTest {
}
@Test
public void testEquals() throws NoSuchMethodException {
String key = "name";
Object o = "obj";
Class<?> cls = String.class;
Response response = new DefaultResponse(key, cls, o);
public void testEquals() {
Response response = new DefaultResponse(rid, o);
Assert.assertTrue(response.equals(response));
Assert.assertFalse(response.equals(null));
}

@ -17,41 +17,48 @@
package cn.hippo4j.rpc.server;
import cn.hippo4j.rpc.connection.SimpleServerConnection;
import cn.hippo4j.rpc.handler.TestHandler;
import org.junit.Assert;
import org.junit.Test;
public class NettyServerConnectionTest {
static final String test = "Test";
@Test
public void addLast() {
NettyServerConnection connection = new NettyServerConnection();
Assert.assertTrue(connection.isEmpty());
connection.addLast(null, new TestHandler());
Assert.assertFalse(connection.isEmpty());
try (SimpleServerConnection connection = new SimpleServerConnection()) {
Assert.assertTrue(connection.isEmpty());
connection.addLast(null, new TestHandler());
Assert.assertFalse(connection.isEmpty());
}
}
@Test
public void addFirst() {
NettyServerConnection connection = new NettyServerConnection();
Assert.assertTrue(connection.isEmpty());
connection.addFirst(null, new TestHandler());
Assert.assertFalse(connection.isEmpty());
try (SimpleServerConnection connection = new SimpleServerConnection()) {
Assert.assertTrue(connection.isEmpty());
connection.addFirst(null, new TestHandler());
Assert.assertFalse(connection.isEmpty());
}
}
@Test
public void testAddLast() {
NettyServerConnection connection = new NettyServerConnection();
Assert.assertTrue(connection.isEmpty());
connection.addLast("Test", new TestHandler());
Assert.assertFalse(connection.isEmpty());
try (SimpleServerConnection connection = new SimpleServerConnection()) {
Assert.assertTrue(connection.isEmpty());
connection.addLast(test, new TestHandler());
Assert.assertFalse(connection.isEmpty());
}
}
@Test
public void testAddFirst() {
NettyServerConnection connection = new NettyServerConnection();
Assert.assertTrue(connection.isEmpty());
connection.addFirst("Test", new TestHandler());
Assert.assertFalse(connection.isEmpty());
try (SimpleServerConnection connection = new SimpleServerConnection()) {
Assert.assertTrue(connection.isEmpty());
connection.addFirst(test, new TestHandler());
Assert.assertFalse(connection.isEmpty());
}
}
}

@ -17,43 +17,31 @@
package cn.hippo4j.rpc.server;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.client.CallManager;
import cn.hippo4j.rpc.client.ClientConnection;
import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.client.RandomPort;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.handler.TestHandler;
import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.connection.ServerConnection;
import cn.hippo4j.rpc.connection.SimpleServerConnection;
import cn.hippo4j.rpc.handler.ServerTakeHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class RPCServerTest {
static final String instance = "instance";
@Test
public void bind() throws IOException {
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
ServerTakeHandler handler = new ServerTakeHandler(instance, o -> 1);
ServerConnection connection = new SimpleServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, RandomPort::getSafeRandomPort);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
}
boolean active = rpcServer.isActive();
Assert.assertTrue(active);
@ -62,71 +50,18 @@ public class RPCServerTest {
@Test
public void bindTest() throws IOException {
Instance instance = new DefaultInstance();
EventLoopGroup leader = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(leader, worker, handler);
ServerTakeHandler handler = new ServerTakeHandler(instance, o -> 1);
ServerConnection connection = new SimpleServerConnection(leader, worker, handler);
RPCServer rpcServer = new RPCServer(connection, RandomPort::getSafeRandomPort);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(1L));
}
boolean active = rpcServer.isActive();
Assert.assertTrue(active);
rpcServer.close();
}
@Test
public void bindPipelineTest() throws IOException {
ServerPort serverPort = new ServerPort() {
final int port = RandomPort.getSafeRandomPort();
@Override
public int getPort() {
return port;
}
};
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
NettyServerConnection connection = new NettyServerConnection(handler);
connection.addLast("Test", new TestHandler());
RPCServer rpcServer = new RPCServer(connection, serverPort);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", serverPort.getPort());
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
for (int i = 0; i < 50; i++) {
Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive();
Assert.assertTrue(active);
Assert.assertEquals(response.getObj(), 1);
}
rpcClient.close();
rpcServer.close();
}
@Test
public void bindNegativeTest() {
ServerPort serverPort = () -> -1;
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
NettyServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, serverPort);
rpcServer.bind();
}
}

@ -17,19 +17,25 @@
package cn.hippo4j.rpc.server;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.Connection;
import cn.hippo4j.rpc.client.RandomPort;
import org.junit.Assert;
import org.junit.Test;
/**
* This applies to server-side connections
*
* @since 2.0.0
*/
public interface ServerConnection extends Connection {
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class ServerSupportTest {
/**
* Bind ports and process them
*/
void bind(ServerPort port);
@Test
public void bind() throws IOException {
ServerSupport support = new ServerSupport(RandomPort::getSafeRandomPort);
support.bind();
while (!support.isActive()) {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100L));
}
Assert.assertTrue(support.isActive());
support.close();
}
}
}

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.exception.ConnectionException;
import org.junit.Assert;
import org.junit.Test;
import java.net.InetSocketAddress;
public class AddressUtilTest {
String address1 = "http://hippo4j.cn/login:8080";
String address2 = "https://hippo4j.cn/login:8080";
String address3 = "https://hippo4j.cn/login";
String addressHostName = "hippo4j.cn/login";
int addressPort = 8080;
@Test
public void test() {
InetSocketAddress address = AddressUtil.getInetAddress(address1);
Assert.assertEquals(addressHostName, address.getHostName());
Assert.assertEquals(addressPort, address.getPort());
}
@Test
public void testAddress2() {
InetSocketAddress address = AddressUtil.getInetAddress(address2);
Assert.assertEquals(addressHostName, address.getHostName());
Assert.assertEquals(addressPort, address.getPort());
}
@Test(expected = ConnectionException.class)
public void testAddress3() {
AddressUtil.getInetAddress(address3);
}
}

@ -1,72 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.client.CallManager;
import cn.hippo4j.rpc.client.ClientConnection;
import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.client.RandomPort;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.handler.TestHandler;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
public class NettyClientSupportTest {
@Test
public void closeTest() throws IOException {
int port = RandomPort.getSafeRandomPort();
ServerPort serverPort = () -> port;
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
NettyServerConnection connection = new NettyServerConnection(handler);
connection.addLast("Test", new TestHandler());
RPCServer rpcServer = new RPCServer(connection, serverPort);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port);
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
NettyClientSupport.closeClient(new InetSocketAddress("localhost", port));
rpcClient.close();
rpcServer.close();
}
}

@ -1,122 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.client.RandomPort;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
public class NettyConnectPoolTest {
String host = "127.0.0.1";
ServerPort port = new TestServerPort();
int maxCount = 64;
int timeout = 5000;
EventLoopGroup group = new NioEventLoopGroup();
Class<? extends Channel> cls = NioSocketChannel.class;
@Test
public void acquire() throws IOException {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
rpcServer.bind();
// Given the delay in starting the server, wait here
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(address, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire);
pool.release(acquire);
rpcServer.close();
}
@Test
public void testAcquire() throws IOException {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
rpcServer.bind();
// Given the delay in starting the server, wait here
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(address, maxCount, timeout, group, cls, poolHandler);
Future<Channel> acquire = pool.acquire();
Assert.assertNotNull(acquire);
rpcServer.close();
}
@Test
public void close() throws IOException {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(connection, port);
rpcServer.bind();
// Given the delay in starting the server, wait here
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort());
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(address, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire);
pool.release(acquire);
pool.close();
rpcServer.close();
}
static class TestServerPort implements ServerPort {
int port = RandomPort.getSafeRandomPort();
@Override
public int getPort() {
return port;
}
}
}

@ -1,134 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.client.Client;
import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.client.RPCClient;
import cn.hippo4j.rpc.client.RandomPort;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.discovery.InstanceServerLoader;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
public class NettyProxyCenterTest {
ServerPort port = new TestServerPort();
@Test
public void getProxy() {
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port.getPort());
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, address, handler);
Assert.assertNotNull(localhost);
}
@Test
public void createProxy() {
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost:8894");
Assert.assertNotNull(localhost);
NettyProxyCenter.getProxy(ProxyInterface.class, "localhost:8894");
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved("localhost", 8894);
Client client = NettyClientSupport.getClient(socketAddress);
ProxyInterface proxy = NettyProxyCenter.createProxy(client, ProxyInterface.class, socketAddress);
Assert.assertNotNull(proxy);
}
@Test(expected = ConnectionException.class)
public void createProxyException() {
NettyProxyCenter.getProxy(ProxyInterface.class, "localhost8894");
}
@Test
public void removeProxy() {
NettyProxyCenter.getProxy(ProxyInterface.class, "localhost:8894");
NettyProxyCenter.removeProxy(ProxyInterface.class, "localhost:8894");
}
@Test(expected = ConnectionException.class)
public void removeProxyException() {
NettyProxyCenter.removeProxy(ProxyInterface.class, "localhost8894");
}
@Test(expected = RuntimeException.class)
public void getProxyTest() {
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port.getPort());
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, address, handler);
Assert.assertNotNull(localhost);
}
@Test
public void bindPipelineTest() throws IOException {
// server
Class<InstanceServerLoader> cls = InstanceServerLoader.class;
ClassRegistry.put(cls.getName(), cls);
ServerPort port = new TestServerPort();
Instance instance = new DefaultInstance();
NettyServerTakeHandler serverHandler = new NettyServerTakeHandler(instance);
NettyServerConnection connection = new NettyServerConnection(serverHandler);
RPCServer rpcServer = new RPCServer(connection, port);
rpcServer.bind();
while (!rpcServer.isActive()) {
ThreadUtil.sleep(100L);
}
InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port.getPort());
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
InstanceServerLoader loader = NettyProxyCenter.createProxy(rpcClient, cls, address);
String name = loader.getName();
Assert.assertEquals("name", name);
rpcClient.close();
rpcServer.close();
}
interface ProxyInterface {
void hello();
}
static class ProxyClass {
}
static class TestServerPort implements ServerPort {
int port = RandomPort.getSafeRandomPort();
@Override
public int getPort() {
return port;
}
}
}

@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.rpc.client.RandomPort;
import cn.hippo4j.rpc.discovery.InstanceServerLoader;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class NettyServerSupportTest {
@Test
public void bind() throws IOException {
NettyServerSupport support = new NettyServerSupport(RandomPort::getSafeRandomPort, InstanceServerLoader.class);
support.bind();
while (!support.isActive()) {
ThreadUtil.sleep(100L);
}
Assert.assertTrue(support.isActive());
support.close();
}
@Test
public void bindTest() throws IOException {
List<Class<?>> classes = new ArrayList<>();
classes.add(InstanceServerLoader.class);
NettyServerSupport support = new NettyServerSupport(RandomPort::getSafeRandomPort, classes);
support.bind();
while (!support.isActive()) {
ThreadUtil.sleep(100L);
}
Assert.assertTrue(support.isActive());
support.close();
}
}

@ -17,10 +17,10 @@
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.IdUtil;
import org.junit.Assert;
import org.junit.Test;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -28,12 +28,15 @@ import java.util.concurrent.locks.LockSupport;
public class ResultHolderTest {
static final String str1 = "1";
static final String str2 = "2";
@Test
public void test() {
String s1 = IdUtil.simpleUUID();
String o1 = s1 + "1";
String s2 = IdUtil.simpleUUID();
String o2 = s2 + "2";
String s1 = UUID.randomUUID().toString();
String o1 = s1 + str1;
String s2 = UUID.randomUUID().toString();
String o2 = s2 + str2;
ResultHolder.put(s1, o1);
ResultHolder.put(s2, o2);
@ -48,8 +51,8 @@ public class ResultHolderTest {
@Test
public void testThread() throws InterruptedException {
AtomicInteger a = new AtomicInteger();
String s1 = IdUtil.simpleUUID();
String o1 = s1 + "1";
String s1 = UUID.randomUUID().toString();
String o1 = s1 + str1;
CompletableFuture.runAsync(() -> {
ResultHolder.putThread(o1, Thread.currentThread());
LockSupport.park();

@ -1,18 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
cn.hippo4j.rpc.discovery.InstanceServerLoaderImpl
Loading…
Cancel
Save