From fbb6d05deee8883b5ea50ff470afe32cce0923c0 Mon Sep 17 00:00:00 2001 From: pizihao <2335715300@qq.com> Date: Sat, 5 Nov 2022 17:21:08 +0800 Subject: [PATCH] fix : Add set multiple ChannelHandler(#812) --- .../rpc/client/NettyClientConnection.java | 10 +- .../hippo4j/rpc/handler/HandlerManager.java | 95 +++++++++++++++ .../rpc/handler/NettyClientPoolHandler.java | 48 +++++++- .../rpc/handler/NettyClientTakeHandler.java | 2 + .../rpc/handler/NettyHandlerManager.java | 102 ++++++++++++++++ .../rpc/handler/NettyServerTakeHandler.java | 2 + .../rpc/server/NettyServerConnection.java | 61 +++++++--- .../hippo4j/rpc/support/NettyConnectPool.java | 7 +- .../rpc/support/NettyConnectPoolHolder.java | 23 ++-- .../hippo4j/rpc/support/NettyProxyCenter.java | 18 +-- .../cn/hippo4j}/rpc/client/CallManager.java | 6 +- .../cn/hippo4j/rpc/client/RPCClientTest.java | 112 ++++++++++++++++++ .../cn/hippo4j}/rpc/server/RPCServerTest.java | 18 ++- .../rpc/support/ClassRegistryTest.java | 2 +- .../rpc/support/DefaultInstanceTest.java | 4 +- .../support/NettyConnectPoolHolderTest.java | 13 +- .../rpc/support/NettyConnectPoolTest.java | 31 +++-- .../rpc/support/NettyProxyCenterTest.java | 21 +++- .../rpc/support/ResultHolderTest.java | 24 +++- .../config/rpc/client/RPCClientTest.java | 68 ----------- 20 files changed, 525 insertions(+), 142 deletions(-) create mode 100644 hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java create mode 100644 hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyHandlerManager.java rename {hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config => hippo4j-rpc/src/test/java/cn/hippo4j}/rpc/client/CallManager.java (89%) create mode 100644 hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/RPCClientTest.java rename {hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config => hippo4j-rpc/src/test/java/cn/hippo4j}/rpc/server/RPCServerTest.java (79%) rename {hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config => hippo4j-rpc/src/test/java/cn/hippo4j}/rpc/support/ClassRegistryTest.java (98%) rename {hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config => hippo4j-rpc/src/test/java/cn/hippo4j}/rpc/support/DefaultInstanceTest.java (92%) rename {hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config => hippo4j-rpc/src/test/java/cn/hippo4j}/rpc/support/NettyConnectPoolHolderTest.java (81%) rename {hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config => hippo4j-rpc/src/test/java/cn/hippo4j}/rpc/support/NettyConnectPoolTest.java (78%) rename {hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config => hippo4j-rpc/src/test/java/cn/hippo4j}/rpc/support/NettyProxyCenterTest.java (60%) rename {hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config => hippo4j-rpc/src/test/java/cn/hippo4j}/rpc/support/ResultHolderTest.java (63%) delete mode 100644 hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/client/RPCClientTest.java diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java index 5e97ded5..a725ed6a 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java @@ -31,6 +31,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.pool.ChannelPoolHandler; import lombok.extern.slf4j.Slf4j; import java.util.LinkedList; @@ -54,16 +55,17 @@ public class NettyClientConnection implements ClientConnection { Channel channel; public NettyClientConnection(String host, int port, - List activeProcesses) { + List activeProcesses, + ChannelPoolHandler handler) { Assert.notNull(worker); this.host = host; this.port = port; this.activeProcessChain = new ActiveProcessChain(activeProcesses); - this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker); + this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker, handler); } - public NettyClientConnection(String host, int port) { - this(host, port, new LinkedList<>()); + public NettyClientConnection(String host, int port, ChannelPoolHandler handler) { + this(host, port, new LinkedList<>(), handler); } @Override diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java new file mode 100644 index 00000000..8750678d --- /dev/null +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.rpc.handler; + +import lombok.AllArgsConstructor; +import lombok.Data; + +/** + * Manage the Handler used in the processing.
+ * The Handler must be able to exist multiple times and be invoked once in a single execution + */ +public interface HandlerManager { + + /** + * Add handler to the end of the Handler chain + * + * @param name name + * @param handler handler + */ + HandlerManager addLast(String name, T handler); + + /** + * Add handler to the head of the Handler chain + * + * @param name name + * @param handler handler + */ + HandlerManager addFirst(String name, T handler); + + /** + * Add handler to the end of the Handler chain, without specifying a name + * + * @param handler handler + */ + HandlerManager addLast(T handler); + + /** + * Adds handler to the head of the Handler chain, without specifying a name + * + * @param handler handler + */ + HandlerManager addFirst(T handler); + + /** + * Create a handler + * + * @param order order + * @param handler Handler + * @param name Handler name + * @return HandlerEntity + */ + default HandlerEntity getHandlerEntity(long order, T handler, String name) { + return new HandlerEntity<>(order, handler, name); + } + + @Data + @AllArgsConstructor + class HandlerEntity implements Comparable>{ + + /** + * order, The Handler with a larger value is executed after the Handler with a smaller value + */ + long order; + + /** + * handler + */ + T handler; + + /** + * A high level summary of handler functionality + */ + String name; + + @Override + public int compareTo(HandlerEntity o) { + return (int) (this.getOrder() - o.getOrder()); + } + } +} diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java index 662d7fca..b9e38d36 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java @@ -21,16 +21,51 @@ import cn.hippo4j.rpc.coder.NettyDecoder; import cn.hippo4j.rpc.coder.NettyEncoder; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import lombok.extern.slf4j.Slf4j; +import java.util.List; + /** * Processing by the client connection pool handler to clean the buffer and define new connection properties */ @Slf4j -public class NettyClientPoolHandler implements ChannelPoolHandler { +public class NettyClientPoolHandler extends NettyHandlerManager implements ChannelPoolHandler { + + public NettyClientPoolHandler(List handlers) { + super(handlers); + } + + public NettyClientPoolHandler(ChannelHandler... handlers) { + super(handlers); + } + + public NettyClientPoolHandler() { + super(); + } + + public NettyClientPoolHandler addLast(String name, ChannelHandler handler) { + super.addLast(name, handler); + return this; + } + + public NettyClientPoolHandler addFirst(String name, ChannelHandler handler) { + super.addFirst(name, handler); + return this; + } + + public NettyClientPoolHandler addLast(ChannelHandler handler) { + super.addLast(handler); + return this; + } + + public NettyClientPoolHandler addFirst(ChannelHandler handler) { + super.addFirst(handler); + return this; + } @Override public void channelReleased(Channel ch) { @@ -50,6 +85,15 @@ public class NettyClientPoolHandler implements ChannelPoolHandler { .setTcpNoDelay(false); ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); ch.pipeline().addLast(new NettyEncoder()); - ch.pipeline().addLast(new NettyClientTakeHandler()); + this.handlers.stream() + .sorted() + .forEach(h -> { + if (h.getName() == null) { + ch.pipeline().addLast(h.getHandler()); + } else { + ch.pipeline().addLast(h.getName(), h.getHandler()); + } + }); } + } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientTakeHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientTakeHandler.java index eabb951a..b2cd5d6f 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientTakeHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientTakeHandler.java @@ -19,11 +19,13 @@ package cn.hippo4j.rpc.handler; import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.rpc.response.Response; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; /** * Interconnect with the netty mediation layer */ +@ChannelHandler.Sharable public class NettyClientTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler { @Override diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyHandlerManager.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyHandlerManager.java new file mode 100644 index 00000000..0649dde3 --- /dev/null +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyHandlerManager.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.rpc.handler; + +import cn.hippo4j.common.toolkit.Assert; +import io.netty.channel.ChannelHandler; + +import java.util.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; + +/** + * Processor manager for ChannelHandler in netty + */ +public abstract class NettyHandlerManager implements HandlerManager { + + protected final List> handlers; + + AtomicLong firstIndex = new AtomicLong(-1); + + AtomicLong lastIndex = new AtomicLong(0); + + protected NettyHandlerManager(List handlers) { + this.handlers = handlers.stream() + .filter(Objects::nonNull) + .map(c -> getHandlerEntity(lastIndex.getAndIncrement(), c, null)) + .collect(Collectors.toList()); + } + + protected NettyHandlerManager(ChannelHandler... handlers) { + this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList()); + } + + protected NettyHandlerManager() { + this.handlers = new LinkedList<>(); + } + + /** + * {@inheritDoc} + * + * @param name name + * @param handler handler + * @return NettyHandlerManager + */ + public NettyHandlerManager addLast(String name, ChannelHandler handler) { + Assert.notNull(handler); + this.handlers.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, name)); + return this; + } + + /** + * {@inheritDoc} + * + * @param name name + * @param handler handler + * @return NettyHandlerManager + */ + public NettyHandlerManager addFirst(String name, ChannelHandler handler) { + Assert.notNull(handler); + this.handlers.add(getHandlerEntity(firstIndex.getAndIncrement(), handler, name)); + return this; + } + + /** + * {@inheritDoc} + * + * @param handler handler + * @return NettyHandlerManager + */ + public NettyHandlerManager addLast(ChannelHandler handler) { + Assert.notNull(handler); + this.handlers.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, null)); + return this; + } + + /** + * {@inheritDoc} + * + * @param handler handler + * @return NettyHandlerManager + */ + public NettyHandlerManager addFirst(ChannelHandler handler) { + Assert.notNull(handler); + this.handlers.add(getHandlerEntity(firstIndex.getAndDecrement(), handler, null)); + return this; + } +} diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyServerTakeHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyServerTakeHandler.java index 61eeb070..0bbf7b02 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyServerTakeHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyServerTakeHandler.java @@ -26,6 +26,7 @@ import cn.hippo4j.rpc.response.DefaultResponse; import cn.hippo4j.rpc.response.Response; import cn.hippo4j.rpc.support.ClassRegistry; import cn.hippo4j.rpc.support.Instance; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import java.lang.reflect.Method; @@ -35,6 +36,7 @@ import java.util.List; /** * netty adaptation layer */ +@ChannelHandler.Sharable public class NettyServerTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler { ActiveProcessChain activeProcessChain; diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java index c9684761..671e0748 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java @@ -20,9 +20,7 @@ package cn.hippo4j.rpc.server; import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.rpc.coder.NettyDecoder; import cn.hippo4j.rpc.coder.NettyEncoder; -import cn.hippo4j.rpc.handler.NettyServerTakeHandler; -import cn.hippo4j.rpc.process.ActivePostProcess; -import cn.hippo4j.rpc.support.Instance; +import cn.hippo4j.rpc.handler.NettyHandlerManager; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; @@ -31,45 +29,42 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import lombok.extern.slf4j.Slf4j; -import java.util.LinkedList; +import java.util.Arrays; +import java.util.Collections; import java.util.List; /** * adapter to the netty server */ @Slf4j -public class NettyServerConnection implements ServerConnection { +public class NettyServerConnection extends NettyHandlerManager implements ServerConnection { Integer port; EventLoopGroup leader; EventLoopGroup worker; Class socketChannelCls = NioServerSocketChannel.class; - List processes; - Instance instance; ChannelFuture future; Channel channel; - public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List processes, Instance instance) { - Assert.notNull(processes); - Assert.notNull(instance); + public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List handlers) { + super(handlers); + Assert.notNull(handlers); Assert.notNull(leader); Assert.notNull(worker); this.leader = leader; this.worker = worker; - this.processes = processes; - this.instance = instance; } - public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, Instance instance) { - this(leader, worker, new LinkedList<>(), instance); + public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, ChannelHandler... handlers) { + this(leader, worker, (handlers != null ? Arrays.asList(handlers) : Collections.emptyList())); } - public NettyServerConnection(List processes, Instance instance) { - this(new NioEventLoopGroup(), new NioEventLoopGroup(), processes, instance); + public NettyServerConnection(ChannelHandler... handlers) { + this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList()); } - public NettyServerConnection(Instance instance) { - this(new NioEventLoopGroup(), new NioEventLoopGroup(), new LinkedList<>(), instance); + public NettyServerConnection(List handlers) { + this(new NioEventLoopGroup(), new NioEventLoopGroup(), handlers); } @Override @@ -84,7 +79,15 @@ public class NettyServerConnection implements ServerConnection { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); ch.pipeline().addLast(new NettyEncoder()); - ch.pipeline().addLast(new NettyServerTakeHandler(processes, instance)); + handlers.stream() + .sorted() + .forEach(h -> { + if (h.getName() == null) { + ch.pipeline().addLast(h.getHandler()); + } else { + ch.pipeline().addLast(h.getName(), h.getHandler()); + } + }); } }); try { @@ -113,4 +116,24 @@ public class NettyServerConnection implements ServerConnection { public boolean isActive() { return channel.isActive(); } + + public NettyServerConnection addLast(String name, ChannelHandler handler) { + super.addLast(name, handler); + return this; + } + + public NettyServerConnection addFirst(String name, ChannelHandler handler) { + super.addFirst(name, handler); + return this; + } + + public NettyServerConnection addLast(ChannelHandler handler) { + super.addLast(handler); + return this; + } + + public NettyServerConnection addFirst(ChannelHandler handler) { + super.addFirst(handler); + return this; + } } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java index 02438a9e..a34159e9 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java @@ -18,7 +18,6 @@ package cn.hippo4j.rpc.support; import cn.hippo4j.rpc.exception.ConnectionException; -import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; @@ -41,14 +40,15 @@ public class NettyConnectPool { ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE; FixedChannelPool.AcquireTimeoutAction acquireTimeoutAction = FixedChannelPool.AcquireTimeoutAction.NEW; int maxPendingAcquires = Integer.MAX_VALUE; - ChannelPoolHandler handler = new NettyClientPoolHandler(); + ChannelPoolHandler handler; ChannelPool pool; String host; int port; public NettyConnectPool(String host, int port, int maxConnect, long timeout, EventLoopGroup worker, - Class socketChannelCls) { + Class socketChannelCls, + ChannelPoolHandler handler) { InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port); Bootstrap bootstrap = new Bootstrap() .group(worker) @@ -56,6 +56,7 @@ public class NettyConnectPool { .remoteAddress(socketAddress); this.host = host; this.port = port; + this.handler = handler; this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction, timeout, maxConnect, maxPendingAcquires, true, true); log.info("The connection pool is established with the connection target {}:{}", host, port); diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java index 4a6dffa1..e143948f 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java @@ -19,6 +19,7 @@ package cn.hippo4j.rpc.support; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; +import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.concurrent.EventExecutorGroup; import lombok.AccessLevel; @@ -38,11 +39,13 @@ public class NettyConnectPoolHolder { static Map connectPoolMap = new ConcurrentHashMap<>(); private static NettyConnectPool initPool(String host, int port, - long timeout, EventLoopGroup worker) { + long timeout, EventLoopGroup worker, + ChannelPoolHandler handler) { return new NettyConnectPool( host, port, maxConnect, timeout, worker, - NioSocketChannel.class); + NioSocketChannel.class, + handler); } private static String getKey(String host, int port) { @@ -75,20 +78,22 @@ public class NettyConnectPoolHolder { /** * Gets a connection pool, and if there is no connectPoolMapping, creates one with the values provided and joins the connectPoolMapping * - * @param host the host - * @param port the port - * @param timeout timeout - * @param worker Special {@link EventExecutorGroup} which allows registering {@link Channel}s - * that get processed for later selection during the event loop. + * @param host the host + * @param port the port + * @param timeout timeout + * @param worker Special {@link EventExecutorGroup} which allows registering {@link Channel}s + * that get processed for later selection during the event loop. + * @param handler the chandler for netty * @return Map to the connection pool */ public static synchronized NettyConnectPool getPool(String host, int port, - long timeout, EventLoopGroup worker) { + long timeout, EventLoopGroup worker, + ChannelPoolHandler handler) { /* * this cannot use the computeIfAbsent method directly here because put is already used in init. Details refer to https://bugs.openjdk.java.net/browse/JDK-8062841 */ NettyConnectPool pool = getPool(host, port); - return pool == null ? initPool(host, port, timeout, worker) : pool; + return pool == null ? initPool(host, port, timeout, worker, handler) : pool; } /** diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java index 547dbebe..e73ef944 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java @@ -23,6 +23,7 @@ import cn.hippo4j.rpc.client.NettyClientConnection; import cn.hippo4j.rpc.request.DefaultRequest; import cn.hippo4j.rpc.request.Request; import cn.hippo4j.rpc.response.Response; +import io.netty.channel.pool.ChannelPoolHandler; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -41,16 +42,17 @@ public class NettyProxyCenter { static Map, Object> map = new HashMap<>(); /** - * 通过一个接口得到一个适用于PRC的代理对象 + * A proxy object for PRC is obtained through an interface * - * @param cls 接口类型 - * @param host 请求地址 - * @param port 端口 - * @param 对象类型 - * @return 代理对象 + * @param cls The interface type + * @param host Request the address + * @param port port + * @param Object type + * @param handler the pool handler for netty + * @return Proxy objects */ - public static T getProxy(Class cls, String host, int port) { - NettyClientConnection connection = new NettyClientConnection(host, port); + public static T getProxy(Class cls, String host, int port, ChannelPoolHandler handler) { + NettyClientConnection connection = new NettyClientConnection(host, port, handler); return getProxy(connection, cls, host, port); } diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/client/CallManager.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/CallManager.java similarity index 89% rename from hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/client/CallManager.java rename to hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/CallManager.java index d3fae3ae..6ad49004 100644 --- a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/client/CallManager.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/CallManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.client; +package cn.hippo4j.rpc.client; public class CallManager { @@ -23,4 +23,8 @@ public class CallManager { return 1; } + public int callTest(Integer a, Integer b) { + return a + b; + } + } diff --git a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/RPCClientTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/RPCClientTest.java new file mode 100644 index 00000000..3db8311e --- /dev/null +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/RPCClientTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.rpc.client; + +import cn.hippo4j.rpc.handler.NettyClientPoolHandler; +import cn.hippo4j.rpc.handler.NettyClientTakeHandler; +import cn.hippo4j.rpc.handler.NettyServerTakeHandler; +import cn.hippo4j.rpc.request.DefaultRequest; +import cn.hippo4j.rpc.request.Request; +import cn.hippo4j.rpc.response.Response; +import cn.hippo4j.rpc.server.NettyServerConnection; +import cn.hippo4j.rpc.server.RPCServer; +import cn.hippo4j.rpc.server.ServerConnection; +import cn.hippo4j.rpc.support.ClassRegistry; +import cn.hippo4j.rpc.support.DefaultInstance; +import cn.hippo4j.rpc.support.Instance; +import io.netty.channel.pool.ChannelPoolHandler; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class RPCClientTest { + + String host = "localhost"; + int port = 8888; + int portTest = 8889; + + @Test + public void connection() throws IOException { + Class cls = CallManager.class; + String className = cls.getName(); + ClassRegistry.put(className, cls); + // The mode connection was denied when the server was started on the specified port + Instance instance = new DefaultInstance(); + NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); + ServerConnection connection = new NettyServerConnection(handler); + RPCServer rpcServer = new RPCServer(port, connection); + CompletableFuture.runAsync(rpcServer::bind); + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + NettyClientConnection clientConnection = new NettyClientConnection(host, port, channelPoolHandler); + RPCClient rpcClient = new RPCClient(clientConnection); + Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null); + for (int i = 0; i < 100; i++) { + Response response = rpcClient.connection(request); + boolean active = rpcClient.isActive(); + Assert.assertTrue(active); + Assert.assertEquals(response.getObj(), 1); + } + rpcClient.close(); + rpcServer.close(); + } + + /** + * This test case can be overridden under the handler and coder packages + */ + @Test + public void connectionTest() throws IOException { + Class cls = CallManager.class; + String className = cls.getName(); + ClassRegistry.put(className, cls); + // The mode connection was denied when the server was started on the specified port + Instance instance = new DefaultInstance(); + NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); + ServerConnection connection = new NettyServerConnection(handler); + RPCServer rpcServer = new RPCServer(portTest, connection); + CompletableFuture.runAsync(rpcServer::bind); + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + NettyClientConnection clientConnection = new NettyClientConnection(host, portTest, channelPoolHandler); + RPCClient rpcClient = new RPCClient(clientConnection); + Class[] classes = new Class[2]; + classes[0] = Integer.class; + classes[1] = Integer.class; + Object[] objects = new Object[2]; + objects[0] = 1; + objects[1] = 2; + Request request = new DefaultRequest("127.0.0.18889", className, "callTest", classes, objects); + Response response = rpcClient.connection(request); + boolean active = rpcClient.isActive(); + Assert.assertTrue(active); + Assert.assertEquals(response.getObj(), 3); + rpcClient.close(); + rpcServer.close(); + } +} \ No newline at end of file diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/server/RPCServerTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/RPCServerTest.java similarity index 79% rename from hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/server/RPCServerTest.java rename to hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/RPCServerTest.java index 726ca910..5d06b9f8 100644 --- a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/server/RPCServerTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/RPCServerTest.java @@ -15,12 +15,14 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.server; +package cn.hippo4j.rpc.server; -import cn.hippo4j.config.rpc.support.DefaultInstance; -import cn.hippo4j.config.rpc.support.Instance; +import cn.hippo4j.rpc.handler.NettyServerTakeHandler; +import cn.hippo4j.rpc.support.DefaultInstance; +import cn.hippo4j.rpc.support.Instance; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import org.junit.Assert; import org.junit.Test; import java.io.IOException; @@ -34,7 +36,8 @@ public class RPCServerTest { @Test public void bind() throws IOException { Instance instance = new DefaultInstance(); - ServerConnection connection = new NettyServerConnection(instance); + NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); + ServerConnection connection = new NettyServerConnection(handler); RPCServer rpcServer = new RPCServer(port, connection); CompletableFuture.runAsync(rpcServer::bind); try { @@ -42,7 +45,11 @@ public class RPCServerTest { } catch (InterruptedException e) { throw new RuntimeException(e); } + boolean active = rpcServer.isActive(); + Assert.assertTrue(active); rpcServer.close(); + boolean serverActive = rpcServer.isActive(); + Assert.assertFalse(serverActive); } @Test @@ -50,7 +57,8 @@ public class RPCServerTest { Instance instance = new DefaultInstance(); EventLoopGroup leader = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); - ServerConnection connection = new NettyServerConnection(leader, worker, instance); + NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); + ServerConnection connection = new NettyServerConnection(leader, worker, handler); RPCServer rpcServer = new RPCServer(port, connection); CompletableFuture.runAsync(rpcServer::bind); try { diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/ClassRegistryTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/ClassRegistryTest.java similarity index 98% rename from hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/ClassRegistryTest.java rename to hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/ClassRegistryTest.java index 403a7114..80fa3a53 100644 --- a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/ClassRegistryTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/ClassRegistryTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; import org.junit.Assert; import org.junit.Test; diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/DefaultInstanceTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/DefaultInstanceTest.java similarity index 92% rename from hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/DefaultInstanceTest.java rename to hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/DefaultInstanceTest.java index 6d070446..aa6dca00 100644 --- a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/DefaultInstanceTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/DefaultInstanceTest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; import lombok.AllArgsConstructor; import lombok.Getter; @@ -38,7 +38,7 @@ public class DefaultInstanceTest { @Test public void testGetInstance() { - String className = "cn.hippo4j.config.rpc.support.DefaultInstanceTest$InstanceModel"; + String className = "cn.hippo4j.rpc.support.DefaultInstanceTest$InstanceModel"; Object instanceInstance = instance.getInstance(className); Assert.assertNotNull(instanceInstance); Assert.assertEquals(className, instanceInstance.getClass().getName()); diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyConnectPoolHolderTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolHolderTest.java similarity index 81% rename from hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyConnectPoolHolderTest.java rename to hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolHolderTest.java index f510ecd5..ce68b9a0 100644 --- a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyConnectPoolHolderTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolHolderTest.java @@ -15,8 +15,10 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; +import cn.hippo4j.rpc.handler.NettyClientPoolHandler; +import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -35,7 +37,8 @@ public class NettyConnectPoolHolderTest { @Test public void createPool() { - NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls); + NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, handler); NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port); Assert.assertEquals(pool, connectPool); NettyConnectPoolHolder.clear(); @@ -45,7 +48,8 @@ public class NettyConnectPoolHolderTest { @Test public void testGetPool() { - NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group); + NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler); NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port); Assert.assertEquals(connectPool1, connectPool); NettyConnectPoolHolder.clear(); @@ -55,7 +59,8 @@ public class NettyConnectPoolHolderTest { @Test public void remove() { - NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group); + NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler); NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port); Assert.assertEquals(connectPool1, connectPool); NettyConnectPoolHolder.remove(host, port); diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyConnectPoolTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolTest.java similarity index 78% rename from hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyConnectPoolTest.java rename to hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolTest.java index 622e575d..fd8457a4 100644 --- a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyConnectPoolTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolTest.java @@ -15,11 +15,14 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; -import cn.hippo4j.config.rpc.server.NettyServerConnection; -import cn.hippo4j.config.rpc.server.RPCServer; -import cn.hippo4j.config.rpc.server.ServerConnection; +import cn.hippo4j.rpc.handler.NettyClientPoolHandler; +import cn.hippo4j.rpc.handler.NettyClientTakeHandler; +import cn.hippo4j.rpc.handler.NettyServerTakeHandler; +import cn.hippo4j.rpc.server.NettyServerConnection; +import cn.hippo4j.rpc.server.RPCServer; +import cn.hippo4j.rpc.server.ServerConnection; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; @@ -45,7 +48,8 @@ public class NettyConnectPoolTest { public void acquire() throws IOException { // The mode connection was denied when the server was started on the specified port Instance instance = new DefaultInstance(); - ServerConnection connection = new NettyServerConnection(instance); + NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); + ServerConnection connection = new NettyServerConnection(handler); RPCServer rpcServer = new RPCServer(port, connection); CompletableFuture.runAsync(rpcServer::bind); // Given the delay in starting the server, wait here @@ -54,8 +58,8 @@ public class NettyConnectPoolTest { } catch (InterruptedException e) { throw new RuntimeException(e); } - - NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls); + NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler); Channel acquire = pool.acquire(timeout); Assert.assertNotNull(acquire); pool.release(acquire); @@ -66,7 +70,8 @@ public class NettyConnectPoolTest { public void testAcquire() throws IOException { // The mode connection was denied when the server was started on the specified port Instance instance = new DefaultInstance(); - ServerConnection connection = new NettyServerConnection(instance); + NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); + ServerConnection connection = new NettyServerConnection(handler); RPCServer rpcServer = new RPCServer(port, connection); CompletableFuture.runAsync(rpcServer::bind); // Given the delay in starting the server, wait here @@ -75,8 +80,8 @@ public class NettyConnectPoolTest { } catch (InterruptedException e) { throw new RuntimeException(e); } - - NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls); + NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler); Future acquire = pool.acquire(); Assert.assertNotNull(acquire); rpcServer.close(); @@ -86,7 +91,8 @@ public class NettyConnectPoolTest { public void close() throws IOException { // The mode connection was denied when the server was started on the specified port Instance instance = new DefaultInstance(); - ServerConnection connection = new NettyServerConnection(instance); + NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); + ServerConnection connection = new NettyServerConnection(handler); RPCServer rpcServer = new RPCServer(port, connection); CompletableFuture.runAsync(rpcServer::bind); // Given the delay in starting the server, wait here @@ -96,7 +102,8 @@ public class NettyConnectPoolTest { throw new RuntimeException(e); } - NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls); + NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler); Channel acquire = pool.acquire(timeout); Assert.assertNotNull(acquire); pool.release(acquire); diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyProxyCenterTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyProxyCenterTest.java similarity index 60% rename from hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyProxyCenterTest.java rename to hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyProxyCenterTest.java index 9ae4ac50..41d2de53 100644 --- a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyProxyCenterTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyProxyCenterTest.java @@ -15,9 +15,12 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; import cn.hippo4j.common.web.exception.IllegalException; +import cn.hippo4j.rpc.exception.ConnectionException; +import cn.hippo4j.rpc.handler.NettyClientPoolHandler; +import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import org.junit.Assert; import org.junit.Test; @@ -25,17 +28,29 @@ public class NettyProxyCenterTest { @Test public void getProxy() { - ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888); + NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888, handler); Assert.assertNotNull(localhost); } @Test(expected = IllegalException.class) public void getProxyTest() { - ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", 8888); + NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", 8888, handler); Assert.assertNotNull(localhost); } + + @Test(expected = ConnectionException.class) + public void getProxyTestCall() { + NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888, handler); + localhost.hello(); + Assert.assertNotNull(localhost); + } + interface ProxyInterface { + void hello(); } static class ProxyClass { diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/ResultHolderTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/ResultHolderTest.java similarity index 63% rename from hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/ResultHolderTest.java rename to hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/ResultHolderTest.java index 3defaf3b..4302a335 100644 --- a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/ResultHolderTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/ResultHolderTest.java @@ -15,12 +15,17 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; import cn.hippo4j.common.toolkit.IdUtil; import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.LockSupport; + public class ResultHolderTest { @Test @@ -39,4 +44,21 @@ public class ResultHolderTest { Assert.assertEquals(r1, o1); Assert.assertEquals(r2, o2); } + + @Test + public void testThread() throws InterruptedException { + AtomicInteger a = new AtomicInteger(); + String s1 = IdUtil.simpleUUID(); + String o1 = s1 + "1"; + CompletableFuture.runAsync(() -> { + ResultHolder.putThread(o1, Thread.currentThread()); + LockSupport.park(); + a.set(1); + }); + Assert.assertEquals(0, a.get()); + TimeUnit.SECONDS.sleep(1); + ResultHolder.wake(o1); + TimeUnit.SECONDS.sleep(1); + Assert.assertEquals(1, a.get()); + } } \ No newline at end of file diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/client/RPCClientTest.java b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/client/RPCClientTest.java deleted file mode 100644 index 54c2633e..00000000 --- a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/client/RPCClientTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.hippo4j.config.rpc.client; - -import cn.hippo4j.config.rpc.request.DefaultRequest; -import cn.hippo4j.config.rpc.request.Request; -import cn.hippo4j.config.rpc.response.Response; -import cn.hippo4j.config.rpc.server.NettyServerConnection; -import cn.hippo4j.config.rpc.server.RPCServer; -import cn.hippo4j.config.rpc.server.ServerConnection; -import cn.hippo4j.config.rpc.support.DefaultInstance; -import cn.hippo4j.config.rpc.support.Instance; -import cn.hippo4j.config.rpc.support.ClassRegistry; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; - -public class RPCClientTest { - - String host = "localhost"; - int port = 8888; - - @Test - public void connection() throws IOException { - - Class cls = CallManager.class; - String className = cls.getName(); - ClassRegistry.put(className, cls); - // The mode connection was denied when the server was started on the specified port - Instance instance = new DefaultInstance(); - ServerConnection connection = new NettyServerConnection(instance); - RPCServer rpcServer = new RPCServer(port, connection); - CompletableFuture.runAsync(rpcServer::bind); - try { - TimeUnit.SECONDS.sleep(3); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - NettyClientConnection clientConnection = new NettyClientConnection(host, port); - RPCClient rpcClient = new RPCClient(clientConnection); - Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null); - for (int i = 0; i < 100; i++) { - Response response = rpcClient.connection(request); - Assert.assertEquals(response.getObj(), 1); - } - rpcClient.close(); - rpcServer.close(); - } - -} \ No newline at end of file