fix : Add set multiple ChannelHandler(#812)

pull/912/head
pizihao 3 years ago
parent 6a88b2bd0a
commit fbb6d05dee

@ -31,6 +31,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
@ -54,16 +55,17 @@ public class NettyClientConnection implements ClientConnection {
Channel channel;
public NettyClientConnection(String host, int port,
List<ActivePostProcess> activeProcesses) {
List<ActivePostProcess> activeProcesses,
ChannelPoolHandler handler) {
Assert.notNull(worker);
this.host = host;
this.port = port;
this.activeProcessChain = new ActiveProcessChain(activeProcesses);
this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker);
this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker, handler);
}
public NettyClientConnection(String host, int port) {
this(host, port, new LinkedList<>());
public NettyClientConnection(String host, int port, ChannelPoolHandler handler) {
this(host, port, new LinkedList<>(), handler);
}
@Override

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* Manage the Handler used in the processing.<br>
* The Handler must be able to exist multiple times and be invoked once in a single execution
*/
public interface HandlerManager<T> {
/**
* Add handler to the end of the Handler chain
*
* @param name name
* @param handler handler
*/
HandlerManager<T> addLast(String name, T handler);
/**
* Add handler to the head of the Handler chain
*
* @param name name
* @param handler handler
*/
HandlerManager<T> addFirst(String name, T handler);
/**
* Add handler to the end of the Handler chain, without specifying a name
*
* @param handler handler
*/
HandlerManager<T> addLast(T handler);
/**
* Adds handler to the head of the Handler chain, without specifying a name
*
* @param handler handler
*/
HandlerManager<T> addFirst(T handler);
/**
* Create a handler
*
* @param order order
* @param handler Handler
* @param name Handler name
* @return HandlerEntity
*/
default HandlerEntity<T> getHandlerEntity(long order, T handler, String name) {
return new HandlerEntity<>(order, handler, name);
}
@Data
@AllArgsConstructor
class HandlerEntity<T> implements Comparable<HandlerEntity<T>>{
/**
* order, The Handler with a larger value is executed after the Handler with a smaller value
*/
long order;
/**
* handler
*/
T handler;
/**
* A high level summary of handler functionality
*/
String name;
@Override
public int compareTo(HandlerEntity<T> o) {
return (int) (this.getOrder() - o.getOrder());
}
}
}

@ -21,16 +21,51 @@ import cn.hippo4j.rpc.coder.NettyDecoder;
import cn.hippo4j.rpc.coder.NettyEncoder;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* Processing by the client connection pool handler to clean the buffer and define new connection properties
*/
@Slf4j
public class NettyClientPoolHandler implements ChannelPoolHandler {
public class NettyClientPoolHandler extends NettyHandlerManager implements ChannelPoolHandler {
public NettyClientPoolHandler(List<ChannelHandler> handlers) {
super(handlers);
}
public NettyClientPoolHandler(ChannelHandler... handlers) {
super(handlers);
}
public NettyClientPoolHandler() {
super();
}
public NettyClientPoolHandler addLast(String name, ChannelHandler handler) {
super.addLast(name, handler);
return this;
}
public NettyClientPoolHandler addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler);
return this;
}
public NettyClientPoolHandler addLast(ChannelHandler handler) {
super.addLast(handler);
return this;
}
public NettyClientPoolHandler addFirst(ChannelHandler handler) {
super.addFirst(handler);
return this;
}
@Override
public void channelReleased(Channel ch) {
@ -50,6 +85,15 @@ public class NettyClientPoolHandler implements ChannelPoolHandler {
.setTcpNoDelay(false);
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettyEncoder());
ch.pipeline().addLast(new NettyClientTakeHandler());
this.handlers.stream()
.sorted()
.forEach(h -> {
if (h.getName() == null) {
ch.pipeline().addLast(h.getHandler());
} else {
ch.pipeline().addLast(h.getName(), h.getHandler());
}
});
}
}

