diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/Client.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/Client.java similarity index 82% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/Client.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/Client.java index e2c4a64e..81957ce5 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/Client.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/Client.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.client; +package cn.hippo4j.rpc.client; -import cn.hippo4j.config.rpc.request.Request; -import cn.hippo4j.config.rpc.response.Response; +import cn.hippo4j.rpc.request.Request; +import cn.hippo4j.rpc.response.Response; import java.io.Closeable; @@ -31,4 +31,12 @@ public interface Client extends Closeable { * Start the client and try to send and receive data */ Response connection(Request request); + + /** + * Check whether the client is active + * + * @return Whether active + */ + boolean isActive(); + } diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/ClientConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/ClientConnection.java similarity index 86% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/ClientConnection.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/ClientConnection.java index f8b22c62..8d04a213 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/ClientConnection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/ClientConnection.java @@ -15,11 +15,11 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.client; +package cn.hippo4j.rpc.client; -import cn.hippo4j.config.rpc.handler.Connection; -import cn.hippo4j.config.rpc.request.Request; -import cn.hippo4j.config.rpc.response.Response; +import cn.hippo4j.rpc.handler.Connection; +import cn.hippo4j.rpc.request.Request; +import cn.hippo4j.rpc.response.Response; /** * Applicable to client connections diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/NettyClientConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java similarity index 71% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/NettyClientConnection.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java index 0b02c48c..5e97ded5 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/NettyClientConnection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java @@ -15,17 +15,18 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.client; +package cn.hippo4j.rpc.client; -import cn.hippo4j.config.rpc.exception.TimeOutException; -import cn.hippo4j.config.rpc.process.ActivePostProcess; -import cn.hippo4j.config.rpc.request.Request; -import cn.hippo4j.config.rpc.response.Response; -import cn.hippo4j.config.rpc.support.NettyConnectPool; -import cn.hippo4j.config.rpc.support.NettyConnectPoolHolder; -import cn.hippo4j.config.rpc.support.ResultHolder; import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.web.exception.IllegalException; +import cn.hippo4j.rpc.exception.TimeOutException; +import cn.hippo4j.rpc.process.ActivePostProcess; +import cn.hippo4j.rpc.process.ActiveProcessChain; +import cn.hippo4j.rpc.request.Request; +import cn.hippo4j.rpc.response.Response; +import cn.hippo4j.rpc.support.NettyConnectPool; +import cn.hippo4j.rpc.support.NettyConnectPoolHolder; +import cn.hippo4j.rpc.support.ResultHolder; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; @@ -46,18 +47,18 @@ public class NettyClientConnection implements ClientConnection { Integer port; // Obtain the connection timeout period. The default value is 30s long timeout = 30000L; - Channel channel; EventLoopGroup worker = new NioEventLoopGroup(); - List activeProcesses; - ChannelFuture future; + ActiveProcessChain activeProcessChain; NettyConnectPool connectionPool; + ChannelFuture future; + Channel channel; public NettyClientConnection(String host, int port, List activeProcesses) { Assert.notNull(worker); this.host = host; this.port = port; - this.activeProcesses = activeProcesses; + this.activeProcessChain = new ActiveProcessChain(activeProcesses); this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker); } @@ -67,26 +68,28 @@ public class NettyClientConnection implements ClientConnection { @Override public Response connect(Request request) { - preHandlers(request); + activeProcessChain.applyPreHandle(request); this.channel = connectionPool.acquire(timeout); + Response response = null; try { String key = request.getKey(); this.future = channel.writeAndFlush(request); log.info("Call successful, target address is {}:{}, request key is {}", host, port, key); // Wait for execution to complete - ResultHolder.put(key, Thread.currentThread()); + ResultHolder.putThread(key, Thread.currentThread()); LockSupport.parkNanos(timeout() * 1000000); - Response response = ResultHolder.get(key); + response = ResultHolder.get(key); if (response == null) { throw new TimeOutException("Timeout waiting for server-side response"); } - postHandlers(request, response); + activeProcessChain.applyPostHandle(request, response); log.info("The response from {}:{} was received successfully with the response key {}.", host, port, key); return response; } catch (Exception ex) { - afterCompletions(request, null, ex); + activeProcessChain.afterCompletion(request, response, ex); throw new IllegalException(ex); } finally { + activeProcessChain.afterCompletion(request, response, null); connectionPool.release(this.channel); } } @@ -111,22 +114,8 @@ public class NettyClientConnection implements ClientConnection { this.channel.close(); } - private void preHandlers(Request request) { - for (ActivePostProcess process : activeProcesses) { - process.preHandler(request); - } - } - - private void postHandlers(Request request, Response response) { - for (ActivePostProcess process : activeProcesses) { - process.postHandler(request, response); - } - } - - private void afterCompletions(Request request, Response response, Exception e) { - for (ActivePostProcess process : activeProcesses) { - process.afterCompletion(request, response, e); - } + @Override + public boolean isActive() { + return channel.isActive(); } - } diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/RPCClient.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/RPCClient.java similarity index 87% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/RPCClient.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/RPCClient.java index 8b8451c8..e3094076 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/RPCClient.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/RPCClient.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.client; +package cn.hippo4j.rpc.client; -import cn.hippo4j.config.rpc.request.Request; -import cn.hippo4j.config.rpc.response.Response; +import cn.hippo4j.rpc.request.Request; +import cn.hippo4j.rpc.response.Response; import java.io.IOException; @@ -38,6 +38,11 @@ public class RPCClient implements Client { return clientConnection.connect(request); } + @Override + public boolean isActive() { + return clientConnection.isActive(); + } + /** * Close the client and release all connections. * diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/CompactObjectOutputStream.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/coder/CompactObjectOutputStream.java similarity index 97% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/CompactObjectOutputStream.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/coder/CompactObjectOutputStream.java index a8592ef0..599cf118 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/CompactObjectOutputStream.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/coder/CompactObjectOutputStream.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.coder; +package cn.hippo4j.rpc.coder; import java.io.IOException; import java.io.ObjectOutputStream; diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/NettyDecoder.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/coder/NettyDecoder.java similarity index 95% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/NettyDecoder.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/coder/NettyDecoder.java index 334fa4c5..42223e44 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/NettyDecoder.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/coder/NettyDecoder.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.coder; +package cn.hippo4j.rpc.coder; -import cn.hippo4j.config.rpc.exception.CoderException; +import cn.hippo4j.rpc.exception.CoderException; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.serialization.ClassResolver; diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/NettyEncoder.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/coder/NettyEncoder.java similarity index 95% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/NettyEncoder.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/coder/NettyEncoder.java index 48ebe628..856aae2a 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/NettyEncoder.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/coder/NettyEncoder.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.coder; +package cn.hippo4j.rpc.coder; -import cn.hippo4j.config.rpc.exception.CoderException; +import cn.hippo4j.rpc.exception.CoderException; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; import io.netty.channel.ChannelHandlerContext; diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/discovery/DiscoveryAdapter.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/DiscoveryAdapter.java similarity index 96% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/discovery/DiscoveryAdapter.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/DiscoveryAdapter.java index d91fe15f..3a788f30 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/discovery/DiscoveryAdapter.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/DiscoveryAdapter.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.discovery; +package cn.hippo4j.rpc.discovery; import java.net.InetSocketAddress; diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/CoderException.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/exception/CoderException.java similarity index 97% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/CoderException.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/exception/CoderException.java index aa0e107a..5ac64a5a 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/CoderException.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/exception/CoderException.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.exception; +package cn.hippo4j.rpc.exception; /** * During decoding and encoding, if an exception occurs, an exception of type {@link CoderException} is thrown, diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/ConnectionException.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/exception/ConnectionException.java similarity index 97% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/ConnectionException.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/exception/ConnectionException.java index e43a0465..87e29423 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/ConnectionException.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/exception/ConnectionException.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.exception; +package cn.hippo4j.rpc.exception; /** * If an exception occurs during the connection between the server and the client, an exception of type diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/TimeOutException.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/exception/TimeOutException.java similarity index 97% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/TimeOutException.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/exception/TimeOutException.java index 50b902d9..ef808b57 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/TimeOutException.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/exception/TimeOutException.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.exception; +package cn.hippo4j.rpc.exception; /** * If there is a timeout between the server and the client, you will get a {@link TimeOutException}, diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyClientTakeHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java similarity index 66% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyClientTakeHandler.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java index 9e459a4b..08fac1db 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyClientTakeHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java @@ -15,33 +15,26 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.handler; +package cn.hippo4j.rpc.handler; -import cn.hippo4j.config.rpc.exception.ConnectionException; -import cn.hippo4j.config.rpc.support.ResultHolder; -import cn.hippo4j.config.rpc.response.Response; -import cn.hippo4j.common.web.exception.IllegalException; +import cn.hippo4j.rpc.exception.ConnectionException; +import cn.hippo4j.rpc.response.Response; +import cn.hippo4j.rpc.support.ResultHolder; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** - * Interconnect with the netty mediation layer + * the abstract base of {@link ConnectHandler} and {@link ChannelInboundHandlerAdapter} */ -public class NettyClientTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler { - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - try { - Response response = (Response) msg; - handler(response); - ctx.flush(); - } catch (Exception e) { - ctx.close(); - throw new IllegalException(e); - } - } +public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler { + /** + * Manual disconnection is used here in case the server and client are disconnected due to a sudden exception + * + * @param ctx the context + * @param cause the throwable + */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); @@ -53,9 +46,15 @@ public class NettyClientTakeHandler extends ChannelInboundHandlerAdapter impleme } } + /** + * This is a generic process that puts in the result and wakes up the thread + * + * @param response response + */ @Override public void handler(Response response) { ResultHolder.put(response.getKey(), response); ResultHolder.wake(response.getKey()); } + } diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/ConnectHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/ConnectHandler.java similarity index 80% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/ConnectHandler.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/ConnectHandler.java index 4170e5b5..21e36fb9 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/ConnectHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/ConnectHandler.java @@ -15,15 +15,16 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.handler; +package cn.hippo4j.rpc.handler; -import cn.hippo4j.config.rpc.request.Request; -import cn.hippo4j.config.rpc.response.Response; +import cn.hippo4j.rpc.request.Request; +import cn.hippo4j.rpc.response.Response; /** * The handler in each connection, where the specific behavior of the connection * must be specified, such as serialization and parsing, requesting and receiving - * requests, and so on + * requests, and so on
+ * */ public interface ConnectHandler { @@ -32,12 +33,13 @@ public interface ConnectHandler { * * @param request request */ - default Response handler(Request request) { + default Response sendHandler(Request request) { return null; } /** - * Processing after receiving Response + * Processing after receiving Response
+ * This is mainly for subsequent processing of the results * * @param response response */ diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/Connection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/Connection.java similarity index 58% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/Connection.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/Connection.java index b3b69d57..77ce2211 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/Connection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/Connection.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.handler; +package cn.hippo4j.rpc.handler; import java.io.Closeable; @@ -24,4 +24,18 @@ import java.io.Closeable; */ public interface Connection extends Closeable { + /** + * Gets the state of the connection, which is interpreted differently by different programs
+ *

+ * Client: Active connection indicates that a connection is being maintained with the server. + * Inactive connection indicates that no connection is being established with the server
+ *

+ * Server: The active connection indicates that the server has been started, is receiving ports, + * and can obtain requests at any time. The inactive connection indicates that the server has been + * shut down and the ports have been released + * + * @return Whether active + */ + boolean isActive(); + } diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyClientPoolHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java similarity index 93% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyClientPoolHandler.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java index bb124e81..662d7fca 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyClientPoolHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.handler; +package cn.hippo4j.rpc.handler; -import cn.hippo4j.config.rpc.coder.NettyDecoder; -import cn.hippo4j.config.rpc.coder.NettyEncoder; +import cn.hippo4j.rpc.coder.NettyDecoder; +import cn.hippo4j.rpc.coder.NettyEncoder; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.pool.ChannelPoolHandler; diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientTakeHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientTakeHandler.java new file mode 100644 index 00000000..eabb951a --- /dev/null +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientTakeHandler.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.rpc.handler; + +import cn.hippo4j.common.web.exception.IllegalException; +import cn.hippo4j.rpc.response.Response; +import io.netty.channel.ChannelHandlerContext; + +/** + * Interconnect with the netty mediation layer + */ +public class NettyClientTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + try { + Response response = (Response) msg; + handler(response); + ctx.flush(); + } catch (Exception e) { + ctx.close(); + throw new IllegalException(e); + } + } + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyServerTakeHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyServerTakeHandler.java similarity index 50% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyServerTakeHandler.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyServerTakeHandler.java index cac4ad6e..61eeb070 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyServerTakeHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyServerTakeHandler.java @@ -15,20 +15,18 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.handler; +package cn.hippo4j.rpc.handler; -import cn.hippo4j.config.rpc.exception.ConnectionException; -import cn.hippo4j.config.rpc.process.ActivePostProcess; -import cn.hippo4j.config.rpc.response.DefaultResponse; -import cn.hippo4j.config.rpc.support.ClassRegistry; -import cn.hippo4j.config.rpc.support.Instance; -import cn.hippo4j.config.rpc.request.Request; -import cn.hippo4j.config.rpc.response.Response; import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.toolkit.ReflectUtil; -import io.netty.channel.Channel; +import cn.hippo4j.rpc.process.ActivePostProcess; +import cn.hippo4j.rpc.process.ActiveProcessChain; +import cn.hippo4j.rpc.request.Request; +import cn.hippo4j.rpc.response.DefaultResponse; +import cn.hippo4j.rpc.response.Response; +import cn.hippo4j.rpc.support.ClassRegistry; +import cn.hippo4j.rpc.support.Instance; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import java.lang.reflect.Method; import java.util.LinkedList; @@ -37,13 +35,13 @@ import java.util.List; /** * netty adaptation layer */ -public class NettyServerTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler { +public class NettyServerTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler { - List processes; + ActiveProcessChain activeProcessChain; Instance instance; public NettyServerTakeHandler(List processes, Instance instance) { - this.processes = processes; + this.activeProcessChain = new ActiveProcessChain(processes); this.instance = instance; } @@ -57,58 +55,30 @@ public class NettyServerTakeHandler extends ChannelInboundHandlerAdapter impleme return; } Request request = (Request) msg; - Response response = handler(request); + Response response = sendHandler(request); ctx.writeAndFlush(response); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - Channel channel = ctx.channel(); - if (channel.isActive()) { - ctx.close(); - } else { - throw new ConnectionException(cause); - } - } - - @Override - public Response handler(Request request) { - if (!preHandlers(request)) { + public Response sendHandler(Request request) { + if (!activeProcessChain.applyPreHandle(request)) { return null; } + Response response = null; try { Class cls = ClassRegistry.get(request.getClassName()); Method method = ReflectUtil.getMethodByName(cls, request.getMethodName(), request.getParameterTypes()); Assert.notNull(method); Object invoke = ReflectUtil.invoke(instance.getInstance(cls), method, request.getParameters()); - Response response = new DefaultResponse(request.getKey(), invoke.getClass(), invoke); - postHandlers(request, response); + response = new DefaultResponse(request.getKey(), invoke.getClass(), invoke); + activeProcessChain.applyPostHandle(request, response); return response; } catch (Exception e) { - Response response = new DefaultResponse(request.getKey(), e, e.getMessage()); - afterCompletions(request, response, e); + response = new DefaultResponse(request.getKey(), e, e.getMessage()); + activeProcessChain.afterCompletion(request, response, e); return response; - } - } - - private boolean preHandlers(Request request) { - for (ActivePostProcess process : processes) { - if (!process.preHandler(request)) { - return false; - } - } - return true; - } - - private void postHandlers(Request request, Response response) { - for (ActivePostProcess process : processes) { - process.postHandler(request, response); - } - } - - private void afterCompletions(Request request, Response response, Exception e) { - for (ActivePostProcess process : processes) { - process.afterCompletion(request, response, e); + } finally { + activeProcessChain.afterCompletion(request, response, null); } } diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/process/ActivePostProcess.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActivePostProcess.java similarity index 93% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/process/ActivePostProcess.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActivePostProcess.java index 98df5998..816335f6 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/process/ActivePostProcess.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActivePostProcess.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.process; +package cn.hippo4j.rpc.process; -import cn.hippo4j.config.rpc.request.Request; -import cn.hippo4j.config.rpc.response.Response; +import cn.hippo4j.rpc.request.Request; +import cn.hippo4j.rpc.response.Response; /** * Callback while the connection is in progress diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActiveProcessChain.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActiveProcessChain.java new file mode 100644 index 00000000..693f83ce --- /dev/null +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActiveProcessChain.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.rpc.process; + +import cn.hippo4j.rpc.request.Request; +import cn.hippo4j.rpc.response.Response; +import lombok.extern.slf4j.Slf4j; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * Processor chain for easier processing of processors in different scenarios
+ * reference resources: spring HandlerExecutionChain + * + * @see ActivePostProcess + */ +@Slf4j +public final class ActiveProcessChain { + + /** + * A collection of processors that will be applied to their assigned programs. + * Processors will perform different actions on different occasions for both the server and the client, + * but the execution period of that action must be the same + */ + List processes; + + /** + * index
+ * that identifies where the {@link ActivePostProcess#preHandler(Request)} processing is performed
+ * This allows for the fact that some processors will add shutable operations to the class
+ * eg: {@link java.io.Closeable}, The {@link ActivePostProcess#afterCompletion(Request, Response, Exception)} + * operation is not performed after an exception if the preprocessor is not executed + */ + int index = -1; + + public ActiveProcessChain(List processes) { + this.processes = processes; + } + + public ActiveProcessChain(ActivePostProcess... processes) { + this((processes != null ? Arrays.asList(processes) : Collections.emptyList())); + } + + /** + * Apply postHandle methods of registered processes. + */ + public boolean applyPreHandle(Request request) { + for (int i = 0; i < this.processes.size(); i++) { + ActivePostProcess handle = processes.get(i); + if (!handle.preHandler(request)) { + afterCompletion(request, null, null); + return false; + } + this.index = i; + } + return true; + } + + /** + * Apply postHandle methods of registered processes. + */ + public void applyPostHandle(Request request, Response response) { + for (int i = processes.size() - 1; i >= 0; i--) { + ActivePostProcess handle = processes.get(i); + handle.postHandler(request, response); + } + } + + /** + * Trigger afterCompletion callbacks on the mapped ActivePostProcess. + * Will just invoke afterCompletion for all interceptors whose preHandle invocation + * has successfully completed and returned true. + */ + public void afterCompletion(Request request, Response response, Exception ex) { + for (int i = this.index; i >= 0; i--) { + ActivePostProcess handle = processes.get(i); + try { + handle.afterCompletion(request, response, ex); + } catch (Throwable e) { + log.error("HandlerInterceptor.afterCompletion threw exception", e); + } + } + } + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/DefaultRequest.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/request/DefaultRequest.java similarity index 96% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/DefaultRequest.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/request/DefaultRequest.java index 6b82a102..a1d346fa 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/DefaultRequest.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/request/DefaultRequest.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.request; +package cn.hippo4j.rpc.request; import java.io.IOException; import java.io.ObjectInputStream; @@ -94,7 +94,7 @@ public final class DefaultRequest implements Request { if (parameters == null) { return; } - // 序列化属性 parameters + // Serialization parameters for (Object parameter : parameters) { s.writeObject(parameter); } @@ -110,7 +110,7 @@ public final class DefaultRequest implements Request { if (parameterTypes == null) { return; } - // 反序列化属性 parameters + // Deserialization parameters int length = parameterTypes.length; Object[] a = new Object[length]; for (int i = 0; i < length; i++) { diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/Request.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/request/Request.java similarity index 97% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/Request.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/request/Request.java index a045fbbf..db68fe3d 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/Request.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/request/Request.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.request; +package cn.hippo4j.rpc.request; import java.io.Serializable; diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/DefaultResponse.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/response/DefaultResponse.java similarity index 97% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/DefaultResponse.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/response/DefaultResponse.java index 408d299f..e2d38c4d 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/DefaultResponse.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/response/DefaultResponse.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.response; +package cn.hippo4j.rpc.response; import java.io.IOException; import java.io.ObjectInputStream; @@ -105,7 +105,7 @@ public class DefaultResponse implements Response { if (obj == null) { return; } - // 序列化属性 obj + // Serialization obj s.writeObject(this.obj); } @@ -116,7 +116,7 @@ public class DefaultResponse implements Response { */ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { s.defaultReadObject(); - // 反序列化属性 obj + // Deserialization obj this.obj = s.readObject(); } } diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/Response.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/response/Response.java similarity index 97% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/Response.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/response/Response.java index 3c06fbaa..cdb26e5b 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/Response.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/response/Response.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.response; +package cn.hippo4j.rpc.response; import java.io.Serializable; diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/NettyServerConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java similarity index 90% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/NettyServerConnection.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java index cb240442..c9684761 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/NettyServerConnection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java @@ -15,14 +15,14 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.server; +package cn.hippo4j.rpc.server; -import cn.hippo4j.config.rpc.coder.NettyDecoder; -import cn.hippo4j.config.rpc.coder.NettyEncoder; -import cn.hippo4j.config.rpc.handler.NettyServerTakeHandler; -import cn.hippo4j.config.rpc.process.ActivePostProcess; -import cn.hippo4j.config.rpc.support.Instance; import cn.hippo4j.common.toolkit.Assert; +import cn.hippo4j.rpc.coder.NettyDecoder; +import cn.hippo4j.rpc.coder.NettyEncoder; +import cn.hippo4j.rpc.handler.NettyServerTakeHandler; +import cn.hippo4j.rpc.process.ActivePostProcess; +import cn.hippo4j.rpc.support.Instance; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; @@ -47,6 +47,7 @@ public class NettyServerConnection implements ServerConnection { List processes; Instance instance; ChannelFuture future; + Channel channel; public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List processes, Instance instance) { Assert.notNull(processes); @@ -88,6 +89,7 @@ public class NettyServerConnection implements ServerConnection { }); try { this.future = server.bind(port); + this.channel = this.future.channel(); log.info("The server is started and can receive requests. The listening port is {}", port); this.port = port; this.future.channel().closeFuture().sync(); @@ -106,4 +108,9 @@ public class NettyServerConnection implements ServerConnection { this.future.channel().close(); log.info("The server is shut down and no more requests are received. The release port is {}", port); } + + @Override + public boolean isActive() { + return channel.isActive(); + } } diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/RPCServer.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java similarity index 91% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/RPCServer.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java index b961aaf6..abb5f6ff 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/RPCServer.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.server; +package cn.hippo4j.rpc.server; import java.io.IOException; @@ -37,6 +37,11 @@ public class RPCServer implements Server { serverConnection.bind(port); } + @Override + public boolean isActive() { + return serverConnection.isActive(); + } + /** * Shut down the server and release the port */ diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/Server.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/Server.java similarity index 88% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/Server.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/Server.java index 69056286..9c21f788 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/Server.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/Server.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.server; +package cn.hippo4j.rpc.server; import java.io.Closeable; @@ -30,4 +30,11 @@ public interface Server extends Closeable { */ void bind(); + /** + * Check whether the server is active + * + * @return Whether active + */ + boolean isActive(); + } diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/ServerConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/ServerConnection.java similarity index 91% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/ServerConnection.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/ServerConnection.java index 1e1d8a4b..fcb5a9e1 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/ServerConnection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/ServerConnection.java @@ -15,9 +15,9 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.server; +package cn.hippo4j.rpc.server; -import cn.hippo4j.config.rpc.handler.Connection; +import cn.hippo4j.rpc.handler.Connection; /** * This applies to server-side connections diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ClassRegistry.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClassRegistry.java similarity index 98% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ClassRegistry.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClassRegistry.java index 4b938550..ebcc86f6 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ClassRegistry.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClassRegistry.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; import lombok.AccessLevel; import lombok.NoArgsConstructor; diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/DefaultInstance.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/DefaultInstance.java similarity index 97% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/DefaultInstance.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/DefaultInstance.java index 1c5ec559..c6cf9a6c 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/DefaultInstance.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/DefaultInstance.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; import cn.hippo4j.common.toolkit.ReflectUtil; import cn.hippo4j.common.web.exception.IllegalException; diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/Instance.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/Instance.java similarity index 97% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/Instance.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/Instance.java index e1b7f33a..840dff3a 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/Instance.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/Instance.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; /** * Instance interface to get an instance diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPool.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java similarity index 95% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPool.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java index 4268ba4c..02438a9e 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPool.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java @@ -15,10 +15,10 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; -import cn.hippo4j.config.rpc.exception.ConnectionException; -import cn.hippo4j.config.rpc.handler.NettyClientPoolHandler; +import cn.hippo4j.rpc.exception.ConnectionException; +import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPoolHolder.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java similarity index 94% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPoolHolder.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java index 37738798..4a6dffa1 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPoolHolder.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; @@ -84,8 +84,9 @@ public class NettyConnectPoolHolder { */ public static synchronized NettyConnectPool getPool(String host, int port, long timeout, EventLoopGroup worker) { - // This cannot use the computeIfAbsent method directly here because put is already used in init. - // Details refer to https://bugs.openjdk.java.net/browse/JDK-8062841 + /* + * this cannot use the computeIfAbsent method directly here because put is already used in init. Details refer to https://bugs.openjdk.java.net/browse/JDK-8062841 + */ NettyConnectPool pool = getPool(host, port); return pool == null ? initPool(host, port, timeout, worker) : pool; } diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyProxyCenter.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java similarity index 92% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyProxyCenter.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java index 6ba1dcf6..547dbebe 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyProxyCenter.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java @@ -15,14 +15,14 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; -import cn.hippo4j.config.rpc.request.DefaultRequest; -import cn.hippo4j.config.rpc.client.NettyClientConnection; -import cn.hippo4j.config.rpc.request.Request; -import cn.hippo4j.config.rpc.response.Response; import cn.hippo4j.common.toolkit.IdUtil; import cn.hippo4j.common.web.exception.IllegalException; +import cn.hippo4j.rpc.client.NettyClientConnection; +import cn.hippo4j.rpc.request.DefaultRequest; +import cn.hippo4j.rpc.request.Request; +import cn.hippo4j.rpc.response.Response; import lombok.AccessLevel; import lombok.NoArgsConstructor; diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ResultHolder.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ResultHolder.java similarity index 96% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ResultHolder.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ResultHolder.java index 2220fe8e..141368c3 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ResultHolder.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ResultHolder.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -56,7 +56,7 @@ public class ResultHolder { * @param key Request and response keys * @param t The Thread */ - public static void put(String key, Thread t) { + public static void putThread(String key, Thread t) { log.debug("Write thread, waiting to wake up"); threadMap.put(key, t); } diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/SpringContextInstance.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/SpringContextInstance.java similarity index 96% rename from hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/SpringContextInstance.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/SpringContextInstance.java index 9bd6ec56..a0d2db7a 100644 --- a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/SpringContextInstance.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/SpringContextInstance.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.config.rpc.support; +package cn.hippo4j.rpc.support; import cn.hippo4j.common.config.ApplicationContextHolder;