diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java index fce2deba..e6337999 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java @@ -22,8 +22,6 @@ import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.rpc.exception.TimeOutException; import cn.hippo4j.rpc.model.Request; import cn.hippo4j.rpc.model.Response; -import cn.hippo4j.rpc.process.ActivePostProcess; -import cn.hippo4j.rpc.process.ActiveProcessChain; import cn.hippo4j.rpc.support.NettyConnectPool; import cn.hippo4j.rpc.support.NettyConnectPoolHolder; import cn.hippo4j.rpc.support.ResultHolder; @@ -35,12 +33,13 @@ import io.netty.channel.pool.ChannelPoolHandler; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; -import java.util.LinkedList; -import java.util.List; +import java.util.Optional; import java.util.concurrent.locks.LockSupport; /** * Client implemented using netty + * + * @since 1.5.1 */ @Slf4j public class NettyClientConnection implements ClientConnection { @@ -51,33 +50,28 @@ public class NettyClientConnection implements ClientConnection { */ long timeout = 30000L; EventLoopGroup worker = new NioEventLoopGroup(); - ActiveProcessChain activeProcessChain; NettyConnectPool connectionPool; ChannelFuture future; Channel channel; public NettyClientConnection(InetSocketAddress address, - List activeProcesses, ChannelPoolHandler handler) { Assert.notNull(worker); this.address = address; - this.activeProcessChain = new ActiveProcessChain(activeProcesses); this.connectionPool = NettyConnectPoolHolder.getPool(address, timeout, worker, handler); } - public NettyClientConnection(InetSocketAddress address, ChannelPoolHandler handler) { - this(address, new LinkedList<>(), handler); - } - @Override public Response connect(Request request) { - activeProcessChain.applyPreHandle(request); this.channel = connectionPool.acquire(timeout); - Response response = null; + boolean debugEnabled = log.isDebugEnabled(); + Response response; try { String key = request.getKey(); this.future = channel.writeAndFlush(request); - log.info("Call successful, target address is {}:{}, request key is {}", address.getHostName(), address.getPort(), key); + if (debugEnabled) { + log.debug("Call successful, target address is {}:{}, request key is {}", address.getHostName(), address.getPort(), key); + } // Wait for execution to complete ResultHolder.putThread(key, Thread.currentThread()); LockSupport.parkNanos(timeout() * 1000000); @@ -85,14 +79,13 @@ public class NettyClientConnection implements ClientConnection { if (response == null) { throw new TimeOutException("Timeout waiting for server-side response"); } - activeProcessChain.applyPostHandle(request, response); - log.info("The response from {}:{} was received successfully with the response key {}.", address.getHostName(), address.getPort(), key); + if (debugEnabled) { + log.debug("The response from {}:{} was received successfully with the response key {}.", address.getHostName(), address.getPort(), key); + } return response; } catch (Exception ex) { - activeProcessChain.afterCompletion(request, response, ex); throw new IllegalException(ex); } finally { - activeProcessChain.afterCompletion(request, response, null); connectionPool.release(this.channel); } } @@ -108,13 +101,13 @@ public class NettyClientConnection implements ClientConnection { } @Override - public synchronized void close() { - if (this.channel == null) { - return; - } - worker.shutdownGracefully(); - this.future.channel().close(); - this.channel.close(); + public void close() { + Optional.ofNullable(this.channel) + .ifPresent(c -> { + worker.shutdownGracefully(); + this.future.channel().close(); + this.channel.close(); + }); } @Override diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/coder/NettyDecoder.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/coder/NettyDecoder.java deleted file mode 100644 index 42223e44..00000000 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/coder/NettyDecoder.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.hippo4j.rpc.coder; - -import cn.hippo4j.rpc.exception.CoderException; -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.serialization.ClassResolver; -import io.netty.handler.codec.serialization.ObjectDecoder; - -/** - * According to the decoder for java objects implemented by ObjectDecoder, - * it is necessary to ensure that the transmitted objects can be serialized - */ -public class NettyDecoder extends ObjectDecoder { - - public NettyDecoder(ClassResolver classResolver) { - super(classResolver); - } - - @Override - protected Object decode(ChannelHandlerContext ctx, ByteBuf in) { - ByteBuf byteBuf = in.retainedDuplicate(); - try { - Object o = super.decode(ctx, in); - if (o == null) { - return byteBuf; - } else { - return o; - } - } catch (Exception e) { - throw new CoderException("The encoding is abnormal, which may be caused by the failure of the transfer object to be deserialized"); - } - } -} diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java index 4cb61a28..3bc02728 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java @@ -24,8 +24,12 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; +import java.util.Optional; + /** * the abstract base of {@link ConnectHandler} and {@link ChannelInboundHandlerAdapter} + * + * @since 1.5.1 */ public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler { @@ -42,9 +46,10 @@ public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdap if (channel.isActive()) { ctx.close(); } - if (cause != null) { - throw new ConnectionException(cause); - } + Optional.ofNullable(cause) + .ifPresent(t -> { + throw new ConnectionException(cause); + }); } /** diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java index f4ebb4b9..1e6d5f94 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java @@ -17,12 +17,16 @@ package cn.hippo4j.rpc.handler; +import cn.hippo4j.common.web.exception.IllegalException; +import io.netty.channel.ChannelHandler; import lombok.AllArgsConstructor; import lombok.Getter; /** * Manage the Handler used in the processing.
* The Handler must be able to exist multiple times and be invoked once in a single execution + * + * @since 1.5.1 */ public interface HandlerManager { @@ -42,20 +46,6 @@ public interface HandlerManager { */ HandlerManager addFirst(String name, T handler); - /** - * Add handler to the end of the Handler chain, without specifying a name - * - * @param handler handler - */ - HandlerManager addLast(T handler); - - /** - * Adds handler to the head of the Handler chain, without specifying a name - * - * @param handler handler - */ - HandlerManager addFirst(T handler); - /** * Whether handler exists * @@ -72,6 +62,14 @@ public interface HandlerManager { * @return HandlerEntity */ default HandlerEntity getHandlerEntity(long order, T handler, String name) { + Class cls = handler.getClass(); + boolean b = cls.isAnnotationPresent(ChannelHandler.Sharable.class) + || HandlerManager.class.isAssignableFrom(cls); + if (!b) { + throw new IllegalException("Join the execution of the handler must add io.netty.channel.ChannelHandler." + + "Sharable annotations, Please for the handler class " + cls.getName() + " add io.netty.channel." + + "ChannelHandler.Sharable annotation"); + } return new HandlerEntity<>(order, handler, name); } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyServerTakeHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyServerTakeHandler.java index a4eaddc2..e0951c03 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyServerTakeHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyServerTakeHandler.java @@ -19,36 +19,28 @@ package cn.hippo4j.rpc.handler; import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.toolkit.ReflectUtil; -import cn.hippo4j.rpc.process.ActivePostProcess; -import cn.hippo4j.rpc.process.ActiveProcessChain; -import cn.hippo4j.rpc.model.Request; -import cn.hippo4j.rpc.model.DefaultResponse; -import cn.hippo4j.rpc.model.Response; import cn.hippo4j.rpc.discovery.ClassRegistry; import cn.hippo4j.rpc.discovery.Instance; +import cn.hippo4j.rpc.model.DefaultResponse; +import cn.hippo4j.rpc.model.Request; +import cn.hippo4j.rpc.model.Response; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import java.lang.reflect.Method; -import java.util.LinkedList; -import java.util.List; /** * netty adaptation layer + * + * @since 1.5.1 */ @ChannelHandler.Sharable public class NettyServerTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler { - ActiveProcessChain activeProcessChain; Instance instance; - public NettyServerTakeHandler(List processes, Instance instance) { - this.activeProcessChain = new ActiveProcessChain(processes); - this.instance = instance; - } - public NettyServerTakeHandler(Instance instance) { - this(new LinkedList<>(), instance); + this.instance = instance; } @Override @@ -63,24 +55,17 @@ public class NettyServerTakeHandler extends AbstractNettyTakeHandler implements @Override public Response sendHandler(Request request) { - if (!activeProcessChain.applyPreHandle(request)) { - return null; - } - Response response = null; + Response response; try { Class cls = ClassRegistry.get(request.getClassName()); Method method = ReflectUtil.getMethodByName(cls, request.getMethodName(), request.getParameterTypes()); Assert.notNull(method); Object invoke = ReflectUtil.invoke(instance.getInstance(cls), method, request.getParameters()); response = new DefaultResponse(request.getKey(), invoke.getClass(), invoke); - activeProcessChain.applyPostHandle(request, response); return response; } catch (Exception e) { response = new DefaultResponse(request.getKey(), e, e.getMessage()); - activeProcessChain.afterCompletion(request, response, e); return response; - } finally { - activeProcessChain.afterCompletion(request, response, null); } } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActivePostProcess.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActivePostProcess.java deleted file mode 100644 index 9e7fee18..00000000 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActivePostProcess.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.hippo4j.rpc.process; - -import cn.hippo4j.rpc.model.Request; -import cn.hippo4j.rpc.model.Response; - -/** - * Callback while the connection is in progress - */ -public interface ActivePostProcess { - - /** - * Client: After establishing a connection and before passing parameters
- * Server: Receives parameters and performs pre-call operations
- * - * @param request request - * @return Whether to continue the execution. If it is a client, the returned value does not affect subsequent execution - */ - default boolean preHandler(Request request) { - return true; - } - - /** - * Client: Action after receiving a response
- * Server: performs the operation after the call
- * - * @param request request - * @param response response - */ - default void postHandler(Request request, Response response) { - // NO SOMETHING - } - - /** - * Called when an exception or resource is cleaned - * - * @param request request - * @param response response - * @param e Exception - */ - default void afterCompletion(Request request, Response response, Exception e) { - // NO SOMETHING - } -} diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActiveProcessChain.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActiveProcessChain.java deleted file mode 100644 index 4882dc74..00000000 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActiveProcessChain.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.hippo4j.rpc.process; - -import cn.hippo4j.rpc.model.Request; -import cn.hippo4j.rpc.model.Response; -import lombok.extern.slf4j.Slf4j; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -/** - * Processor chain for easier processing of processors in different scenarios
- * reference resources: spring HandlerExecutionChain - * - * @see ActivePostProcess - */ -@Slf4j -public final class ActiveProcessChain { - - /** - * A collection of processors that will be applied to their assigned programs. - * Processors will perform different actions on different occasions for both the server and the client, - * but the execution period of that action must be the same - */ - List processes; - - /** - * index
- * that identifies where the {@link ActivePostProcess#preHandler(Request)} processing is performed
- * This allows for the fact that some processors will add shutable operations to the class
- * eg: {@link java.io.Closeable}, The {@link ActivePostProcess#afterCompletion(Request, Response, Exception)} - * operation is not performed after an exception if the preprocessor is not executed - */ - int index = -1; - - public ActiveProcessChain(List processes) { - this.processes = processes; - } - - public ActiveProcessChain(ActivePostProcess... processes) { - this((processes != null ? Arrays.asList(processes) : Collections.emptyList())); - } - - /** - * Apply postHandle methods of registered processes. - */ - public boolean applyPreHandle(Request request) { - for (int i = 0; i < this.processes.size(); i++) { - ActivePostProcess handle = processes.get(i); - if (!handle.preHandler(request)) { - afterCompletion(request, null, null); - return false; - } - this.index = i; - } - return true; - } - - /** - * Apply postHandle methods of registered processes. - */ - public void applyPostHandle(Request request, Response response) { - for (int i = processes.size() - 1; i >= 0; i--) { - ActivePostProcess handle = processes.get(i); - handle.postHandler(request, response); - } - } - - /** - * Trigger afterCompletion callbacks on the mapped ActivePostProcess. - * Will just invoke afterCompletion for all interceptors whose preHandle invocation - * has successfully completed and returned true. - */ - public void afterCompletion(Request request, Response response, Exception ex) { - for (int i = this.index; i >= 0; i--) { - ActivePostProcess handle = processes.get(i); - try { - handle.afterCompletion(request, response, ex); - } catch (Throwable e) { - log.error("HandlerInterceptor.afterCompletion threw exception", e); - } - } - } - -} diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java index 49083c03..9c827b2b 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java @@ -18,7 +18,6 @@ package cn.hippo4j.rpc.server; import cn.hippo4j.common.toolkit.Assert; -import cn.hippo4j.rpc.coder.NettyDecoder; import cn.hippo4j.rpc.coder.NettyEncoder; import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.exception.ConnectionException; @@ -29,6 +28,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; +import io.netty.handler.codec.serialization.ObjectDecoder; import lombok.extern.slf4j.Slf4j; import java.util.Arrays; @@ -37,6 +37,8 @@ import java.util.List; /** * adapter to the netty server + * + * @since 1.5.1 */ @Slf4j public class NettyServerConnection extends AbstractNettyHandlerManager implements ServerConnection { @@ -70,6 +72,10 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement @Override public void bind(ServerPort port) { + int serverPort = port.getPort(); + if (serverPort < 0 || serverPort > 65535) { + throw new ConnectionException("The port number " + serverPort + " is outside 0~65535, which is not a legal port number"); + } ServerBootstrap server = new ServerBootstrap(); server.group(leader, worker) .channel(socketChannelCls) @@ -77,10 +83,10 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement .childHandler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel ch) throws Exception { + protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new NettyEncoder()); - pipeline.addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); + pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null))); handlerEntities.stream() .sorted() .forEach(h -> { @@ -93,9 +99,11 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement } }); try { - this.future = server.bind(port.getPort()).sync(); + this.future = server.bind(serverPort).sync(); this.channel = this.future.channel(); - log.info("The server is started and can receive requests. The listening port is {}", port.getPort()); + if (log.isDebugEnabled()) { + log.debug("The server is started and can receive requests. The listening port is {}", serverPort); + } this.port = port; this.future.channel().closeFuture().sync(); } catch (InterruptedException ex) { @@ -113,7 +121,9 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement this.worker.shutdownGracefully(); this.channel.close(); this.future.channel().close(); - log.info("The server is shut down and no more requests are received. The release port is {}", port.getPort()); + if (log.isDebugEnabled()) { + log.debug("The server is shut down and no more requests are received. The release port is {}", port.getPort()); + } } @Override @@ -136,16 +146,4 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement return this; } - @Override - public NettyServerConnection addLast(ChannelHandler handler) { - super.addLast(handler); - return this; - } - - @Override - public NettyServerConnection addFirst(ChannelHandler handler) { - super.addFirst(handler); - return this; - } - } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClientFactoryBean.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClientFactoryBean.java index b63e81b6..37b63596 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClientFactoryBean.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClientFactoryBean.java @@ -17,6 +17,7 @@ package cn.hippo4j.rpc.support; +import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.rpc.client.Client; import cn.hippo4j.rpc.discovery.DiscoveryAdapter; import cn.hippo4j.rpc.exception.ConnectionException; @@ -30,11 +31,30 @@ import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import java.net.InetSocketAddress; +import java.util.Optional; /** * A FactoryBean that builds interfaces to invoke proxy objects * is responsible for managing the entire life cycle of the proxy objects
+ *

APPLICATION START

+ * When the application is started, the request initiator needs to complete the proxy of the calling interface, + * which ensures that the method can be requested to the server side when the method is called, rather than simply + * request an interface that cannot be instantiated. The classes involved in adding proxy to the interface are: + *
    + *
  • {@link NettyClientSupport}
  • + *
  • {@link NettyProxyCenter}
  • + *
  • {@link NettyClientPoolHandler}
  • + *
+ *

AND SPRING

+ * In order to fully integrate {@link ClientFactoryBean} into the life cycle of spring beans, + * {@link ClientFactoryBean} also needs to implement the following interfaces: + *
    + *
  • {@link InitializingBean}
  • + *
  • {@link ApplicationContextAware}
  • + *
  • {@link DisposableBean}
  • + *
* + * @since 1.5.1 * @deprecated With {@link cn.hippo4j.config.service.ThreadPoolAdapterService} structure, FactoryBean is not the best choice */ @Deprecated @@ -52,10 +72,16 @@ public class ClientFactoryBean implements FactoryBean, InitializingBean, */ private String discoveryAdapterName; + /** + * The adaptation interface for obtaining ip information in the registry is used together with + * {@link #discoveryAdapterName}, so that the adapter implementation can be obtained in the container + * during the initialization phase + */ private DiscoveryAdapter discoveryAdapter; /** - * the channel handler + * the channel handler, To ensure the security and reliability of netty calls, + * {@link ChannelHandler} must be identified by {@link ChannelHandler.Sharable} */ private ChannelHandler[] handlers; @@ -70,11 +96,13 @@ public class ClientFactoryBean implements FactoryBean, InitializingBean, private ApplicationContext applicationContext; /** - * InetSocketAddress + * InetSocketAddress, It is usually converted from {@link #applicationName} and {@link #discoveryAdapter} */ InetSocketAddress address; public ClientFactoryBean(String applicationName, String discoveryAdapterName, Class cls) { + Assert.notNull(applicationName); + Assert.notNull(cls); this.applicationName = applicationName; this.discoveryAdapterName = discoveryAdapterName; this.cls = cls; @@ -82,16 +110,17 @@ public class ClientFactoryBean implements FactoryBean, InitializingBean, @Override public Object getObject() throws Exception { - this.address = discoveryAdapter.getSocketAddress(applicationName); - if (this.address == null) { - String[] addressStr = applicationName.split(":"); - if (addressStr.length < 2) { - throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure"); - } - this.address = InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1])); - } - NettyClientPoolHandler handler = new NettyClientPoolHandler(handlers); - Client client = NettyClientSupport.getClient(this.address, handler); + this.address = Optional.ofNullable(applicationName) + .map(a -> discoveryAdapter.getSocketAddress(a)) + .map(a -> { + String[] addressStr = applicationName.split(":"); + if (addressStr.length < 2) { + throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure"); + } + return InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1])); + }) + .orElseThrow(() -> new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure")); + Client client = NettyClientSupport.getClient(this.address, new NettyClientPoolHandler(handlers)); return NettyProxyCenter.createProxy(client, cls, this.address); } @@ -102,15 +131,15 @@ public class ClientFactoryBean implements FactoryBean, InitializingBean, @Override public void afterPropertiesSet() throws Exception { - this.discoveryAdapter = (DiscoveryAdapter) applicationContext.getBean(discoveryAdapterName); + this.discoveryAdapter = Optional.ofNullable(discoveryAdapterName) + .map(s -> (DiscoveryAdapter) applicationContext.getBean(discoveryAdapterName)) + .orElse(null); } @Override public void destroy() throws Exception { - if (this.address == null) { - return; - } - NettyClientSupport.closeClient(this.address); + Optional.ofNullable(this.address) + .ifPresent(a -> NettyClientSupport.closeClient(this.address)); } @Override diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyClientSupport.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyClientSupport.java index d11530f4..315aec4f 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyClientSupport.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyClientSupport.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.lang.ref.WeakReference; import java.net.InetSocketAddress; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; /** @@ -51,6 +52,7 @@ import java.util.concurrent.ConcurrentHashMap; * @see cn.hippo4j.rpc.client.NettyClientConnection * @see NettyServerSupport * @see ClientFactoryBean + * @since 1.5.1 */ @NoArgsConstructor(access = AccessLevel.PRIVATE) public final class NettyClientSupport { @@ -73,7 +75,7 @@ public final class NettyClientSupport { ? (NettyClientPoolHandler) handlerManager : new NettyClientPoolHandler(); if (handler.isEmpty()) { - handler.addFirst(new NettyClientTakeHandler()); + handler.addFirst(null, new NettyClientTakeHandler()); } NettyClientConnection connection = new NettyClientConnection(address, handler); return new RPCClient(connection); @@ -97,12 +99,13 @@ public final class NettyClientSupport { */ public static void closeClient(InetSocketAddress address) { Client client = clientMap.remove(address); - try { - if (client != null) { - client.close(); - } - } catch (IOException e) { - throw new IllegalException(e); - } + Optional.ofNullable(client) + .ifPresent(c -> { + try { + c.close(); + } catch (IOException e) { + throw new IllegalException(e); + } + }); } } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java index 670520d4..dc2f9b57 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java @@ -30,10 +30,13 @@ import io.netty.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; +import java.util.Optional; import java.util.concurrent.TimeUnit; /** * This parameter applies only to the connection pool of netty + * + * @since 1.5.1 */ @Slf4j public class NettyConnectPool { @@ -58,7 +61,9 @@ public class NettyConnectPool { this.handler = handler; this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction, timeout, maxConnect, maxPendingAcquires, true, true); - log.info("The connection pool is established with the connection target {}:{}", address.getHostName(), address.getPort()); + if (log.isDebugEnabled()) { + log.info("The connection pool is established with the connection target {}:{}", address.getHostName(), address.getPort()); + } NettyConnectPoolHolder.createPool(address, this); } @@ -82,14 +87,15 @@ public class NettyConnectPool { } public void release(Channel channel) { - try { - if (channel != null) { - pool.release(channel); - } - } catch (Exception e) { - NettyClientSupport.closeClient(address); - throw new ConnectionException("Failed to release the connection", e); - } + Optional.ofNullable(channel) + .ifPresent(c -> { + try { + pool.release(channel); + } catch (Exception e) { + NettyClientSupport.closeClient(address); + throw new ConnectionException("Failed to release the connection", e); + } + }); } public void close() { diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyServerSupport.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyServerSupport.java index 0099663a..b2cc2af1 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyServerSupport.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyServerSupport.java @@ -40,6 +40,7 @@ import java.util.List; * @see RPCServer * @see NettyServerConnection * @see NettyClientSupport + * @since 1.5.1 */ public class NettyServerSupport implements Server { @@ -95,7 +96,7 @@ public class NettyServerSupport implements Server { : new NettyServerConnection(); // Assign a default handler if no handler exists if (connection.isEmpty()) { - connection.addFirst(new NettyServerTakeHandler(new DefaultInstance())); + connection.addFirst(null, new NettyServerTakeHandler(new DefaultInstance())); } server = new RPCServer(connection, serverPort); }