@ -19,11 +19,13 @@ package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.response.Response;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
/**
* Interconnect with the netty mediation layer
*/
@ChannelHandler.Sharable
public class NettyClientTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler {
@Override

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.toolkit.Assert;
import io.netty.channel.ChannelHandler;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
* Processor manager for ChannelHandler in netty
*/
public abstract class NettyHandlerManager implements HandlerManager<ChannelHandler> {
protected final List<HandlerEntity<ChannelHandler>> handlers;
AtomicLong firstIndex = new AtomicLong(-1);
AtomicLong lastIndex = new AtomicLong(0);
protected NettyHandlerManager(List<ChannelHandler> handlers) {
this.handlers = handlers.stream()
.filter(Objects::nonNull)
.map(c -> getHandlerEntity(lastIndex.getAndIncrement(), c, null))
.collect(Collectors.toList());
}
protected NettyHandlerManager(ChannelHandler... handlers) {
this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList());
}
protected NettyHandlerManager() {
this.handlers = new LinkedList<>();
}
/**
* {@inheritDoc}
*
* @param name name
* @param handler handler
* @return NettyHandlerManager
*/
public NettyHandlerManager addLast(String name, ChannelHandler handler) {
Assert.notNull(handler);
this.handlers.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, name));
return this;
}
/**
* {@inheritDoc}
*
* @param name name
* @param handler handler
* @return NettyHandlerManager
*/
public NettyHandlerManager addFirst(String name, ChannelHandler handler) {
Assert.notNull(handler);
this.handlers.add(getHandlerEntity(firstIndex.getAndIncrement(), handler, name));
return this;
}
/**
* {@inheritDoc}
*
* @param handler handler
* @return NettyHandlerManager
*/
public NettyHandlerManager addLast(ChannelHandler handler) {
Assert.notNull(handler);
this.handlers.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, null));
return this;
}
/**
* {@inheritDoc}
*
* @param handler handler
* @return NettyHandlerManager
*/
public NettyHandlerManager addFirst(ChannelHandler handler) {
Assert.notNull(handler);
this.handlers.add(getHandlerEntity(firstIndex.getAndDecrement(), handler, null));
return this;
}
}

@ -26,6 +26,7 @@ import cn.hippo4j.rpc.response.DefaultResponse;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.support.ClassRegistry;
import cn.hippo4j.rpc.support.Instance;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Method;
@ -35,6 +36,7 @@ import java.util.List;
/**
* netty adaptation layer
*/
@ChannelHandler.Sharable
public class NettyServerTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler {
ActiveProcessChain activeProcessChain;

@ -20,9 +20,7 @@ package cn.hippo4j.rpc.server;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.rpc.coder.NettyDecoder;
import cn.hippo4j.rpc.coder.NettyEncoder;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.process.ActivePostProcess;
import cn.hippo4j.rpc.support.Instance;
import cn.hippo4j.rpc.handler.NettyHandlerManager;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
@ -31,45 +29,42 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* adapter to the netty server
*/
@Slf4j
public class NettyServerConnection implements ServerConnection {
public class NettyServerConnection extends NettyHandlerManager implements ServerConnection {
Integer port;
EventLoopGroup leader;
EventLoopGroup worker;
Class<? extends ServerChannel> socketChannelCls = NioServerSocketChannel.class;
List<ActivePostProcess> processes;
Instance instance;
ChannelFuture future;
Channel channel;
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ActivePostProcess> processes, Instance instance) {
Assert.notNull(processes);
Assert.notNull(instance);
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ChannelHandler> handlers) {
super(handlers);
Assert.notNull(handlers);
Assert.notNull(leader);
Assert.notNull(worker);
this.leader = leader;
this.worker = worker;
this.processes = processes;
this.instance = instance;
}
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, Instance instance) {
this(leader, worker, new LinkedList<>(), instance);
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, ChannelHandler... handlers) {
this(leader, worker, (handlers != null ? Arrays.asList(handlers) : Collections.emptyList()));
}
public NettyServerConnection(List<ActivePostProcess> processes, Instance instance) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), processes, instance);
public NettyServerConnection(ChannelHandler... handlers) {
this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList());
}
public NettyServerConnection(Instance instance) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), new LinkedList<>(), instance);
public NettyServerConnection(List<ChannelHandler> handlers) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), handlers);
}
@Override
@ -84,7 +79,15 @@ public class NettyServerConnection implements ServerConnection {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettyEncoder());
ch.pipeline().addLast(new NettyServerTakeHandler(processes, instance));
handlers.stream()
.sorted()
.forEach(h -> {
if (h.getName() == null) {
ch.pipeline().addLast(h.getHandler());
} else {
ch.pipeline().addLast(h.getName(), h.getHandler());
}
});
}
});
try {
@ -113,4 +116,24 @@ public class NettyServerConnection implements ServerConnection {
public boolean isActive() {
return channel.isActive();
}
public NettyServerConnection addLast(String name, ChannelHandler handler) {
super.addLast(name, handler);
return this;
}
public NettyServerConnection addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler);
return this;
}
public NettyServerConnection addLast(ChannelHandler handler) {
super.addLast(handler);
return this;
}
public NettyServerConnection addFirst(ChannelHandler handler) {
super.addFirst(handler);
return this;
}
}

@ -18,7 +18,6 @@
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
@ -41,14 +40,15 @@ public class NettyConnectPool {
ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE;
FixedChannelPool.AcquireTimeoutAction acquireTimeoutAction = FixedChannelPool.AcquireTimeoutAction.NEW;
int maxPendingAcquires = Integer.MAX_VALUE;
ChannelPoolHandler handler = new NettyClientPoolHandler();
ChannelPoolHandler handler;
ChannelPool pool;
String host;
int port;
public NettyConnectPool(String host, int port, int maxConnect,
long timeout, EventLoopGroup worker,
Class<? extends Channel> socketChannelCls) {
Class<? extends Channel> socketChannelCls,
ChannelPoolHandler handler) {
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port);
Bootstrap bootstrap = new Bootstrap()
.group(worker)
@ -56,6 +56,7 @@ public class NettyConnectPool {
.remoteAddress(socketAddress);
this.host = host;
this.port = port;
this.handler = handler;
this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction,
timeout, maxConnect, maxPendingAcquires, true, true);
log.info("The connection pool is established with the connection target {}:{}", host, port);

@ -19,6 +19,7 @@ package cn.hippo4j.rpc.support;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.AccessLevel;
@ -38,11 +39,13 @@ public class NettyConnectPoolHolder {
static Map<String, NettyConnectPool> connectPoolMap = new ConcurrentHashMap<>();
private static NettyConnectPool initPool(String host, int port,
long timeout, EventLoopGroup worker) {
long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) {
return new NettyConnectPool(
host, port, maxConnect,
timeout, worker,
NioSocketChannel.class);
NioSocketChannel.class,
handler);
}
private static String getKey(String host, int port) {
@ -80,15 +83,17 @@ public class NettyConnectPoolHolder {
* @param timeout timeout
* @param worker Special {@link EventExecutorGroup} which allows registering {@link Channel}s
* that get processed for later selection during the event loop.
* @param handler the chandler for netty
* @return Map to the connection pool
*/
public static synchronized NettyConnectPool getPool(String host, int port,
long timeout, EventLoopGroup worker) {
long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) {
/*
* this cannot use the computeIfAbsent method directly here because put is already used in init. Details refer to https://bugs.openjdk.java.net/browse/JDK-8062841
*/
NettyConnectPool pool = getPool(host, port);
return pool == null ? initPool(host, port, timeout, worker) : pool;
return pool == null ? initPool(host, port, timeout, worker, handler) : pool;
}
/**

@ -23,6 +23,7 @@ import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.request.DefaultRequest;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import io.netty.channel.pool.ChannelPoolHandler;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@ -41,16 +42,17 @@ public class NettyProxyCenter {
static Map<Class<?>, Object> map = new HashMap<>();
/**
* PRC
* A proxy object for PRC is obtained through an interface
*
* @param cls
* @param host
* @param port
* @param <T>
* @return
* @param cls The interface type
* @param host Request the address
* @param port port
* @param <T> Object type
* @param handler the pool handler for netty
* @return Proxy objects
*/
public static <T> T getProxy(Class<T> cls, String host, int port) {
NettyClientConnection connection = new NettyClientConnection(host, port);
public static <T> T getProxy(Class<T> cls, String host, int port, ChannelPoolHandler handler) {
NettyClientConnection connection = new NettyClientConnection(host, port, handler);
return getProxy(connection, cls, host, port);
}

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
package cn.hippo4j.rpc.client;
public class CallManager {
@ -23,4 +23,8 @@ public class CallManager {
return 1;
}
public int callTest(Integer a, Integer b) {
return a + b;
}
}

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.client;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.request.DefaultRequest;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection;
import cn.hippo4j.rpc.support.ClassRegistry;
import cn.hippo4j.rpc.support.DefaultInstance;
import cn.hippo4j.rpc.support.Instance;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class RPCClientTest {
String host = "localhost";
int port = 8888;
int portTest = 8889;
@Test
public void connection() throws IOException {
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(host, port, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
for (int i = 0; i < 100; i++) {
Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive();
Assert.assertTrue(active);
Assert.assertEquals(response.getObj(), 1);
}
rpcClient.close();
rpcServer.close();
}
/**
* This test case can be overridden under the handler and coder packages
*/
@Test
public void connectionTest() throws IOException {
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(portTest, connection);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(host, portTest, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
Class<?>[] classes = new Class<?>[2];
classes[0] = Integer.class;
classes[1] = Integer.class;
Object[] objects = new Object[2];
objects[0] = 1;
objects[1] = 2;
Request request = new DefaultRequest("127.0.0.18889", className, "callTest", classes, objects);
Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive();
Assert.assertTrue(active);
Assert.assertEquals(response.getObj(), 3);
rpcClient.close();
rpcServer.close();
}
}

@ -15,12 +15,14 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
package cn.hippo4j.rpc.server;
import cn.hippo4j.config.rpc.support.DefaultInstance;
import cn.hippo4j.config.rpc.support.Instance;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.support.DefaultInstance;
import cn.hippo4j.rpc.support.Instance;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
@ -34,7 +36,8 @@ public class RPCServerTest {
@Test
public void bind() throws IOException {
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
try {
@ -42,7 +45,11 @@ public class RPCServerTest {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
boolean active = rpcServer.isActive();
Assert.assertTrue(active);
rpcServer.close();
boolean serverActive = rpcServer.isActive();
Assert.assertFalse(serverActive);
}
@Test
@ -50,7 +57,8 @@ public class RPCServerTest {
Instance instance = new DefaultInstance();
EventLoopGroup leader = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerConnection connection = new NettyServerConnection(leader, worker, instance);
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(leader, worker, handler);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
try {

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import org.junit.Assert;
import org.junit.Test;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import lombok.AllArgsConstructor;
import lombok.Getter;
@ -38,7 +38,7 @@ public class DefaultInstanceTest {
@Test
public void testGetInstance() {
String className = "cn.hippo4j.config.rpc.support.DefaultInstanceTest$InstanceModel";
String className = "cn.hippo4j.rpc.support.DefaultInstanceTest$InstanceModel";
Object instanceInstance = instance.getInstance(className);
Assert.assertNotNull(instanceInstance);
Assert.assertEquals(className, instanceInstance.getClass().getName());

@ -15,8 +15,10 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@ -35,7 +37,8 @@ public class NettyConnectPoolHolderTest {
@Test
public void createPool() {
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls);
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, handler);
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(pool, connectPool);
NettyConnectPoolHolder.clear();
@ -45,7 +48,8 @@ public class NettyConnectPoolHolderTest {
@Test
public void testGetPool() {
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group);
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(connectPool1, connectPool);
NettyConnectPoolHolder.clear();
@ -55,7 +59,8 @@ public class NettyConnectPoolHolderTest {
@Test
public void remove() {
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group);
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(connectPool1, connectPool);
NettyConnectPoolHolder.remove(host, port);

@ -15,11 +15,14 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import cn.hippo4j.config.rpc.server.NettyServerConnection;
import cn.hippo4j.config.rpc.server.RPCServer;
import cn.hippo4j.config.rpc.server.ServerConnection;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@ -45,7 +48,8 @@ public class NettyConnectPoolTest {
public void acquire() throws IOException {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here
@ -54,8 +58,8 @@ public class NettyConnectPoolTest {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls);
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire);
pool.release(acquire);
@ -66,7 +70,8 @@ public class NettyConnectPoolTest {
public void testAcquire() throws IOException {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here
@ -75,8 +80,8 @@ public class NettyConnectPoolTest {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls);
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler);
Future<Channel> acquire = pool.acquire();
Assert.assertNotNull(acquire);
rpcServer.close();
@ -86,7 +91,8 @@ public class NettyConnectPoolTest {
public void close() throws IOException {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here
@ -96,7 +102,8 @@ public class NettyConnectPoolTest {
throw new RuntimeException(e);
}
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls);
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire);
pool.release(acquire);

@ -15,9 +15,12 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import org.junit.Assert;
import org.junit.Test;
@ -25,17 +28,29 @@ public class NettyProxyCenterTest {
@Test
public void getProxy() {
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888);
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888, handler);
Assert.assertNotNull(localhost);
}
@Test(expected = IllegalException.class)
public void getProxyTest() {
ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", 8888);
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", 8888, handler);
Assert.assertNotNull(localhost);
}
@Test(expected = ConnectionException.class)
public void getProxyTestCall() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888, handler);
localhost.hello();
Assert.assertNotNull(localhost);
}
interface ProxyInterface {
void hello();
}
static class ProxyClass {

@ -15,12 +15,17 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.IdUtil;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
public class ResultHolderTest {
@Test
@ -39,4 +44,21 @@ public class ResultHolderTest {
Assert.assertEquals(r1, o1);
Assert.assertEquals(r2, o2);
}
@Test
public void testThread() throws InterruptedException {
AtomicInteger a = new AtomicInteger();
String s1 = IdUtil.simpleUUID();
String o1 = s1 + "1";
CompletableFuture.runAsync(() -> {
ResultHolder.putThread(o1, Thread.currentThread());
LockSupport.park();
a.set(1);
});
Assert.assertEquals(0, a.get());
TimeUnit.SECONDS.sleep(1);
ResultHolder.wake(o1);
TimeUnit.SECONDS.sleep(1);
Assert.assertEquals(1, a.get());
}
}

@ -1,68 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
import cn.hippo4j.config.rpc.request.DefaultRequest;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.config.rpc.server.NettyServerConnection;
import cn.hippo4j.config.rpc.server.RPCServer;
import cn.hippo4j.config.rpc.server.ServerConnection;
import cn.hippo4j.config.rpc.support.DefaultInstance;
import cn.hippo4j.config.rpc.support.Instance;
import cn.hippo4j.config.rpc.support.ClassRegistry;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class RPCClientTest {
String host = "localhost";
int port = 8888;
@Test
public void connection() throws IOException {
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NettyClientConnection clientConnection = new NettyClientConnection(host, port);
RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
for (int i = 0; i < 100; i++) {
Response response = rpcClient.connection(request);
Assert.assertEquals(response.getObj(), 1);
}
rpcClient.close();
rpcServer.close();
}
}
Loading…
Cancel
Save