diff --git a/hippo4j-rpc/pom.xml b/hippo4j-rpc/pom.xml index 49058de8..5e789ec3 100644 --- a/hippo4j-rpc/pom.xml +++ b/hippo4j-rpc/pom.xml @@ -12,7 +12,7 @@ cn.hippo4j - hippo4j-common + hippo4j-message ${revision} diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java index 88c3bc93..0c9e8e66 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java @@ -19,7 +19,6 @@ package cn.hippo4j.rpc.client; import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.web.exception.IllegalException; -import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.exception.TimeOutException; import cn.hippo4j.rpc.model.Request; import cn.hippo4j.rpc.model.Response; @@ -35,6 +34,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.pool.ChannelPoolHandler; import lombok.extern.slf4j.Slf4j; +import java.net.InetSocketAddress; import java.util.LinkedList; import java.util.List; import java.util.concurrent.locks.LockSupport; @@ -45,9 +45,10 @@ import java.util.concurrent.locks.LockSupport; @Slf4j public class NettyClientConnection implements ClientConnection { - String host; - ServerPort port; - // Obtain the connection timeout period. The default value is 30s + InetSocketAddress address; + /** + * Obtain the connection timeout period. The default value is 30s + */ long timeout = 30000L; EventLoopGroup worker = new NioEventLoopGroup(); ActiveProcessChain activeProcessChain; @@ -55,18 +56,17 @@ public class NettyClientConnection implements ClientConnection { ChannelFuture future; Channel channel; - public NettyClientConnection(String host, ServerPort port, + public NettyClientConnection(InetSocketAddress address, List activeProcesses, ChannelPoolHandler handler) { Assert.notNull(worker); - this.host = host; - this.port = port; + this.address = address; this.activeProcessChain = new ActiveProcessChain(activeProcesses); - this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker, handler); + this.connectionPool = NettyConnectPoolHolder.getPool(address, timeout, worker, handler); } - public NettyClientConnection(String host, ServerPort port, ChannelPoolHandler handler) { - this(host, port, new LinkedList<>(), handler); + public NettyClientConnection(InetSocketAddress address, ChannelPoolHandler handler) { + this(address, new LinkedList<>(), handler); } @Override @@ -77,7 +77,7 @@ public class NettyClientConnection implements ClientConnection { try { String key = request.getKey(); this.future = channel.writeAndFlush(request); - log.info("Call successful, target address is {}:{}, request key is {}", host, port.getPort(), key); + log.info("Call successful, target address is {}:{}, request key is {}", address.getHostName(), address.getPort(), key); // Wait for execution to complete ResultHolder.putThread(key, Thread.currentThread()); LockSupport.parkNanos(timeout() * 1000000); @@ -86,7 +86,7 @@ public class NettyClientConnection implements ClientConnection { throw new TimeOutException("Timeout waiting for server-side response"); } activeProcessChain.applyPostHandle(request, response); - log.info("The response from {}:{} was received successfully with the response key {}.", host, port.getPort(), key); + log.info("The response from {}:{} was received successfully with the response key {}.", address.getHostName(), address.getPort(), key); return response; } catch (Exception ex) { activeProcessChain.afterCompletion(request, response, ex); diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java index 9bdf49c2..4cb61a28 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java @@ -41,7 +41,8 @@ public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdap Channel channel = ctx.channel(); if (channel.isActive()) { ctx.close(); - } else { + } + if (cause != null) { throw new ConnectionException(cause); } } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyClientPoolHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java similarity index 72% rename from hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyClientPoolHandler.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java index a0b71454..eced889a 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyClientPoolHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java @@ -22,6 +22,7 @@ import cn.hippo4j.rpc.coder.NettyEncoder; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelPipeline; import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; @@ -33,40 +34,40 @@ import java.util.List; * Processing by the client connection pool handler to clean the buffer and define new connection properties */ @Slf4j -public class AbstractNettyClientPoolHandler extends AbstractNettyHandlerManager implements ChannelPoolHandler { +public class NettyClientPoolHandler extends AbstractNettyHandlerManager implements ChannelPoolHandler { - public AbstractNettyClientPoolHandler(List handlers) { + public NettyClientPoolHandler(List handlers) { super(handlers); } - public AbstractNettyClientPoolHandler(ChannelHandler... handlers) { + public NettyClientPoolHandler(ChannelHandler... handlers) { super(handlers); } - public AbstractNettyClientPoolHandler() { + public NettyClientPoolHandler() { super(); } @Override - public AbstractNettyClientPoolHandler addLast(String name, ChannelHandler handler) { + public NettyClientPoolHandler addLast(String name, ChannelHandler handler) { super.addLast(name, handler); return this; } @Override - public AbstractNettyClientPoolHandler addFirst(String name, ChannelHandler handler) { + public NettyClientPoolHandler addFirst(String name, ChannelHandler handler) { super.addFirst(name, handler); return this; } @Override - public AbstractNettyClientPoolHandler addLast(ChannelHandler handler) { + public NettyClientPoolHandler addLast(ChannelHandler handler) { super.addLast(handler); return this; } @Override - public AbstractNettyClientPoolHandler addFirst(ChannelHandler handler) { + public NettyClientPoolHandler addFirst(ChannelHandler handler) { super.addFirst(handler); return this; } @@ -87,15 +88,16 @@ public class AbstractNettyClientPoolHandler extends AbstractNettyHandlerManager NioSocketChannel channel = (NioSocketChannel) ch; channel.config() .setTcpNoDelay(false); - ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); - ch.pipeline().addLast(new NettyEncoder()); + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new NettyEncoder()); + pipeline.addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); this.handlerEntities.stream() .sorted() .forEach(h -> { if (h.getName() == null) { - ch.pipeline().addLast(h.getHandler()); + pipeline.addLast(h.getHandler()); } else { - ch.pipeline().addLast(h.getName(), h.getHandler()); + pipeline.addLast(h.getName(), h.getHandler()); } }); } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/AbstractNettyServerConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java similarity index 72% rename from hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/AbstractNettyServerConnection.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java index e3f39fc4..62bf0551 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/AbstractNettyServerConnection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java @@ -21,6 +21,7 @@ import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.rpc.coder.NettyDecoder; import cn.hippo4j.rpc.coder.NettyEncoder; import cn.hippo4j.rpc.discovery.ServerPort; +import cn.hippo4j.rpc.exception.ConnectionException; import cn.hippo4j.rpc.handler.AbstractNettyHandlerManager; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; @@ -38,7 +39,7 @@ import java.util.List; * adapter to the netty server */ @Slf4j -public class AbstractNettyServerConnection extends AbstractNettyHandlerManager implements ServerConnection { +public class NettyServerConnection extends AbstractNettyHandlerManager implements ServerConnection { ServerPort port; EventLoopGroup leader; @@ -47,7 +48,7 @@ public class AbstractNettyServerConnection extends AbstractNettyHandlerManager i ChannelFuture future; Channel channel; - public AbstractNettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List handlers) { + public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List handlers) { super(handlers); Assert.notNull(leader); Assert.notNull(worker); @@ -55,15 +56,15 @@ public class AbstractNettyServerConnection extends AbstractNettyHandlerManager i this.worker = worker; } - public AbstractNettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, ChannelHandler... handlers) { + public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, ChannelHandler... handlers) { this(leader, worker, (handlers != null ? Arrays.asList(handlers) : Collections.emptyList())); } - public AbstractNettyServerConnection(ChannelHandler... handlers) { + public NettyServerConnection(ChannelHandler... handlers) { this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList()); } - public AbstractNettyServerConnection(List handlers) { + public NettyServerConnection(List handlers) { this(new NioEventLoopGroup(), new NioEventLoopGroup(), handlers); } @@ -77,27 +78,29 @@ public class AbstractNettyServerConnection extends AbstractNettyHandlerManager i @Override protected void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); - ch.pipeline().addLast(new NettyEncoder()); + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(new NettyEncoder()); + pipeline.addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); handlerEntities.stream() .sorted() .forEach(h -> { if (h.getName() == null) { - ch.pipeline().addLast(h.getHandler()); + pipeline.addLast(h.getHandler()); } else { - ch.pipeline().addLast(h.getName(), h.getHandler()); + pipeline.addLast(h.getName(), h.getHandler()); } }); } }); try { - this.future = server.bind(port.getPort()); + this.future = server.bind(port.getPort()).sync(); this.channel = this.future.channel(); - log.info("The server is started and can receive requests. The listening port is {}", port); + log.info("The server is started and can receive requests. The listening port is {}", port.getPort()); this.port = port; this.future.channel().closeFuture().sync(); } catch (InterruptedException ex) { Thread.currentThread().interrupt(); + throw new ConnectionException("Listening port failed, Please check whether the port is occupied", ex); } } @@ -109,34 +112,37 @@ public class AbstractNettyServerConnection extends AbstractNettyHandlerManager i leader.shutdownGracefully(); worker.shutdownGracefully(); this.future.channel().close(); - log.info("The server is shut down and no more requests are received. The release port is {}", port); + log.info("The server is shut down and no more requests are received. The release port is {}", port.getPort()); } @Override public boolean isActive() { + if (channel == null) { + return false; + } return channel.isActive(); } @Override - public AbstractNettyServerConnection addLast(String name, ChannelHandler handler) { + public NettyServerConnection addLast(String name, ChannelHandler handler) { super.addLast(name, handler); return this; } @Override - public AbstractNettyServerConnection addFirst(String name, ChannelHandler handler) { + public NettyServerConnection addFirst(String name, ChannelHandler handler) { super.addFirst(name, handler); return this; } @Override - public AbstractNettyServerConnection addLast(ChannelHandler handler) { + public NettyServerConnection addLast(ChannelHandler handler) { super.addLast(handler); return this; } @Override - public AbstractNettyServerConnection addFirst(ChannelHandler handler) { + public NettyServerConnection addFirst(ChannelHandler handler) { super.addFirst(handler); return this; } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java index f69e8393..d5247a38 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java @@ -18,8 +18,10 @@ package cn.hippo4j.rpc.server; import cn.hippo4j.rpc.discovery.ServerPort; +import cn.hippo4j.rpc.exception.ConnectionException; import java.io.IOException; +import java.util.concurrent.CompletableFuture; /** * Server Implementation @@ -34,9 +36,17 @@ public class RPCServer implements Server { this.serverConnection = serverConnection; } + /** + * Reference from{@link cn.hippo4j.config.netty.MonitorNettyServer}
+ * Start the server side asynchronously + */ @Override public void bind() { - serverConnection.bind(port); + CompletableFuture + .runAsync(() -> serverConnection.bind(port)) + .exceptionally(throwable -> { + throw new ConnectionException(throwable); + }); } @Override diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClientFactoryBean.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClientFactoryBean.java new file mode 100644 index 00000000..b63e81b6 --- /dev/null +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClientFactoryBean.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.rpc.support; + +import cn.hippo4j.rpc.client.Client; +import cn.hippo4j.rpc.discovery.DiscoveryAdapter; +import cn.hippo4j.rpc.exception.ConnectionException; +import cn.hippo4j.rpc.handler.NettyClientPoolHandler; +import io.netty.channel.ChannelHandler; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.FactoryBean; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; + +import java.net.InetSocketAddress; + +/** + * A FactoryBean that builds interfaces to invoke proxy objects + * is responsible for managing the entire life cycle of the proxy objects
+ * + * @deprecated With {@link cn.hippo4j.config.service.ThreadPoolAdapterService} structure, FactoryBean is not the best choice + */ +@Deprecated +public class ClientFactoryBean implements FactoryBean, InitializingBean, ApplicationContextAware, DisposableBean { + + /** + * Application name or address string. If it is an address string, it must be in ip:port format + */ + private String applicationName; + + /** + * The adapter name in the container needs to be used with applicationName + * to get the real server address. If it is null or the address information + * cannot be found, applicationName is treated as an address string + */ + private String discoveryAdapterName; + + private DiscoveryAdapter discoveryAdapter; + + /** + * the channel handler + */ + private ChannelHandler[] handlers; + + /** + * Type of the proxy interface + */ + private Class cls; + + /** + * Container Context + */ + private ApplicationContext applicationContext; + + /** + * InetSocketAddress + */ + InetSocketAddress address; + + public ClientFactoryBean(String applicationName, String discoveryAdapterName, Class cls) { + this.applicationName = applicationName; + this.discoveryAdapterName = discoveryAdapterName; + this.cls = cls; + } + + @Override + public Object getObject() throws Exception { + this.address = discoveryAdapter.getSocketAddress(applicationName); + if (this.address == null) { + String[] addressStr = applicationName.split(":"); + if (addressStr.length < 2) { + throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure"); + } + this.address = InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1])); + } + NettyClientPoolHandler handler = new NettyClientPoolHandler(handlers); + Client client = NettyClientSupport.getClient(this.address, handler); + return NettyProxyCenter.createProxy(client, cls, this.address); + } + + @Override + public Class getObjectType() { + return cls; + } + + @Override + public void afterPropertiesSet() throws Exception { + this.discoveryAdapter = (DiscoveryAdapter) applicationContext.getBean(discoveryAdapterName); + } + + @Override + public void destroy() throws Exception { + if (this.address == null) { + return; + } + NettyClientSupport.closeClient(this.address); + } + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } + + public ClientFactoryBean applicationName(String applicationName) { + this.applicationName = applicationName; + return this; + } + + public ClientFactoryBean discoveryAdapterName(String discoveryAdapterName) { + this.discoveryAdapterName = discoveryAdapterName; + return this; + } + + public ClientFactoryBean cls(Class cls) { + this.cls = cls; + return this; + } + + public ClientFactoryBean handlers(ChannelHandler[] handlers) { + this.handlers = handlers; + return this; + } + +} diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyClientSupport.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyClientSupport.java new file mode 100644 index 00000000..f83e0bb7 --- /dev/null +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyClientSupport.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.rpc.support; + +import cn.hippo4j.common.web.exception.IllegalException; +import cn.hippo4j.rpc.client.Client; +import cn.hippo4j.rpc.client.ClientConnection; +import cn.hippo4j.rpc.client.NettyClientConnection; +import cn.hippo4j.rpc.client.RPCClient; +import cn.hippo4j.rpc.handler.HandlerManager; +import cn.hippo4j.rpc.handler.NettyClientPoolHandler; +import cn.hippo4j.rpc.handler.NettyClientTakeHandler; +import io.netty.channel.ChannelHandler; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Different from the management of the server side, in order not to waste resources, we pool the + * connections of different addresses and turn the client into a one-time resource. If there is no + * support from the container, the client is a resource that can be recovered after use. This is + * similar to {@link WeakReference}, but the client needs the user to set the life cycle.
+ *

+ * Typically, the client is just a front for the direct connection between the client and the server, + * and for any call to succeed, only the {@link ClientConnection} connection is required. In the + * presence of a container, it is necessary to keep the client active for a long time, when the + * client should be a specific resource in the container, following the resource lifecycle specified + * by the container + * + * @see cn.hippo4j.rpc.client.RPCClient + * @see cn.hippo4j.rpc.client.NettyClientConnection + * @see NettyServerSupport + * @see ClientFactoryBean + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public final class NettyClientSupport { + + /** + * the cache for client + */ + private static final Map clientMap = new ConcurrentHashMap<>(); + + /** + * Obtain the client connected to the server through the server address. If the client does not exist, create one + * + * @param address the address + * @param handlerManager the handlerManager + * @return Client + */ + public static Client getClient(InetSocketAddress address, HandlerManager handlerManager) { + return clientMap.computeIfAbsent(address, a -> { + NettyClientPoolHandler handler = (handlerManager instanceof NettyClientPoolHandler) + ? (NettyClientPoolHandler) handlerManager + : new NettyClientPoolHandler(); + if (handler.isEmpty()) { + handler.addFirst(new NettyClientTakeHandler()); + } + NettyClientConnection connection = new NettyClientConnection(address, handler); + return new RPCClient(connection); + }); + } + + /** + * Obtain the client connected to the server through the server address. If the client does not exist, create one by default + * + * @param address the address + * @return Client + */ + public static Client getClient(InetSocketAddress address) { + return getClient(address, new NettyClientPoolHandler()); + } + + /** + * Close a client connected to a server address. The client may have been closed + * + * @param address the address + */ + public static void closeClient(InetSocketAddress address) { + Client client = clientMap.remove(address); + try { + if (client != null){ + client.close(); + } + } catch (IOException e) { + throw new IllegalException(e); + } + } +} diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java index a8406d52..670520d4 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java @@ -17,10 +17,10 @@ package cn.hippo4j.rpc.support; -import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.exception.ConnectionException; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.pool.ChannelHealthChecker; import io.netty.channel.pool.ChannelPool; @@ -43,25 +43,23 @@ public class NettyConnectPool { int maxPendingAcquires = Integer.MAX_VALUE; ChannelPoolHandler handler; ChannelPool pool; - String host; - ServerPort port; + InetSocketAddress address; - public NettyConnectPool(String host, ServerPort port, int maxConnect, + public NettyConnectPool(InetSocketAddress address, int maxConnect, long timeout, EventLoopGroup worker, Class socketChannelCls, ChannelPoolHandler handler) { - InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port.getPort()); Bootstrap bootstrap = new Bootstrap() .group(worker) .channel(socketChannelCls) - .remoteAddress(socketAddress); - this.host = host; - this.port = port; + .option(ChannelOption.TCP_NODELAY, true) + .remoteAddress(address); + this.address = address; this.handler = handler; this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction, timeout, maxConnect, maxPendingAcquires, true, true); - log.info("The connection pool is established with the connection target {}:{}", host, port.getPort()); - NettyConnectPoolHolder.createPool(host, port, this); + log.info("The connection pool is established with the connection target {}:{}", address.getHostName(), address.getPort()); + NettyConnectPoolHolder.createPool(address, this); } public Channel acquire(long timeoutMillis) { @@ -69,6 +67,7 @@ public class NettyConnectPool { Future fch = pool.acquire(); return fch.get(timeoutMillis, TimeUnit.MILLISECONDS); } catch (Exception e) { + NettyClientSupport.closeClient(address); throw new ConnectionException("Failed to get the connection", e); } } @@ -77,6 +76,7 @@ public class NettyConnectPool { try { return pool.acquire(); } catch (Exception e) { + NettyClientSupport.closeClient(address); throw new ConnectionException("Failed to get the connection", e); } } @@ -87,6 +87,7 @@ public class NettyConnectPool { pool.release(channel); } } catch (Exception e) { + NettyClientSupport.closeClient(address); throw new ConnectionException("Failed to release the connection", e); } } @@ -94,8 +95,9 @@ public class NettyConnectPool { public void close() { try { pool.close(); - NettyConnectPoolHolder.remove(host, port); + NettyConnectPoolHolder.remove(address); } catch (Exception e) { + NettyClientSupport.closeClient(address); throw new ConnectionException("Failed to close the connection pool", e); } } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java index 32475363..f1f88856 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java @@ -17,7 +17,6 @@ package cn.hippo4j.rpc.support; -import cn.hippo4j.rpc.discovery.ServerPort; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.pool.ChannelPoolHandler; @@ -26,6 +25,7 @@ import io.netty.util.concurrent.EventExecutorGroup; import lombok.AccessLevel; import lombok.NoArgsConstructor; +import java.net.InetSocketAddress; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -35,76 +35,68 @@ import java.util.concurrent.ConcurrentHashMap; @NoArgsConstructor(access = AccessLevel.PRIVATE) public class NettyConnectPoolHolder { - static int maxConnect = 64; + static int maxConnect = 256; static Map connectPoolMap = new ConcurrentHashMap<>(); - private static NettyConnectPool initPool(String host, ServerPort port, + private static NettyConnectPool initPool(InetSocketAddress address, long timeout, EventLoopGroup worker, ChannelPoolHandler handler) { - return new NettyConnectPool( - host, port, maxConnect, - timeout, worker, - NioSocketChannel.class, - handler); + return new NettyConnectPool(address, maxConnect, timeout, worker, NioSocketChannel.class, handler); } - private static String getKey(String host, ServerPort port) { - return host + ":" + port.getPort(); + private static String getKey(InetSocketAddress address) { + return address.getHostName() + ":" + address.getPort(); } /** * The connection pool connectPoolMapping may already exist before the connection pool * connectPoolMapping is established. In this case, the connection pool is directly overwritten * - * @param host the host - * @param port the port - * @param pool This parameter applies only to the connection pool of netty + * @param address the InetSocketAddress + * @param pool This parameter applies only to the connection pool of netty */ - public static void createPool(String host, ServerPort port, NettyConnectPool pool) { - connectPoolMap.put(getKey(host, port), pool); + public static void createPool(InetSocketAddress address, NettyConnectPool pool) { + connectPoolMap.put(getKey(address), pool); } /** * Gets a connection pool, or null if there is no corresponding connectPoolMapping * - * @param host the host - * @param port the port + * @param address the InetSocketAddress * @return Map to the connection pool */ - public static NettyConnectPool getPool(String host, ServerPort port) { - return connectPoolMap.get(getKey(host, port)); + public static NettyConnectPool getPool(InetSocketAddress address) { + return connectPoolMap.get(getKey(address)); } /** * Gets a connection pool, and if there is no connectPoolMapping, creates one with the values provided and joins the connectPoolMapping * - * @param host the host - * @param port the port - * @param timeout timeout - * @param worker Special {@link EventExecutorGroup} which allows registering {@link Channel}s - * that get processed for later selection during the event loop. + * @param address the InetSocketAddress + * @param timeout timeout + * @param worker Special {@link EventExecutorGroup} which allows registering {@link Channel}s + * that get processed for later selection during the event loop. * @param handler the chandler for netty * @return Map to the connection pool */ - public static synchronized NettyConnectPool getPool(String host, ServerPort port, + public static synchronized NettyConnectPool getPool(InetSocketAddress address, long timeout, EventLoopGroup worker, ChannelPoolHandler handler) { /* * this cannot use the computeIfAbsent method directly here because put is already used in init. Details refer to https://bugs.openjdk.java.net/browse/JDK-8062841 */ - NettyConnectPool pool = getPool(host, port); - return pool == null ? initPool(host, port, timeout, worker, handler) : pool; + NettyConnectPool pool = getPool(address); + return pool == null ? initPool(address, timeout, worker, handler) : pool; } /** * Disconnect a connection connectPoolMapping. This must take effect at the same time as the connection pool is closed * - * @param host host - * @param port port + * @param address the InetSocketAddress */ - public static void remove(String host, ServerPort port) { - connectPoolMap.remove(getKey(host, port)); + public static void remove(InetSocketAddress address) { + connectPoolMap.remove(getKey(address)); } /** diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java index 35fab496..836070ea 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java @@ -20,20 +20,19 @@ package cn.hippo4j.rpc.support; import cn.hippo4j.common.toolkit.IdUtil; import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.rpc.client.Client; -import cn.hippo4j.rpc.client.NettyClientConnection; -import cn.hippo4j.rpc.client.RPCClient; -import cn.hippo4j.rpc.discovery.ServerPort; +import cn.hippo4j.rpc.exception.ConnectionException; +import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import cn.hippo4j.rpc.model.DefaultRequest; import cn.hippo4j.rpc.model.Request; import cn.hippo4j.rpc.model.Response; -import io.netty.channel.pool.ChannelPoolHandler; import lombok.AccessLevel; import lombok.NoArgsConstructor; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; -import java.util.HashMap; +import java.net.InetSocketAddress; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * Add a proxy for the request, {@link Proxy} and {@link InvocationHandler} @@ -42,31 +41,77 @@ import java.util.Map; public class NettyProxyCenter { // cache - static Map, Object> map = new HashMap<>(); + static Map map = new ConcurrentHashMap<>(); /** * A proxy object for PRC is obtained through an interface * * @param cls The interface type - * @param host Request the address - * @param port port + * @param address address * @param Object type * @param handler the pool handler for netty * @return Proxy objects */ - public static T getProxy(Class cls, String host, ServerPort port, ChannelPoolHandler handler) { - NettyClientConnection connection = new NettyClientConnection(host, port, handler); - Client rpcClient = new RPCClient(connection); - return getProxy(rpcClient, cls, host, port); + @SuppressWarnings("unchecked") + public static T getProxy(Class cls, InetSocketAddress address, NettyClientPoolHandler handler) { + Client client = NettyClientSupport.getClient(address, handler); + String s = address + cls.getName(); + Object o = map.get(s); + if (o != null) { + return (T) o; + } + return createProxy(client, cls, address); + } + + /** + * A proxy object for PRC is obtained through an interface + * + * @param cls The interface type + * @param address address String + * @param Object type + * @return Proxy objects + */ + @SuppressWarnings("unchecked") + public static T getProxy(Class cls, String address) { + String[] addressStr = address.split(":"); + if (addressStr.length < 2) { + throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure"); + } + InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1])); + String s = socketAddress + cls.getName(); + Object o = map.get(s); + if (o != null) { + return (T) o; + } + Client client = NettyClientSupport.getClient(socketAddress); + return createProxy(client, cls, socketAddress); + } + + /** + * remove proxy object + * + * @param cls the class + * @param address address String + */ + public static void removeProxy(Class cls, String address) { + String[] addressStr = address.split(":"); + if (addressStr.length < 2) { + throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure"); + } + InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1])); + String s = socketAddress + cls.getName(); + NettyClientSupport.closeClient(socketAddress); + map.remove(s); } @SuppressWarnings("unchecked") - public static T getProxy(Client client, Class cls, String host, ServerPort port) { + public static T createProxy(Client client, Class cls, InetSocketAddress address) { boolean b = cls.isInterface(); if (!b) { throw new IllegalException(cls.getName() + "is not a Interface"); } - Object o = map.get(cls); + String s = address.toString() + cls.getName(); + Object o = map.get(s); if (o != null) { return (T) o; } @@ -76,7 +121,7 @@ public class NettyProxyCenter { (proxy, method, args) -> { String clsName = cls.getName(); String methodName = method.getName(); - String key = host + port + clsName + methodName + IdUtil.simpleUUID(); + String key = address + clsName + methodName + IdUtil.simpleUUID(); Class[] parameterTypes = method.getParameterTypes(); Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, args); Response response = client.connection(request); @@ -88,7 +133,7 @@ public class NettyProxyCenter { } return response.getObj(); }); - map.put(cls, obj); + map.put(s, obj); return obj; } } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyServerSupport.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyServerSupport.java index 56f22d9e..0099663a 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyServerSupport.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyServerSupport.java @@ -22,7 +22,7 @@ import cn.hippo4j.rpc.discovery.DefaultInstance; import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.handler.HandlerManager; import cn.hippo4j.rpc.handler.NettyServerTakeHandler; -import cn.hippo4j.rpc.server.AbstractNettyServerConnection; +import cn.hippo4j.rpc.server.NettyServerConnection; import cn.hippo4j.rpc.server.RPCServer; import cn.hippo4j.rpc.server.Server; import io.netty.channel.ChannelHandler; @@ -36,6 +36,10 @@ import java.util.List; * This is a server-side build class that allows you to quickly prepare data on the server side and start the server side.
*

* The composite pattern is adopted, which means that it is itself a server-side implementation, so it is stateless. + * + * @see RPCServer + * @see NettyServerConnection + * @see NettyClientSupport */ public class NettyServerSupport implements Server { @@ -43,27 +47,27 @@ public class NettyServerSupport implements Server { * The interface that the server side can call, * All the methods in the interface are brokered during initialization */ - List> classes; + protected List> classes; /** * Extract the port number of the web container, * which is the port information exposed by the server */ - ServerPort serverPort; + protected ServerPort serverPort; /** * ChannelHandler */ - HandlerManager handlerManager; + protected HandlerManager handlerManager; - Server server; + protected Server server; public NettyServerSupport(ServerPort serverPort, Class... classes) { - this(serverPort, new AbstractNettyServerConnection(), classes); + this(serverPort, new NettyServerConnection(), classes); } public NettyServerSupport(ServerPort serverPort, List> classes) { - this(serverPort, new AbstractNettyServerConnection(), classes); + this(serverPort, new NettyServerConnection(), classes); } public NettyServerSupport(ServerPort serverPort, HandlerManager handlerManager, Class... classes) { @@ -82,16 +86,16 @@ public class NettyServerSupport implements Server { * Only interfaces are registered during registration. Classes and abstract classes are not registered. * If no processor is available, a default processor is provided */ - private void initServer() { + protected void initServer() { // Register the interface that can be invoked classes.stream().filter(Class::isInterface) .forEach(cls -> ClassRegistry.put(cls.getName(), cls)); - AbstractNettyServerConnection connection = (handlerManager instanceof AbstractNettyServerConnection) - ? (AbstractNettyServerConnection) handlerManager - : new AbstractNettyServerConnection(); + NettyServerConnection connection = (handlerManager instanceof NettyServerConnection) + ? (NettyServerConnection) handlerManager + : new NettyServerConnection(); // Assign a default handler if no handler exists if (connection.isEmpty()) { - connection.addLast(new NettyServerTakeHandler(new DefaultInstance())); + connection.addFirst(new NettyServerTakeHandler(new DefaultInstance())); } server = new RPCServer(connection, serverPort); } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ResultHolder.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ResultHolder.java index 141368c3..613a079b 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ResultHolder.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ResultHolder.java @@ -46,7 +46,9 @@ public class ResultHolder { * @param o The result */ public static void put(String key, Object o) { - log.debug("Write the result, wake up the thread"); + if (log.isDebugEnabled()) { + log.debug("Write the result, wake up the thread"); + } map.put(key, o); } @@ -57,7 +59,9 @@ public class ResultHolder { * @param t The Thread */ public static void putThread(String key, Thread t) { - log.debug("Write thread, waiting to wake up"); + if (log.isDebugEnabled()) { + log.debug("Write thread, waiting to wake up"); + } threadMap.put(key, t); } @@ -67,7 +71,9 @@ public class ResultHolder { * @param key Request and response keys */ public static synchronized void wake(String key) { - log.debug("The future has been fetched, wake up the thread"); + if (log.isDebugEnabled()) { + log.debug("The future has been fetched, wake up the thread"); + } Thread thread = threadMap.remove(key); LockSupport.unpark(thread); } @@ -82,7 +88,9 @@ public class ResultHolder { */ @SuppressWarnings("unchecked") public static T get(String key) { - log.debug("Get the future"); + if (log.isDebugEnabled()) { + log.debug("Get the future"); + } return (T) map.remove(key); } diff --git a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/RPCClientTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/RPCClientTest.java index 6602a4da..867a3826 100644 --- a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/RPCClientTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/client/RPCClientTest.java @@ -17,25 +17,25 @@ package cn.hippo4j.rpc.client; +import cn.hippo4j.rpc.discovery.ClassRegistry; +import cn.hippo4j.rpc.discovery.DefaultInstance; +import cn.hippo4j.rpc.discovery.Instance; import cn.hippo4j.rpc.discovery.ServerPort; -import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler; +import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import cn.hippo4j.rpc.handler.NettyServerTakeHandler; import cn.hippo4j.rpc.model.DefaultRequest; import cn.hippo4j.rpc.model.Request; import cn.hippo4j.rpc.model.Response; -import cn.hippo4j.rpc.server.AbstractNettyServerConnection; +import cn.hippo4j.rpc.server.NettyServerConnection; import cn.hippo4j.rpc.server.RPCServer; import cn.hippo4j.rpc.server.ServerConnection; -import cn.hippo4j.rpc.discovery.ClassRegistry; -import cn.hippo4j.rpc.discovery.DefaultInstance; -import cn.hippo4j.rpc.discovery.Instance; import io.netty.channel.pool.ChannelPoolHandler; import org.junit.Assert; import org.junit.Test; import java.io.IOException; -import java.util.concurrent.CompletableFuture; +import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; public class RPCClientTest { @@ -44,6 +44,9 @@ public class RPCClientTest { ServerPort port = new TestServerPort(); ServerPort portTest = new TestPortServerPort(); + /** + * This test case can be overridden under the handler and coder packages + */ @Test public void connection() throws IOException { Class cls = CallManager.class; @@ -52,31 +55,33 @@ public class RPCClientTest { // The mode connection was denied when the server was started on the specified port Instance instance = new DefaultInstance(); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); - ServerConnection connection = new AbstractNettyServerConnection(handler); - RPCServer rpcServer = new RPCServer(connection, port); - CompletableFuture.runAsync(rpcServer::bind); + ServerConnection connection = new NettyServerConnection(handler); + RPCServer rpcServer = new RPCServer(connection, portTest); + rpcServer.bind(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } - ChannelPoolHandler channelPoolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); - NettyClientConnection clientConnection = new NettyClientConnection(host, port, channelPoolHandler); + InetSocketAddress address = InetSocketAddress.createUnresolved(host, portTest.getPort()); + ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + NettyClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler); RPCClient rpcClient = new RPCClient(clientConnection); - Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null); - for (int i = 0; i < 100; i++) { - Response response = rpcClient.connection(request); - boolean active = rpcClient.isActive(); - Assert.assertTrue(active); - Assert.assertEquals(response.getObj(), 1); - } + Class[] classes = new Class[2]; + classes[0] = Integer.class; + classes[1] = Integer.class; + Object[] objects = new Object[2]; + objects[0] = 1; + objects[1] = 2; + Request request = new DefaultRequest("127.0.0.18889", className, "callTest", classes, objects); + Response response = rpcClient.connection(request); + boolean active = rpcClient.isActive(); + Assert.assertTrue(active); + Assert.assertEquals(response.getObj(), 3); rpcClient.close(); rpcServer.close(); } - /** - * This test case can be overridden under the handler and coder packages - */ @Test public void connectionTest() throws IOException { Class cls = CallManager.class; @@ -85,28 +90,25 @@ public class RPCClientTest { // The mode connection was denied when the server was started on the specified port Instance instance = new DefaultInstance(); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); - ServerConnection connection = new AbstractNettyServerConnection(handler); - RPCServer rpcServer = new RPCServer(connection, portTest); - CompletableFuture.runAsync(rpcServer::bind); + ServerConnection connection = new NettyServerConnection(handler); + RPCServer rpcServer = new RPCServer(connection, port); + rpcServer.bind(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } - ChannelPoolHandler channelPoolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); - NettyClientConnection clientConnection = new NettyClientConnection(host, portTest, channelPoolHandler); + InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort()); + ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + ClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler); RPCClient rpcClient = new RPCClient(clientConnection); - Class[] classes = new Class[2]; - classes[0] = Integer.class; - classes[1] = Integer.class; - Object[] objects = new Object[2]; - objects[0] = 1; - objects[1] = 2; - Request request = new DefaultRequest("127.0.0.18889", className, "callTest", classes, objects); - Response response = rpcClient.connection(request); - boolean active = rpcClient.isActive(); - Assert.assertTrue(active); - Assert.assertEquals(response.getObj(), 3); + Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null); + for (int i = 0; i < 100; i++) { + Response response = rpcClient.connection(request); + boolean active = rpcClient.isActive(); + Assert.assertTrue(active); + Assert.assertEquals(response.getObj(), 1); + } rpcClient.close(); rpcServer.close(); } diff --git a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/handler/ConnectHandlerTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/handler/ConnectHandlerTest.java index b9e4e9c1..1d4ef3b8 100644 --- a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/handler/ConnectHandlerTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/handler/ConnectHandlerTest.java @@ -20,7 +20,7 @@ package cn.hippo4j.rpc.handler; import cn.hippo4j.rpc.client.NettyClientConnection; import cn.hippo4j.rpc.client.RPCClient; import cn.hippo4j.rpc.discovery.*; -import cn.hippo4j.rpc.server.AbstractNettyServerConnection; +import cn.hippo4j.rpc.server.NettyServerConnection; import cn.hippo4j.rpc.server.RPCServer; import cn.hippo4j.rpc.support.NettyProxyCenter; import io.netty.channel.pool.ChannelPoolHandler; @@ -28,7 +28,7 @@ import org.junit.Assert; import org.junit.Test; import java.io.IOException; -import java.util.concurrent.CompletableFuture; +import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; public class ConnectHandlerTest { @@ -38,21 +38,23 @@ public class ConnectHandlerTest { // server Class cls = InstanceServerLoader.class; ClassRegistry.put(cls.getName(), cls); - ServerPort port = () -> 8891; + ServerPort port = () -> 8892; Instance instance = new DefaultInstance(); NettyServerTakeHandler serverHandler = new NettyServerTakeHandler(instance); - AbstractNettyServerConnection connection = new AbstractNettyServerConnection(serverHandler); + NettyServerConnection connection = new NettyServerConnection(serverHandler); RPCServer rpcServer = new RPCServer(connection, port); - CompletableFuture.runAsync(rpcServer::bind); + rpcServer.bind(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } - ChannelPoolHandler channelPoolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); - NettyClientConnection clientConnection = new NettyClientConnection("localhost", port, channelPoolHandler); + InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port.getPort()); + ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + NettyClientConnection clientConnection = new NettyClientConnection(address, channelPoolHandler); RPCClient rpcClient = new RPCClient(clientConnection); - InstanceServerLoader loader = NettyProxyCenter.getProxy(rpcClient, cls, "localhost", port); + + InstanceServerLoader loader = NettyProxyCenter.createProxy(rpcClient, cls, address); String name = loader.getName(); Assert.assertEquals("name", name); rpcClient.close(); diff --git a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/handler/NettyClientPoolHandlerTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/handler/NettyClientPoolHandlerTest.java index 5d8a35f8..e1806508 100644 --- a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/handler/NettyClientPoolHandlerTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/handler/NettyClientPoolHandlerTest.java @@ -28,7 +28,7 @@ public class NettyClientPoolHandlerTest { TestHandler handler = new TestHandler(); long order = 0; String name = "Test"; - AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(); + NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(); HandlerManager.HandlerEntity entity = poolHandler.getHandlerEntity(order, handler, name); Assert.assertEquals(entity.getName(), name); Assert.assertEquals(entity.getOrder(), order); @@ -43,7 +43,7 @@ public class NettyClientPoolHandlerTest { TestHandler handler1 = new TestHandler(); long order1 = 1; String name1 = "Test1"; - AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(); + NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(); HandlerManager.HandlerEntity entity = poolHandler.getHandlerEntity(order, handler, name); HandlerManager.HandlerEntity entity1 = poolHandler.getHandlerEntity(order1, handler1, name1); int compare = entity.compareTo(entity1); @@ -52,7 +52,7 @@ public class NettyClientPoolHandlerTest { @Test public void addLast() { - AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(); + NettyClientPoolHandler handler = new NettyClientPoolHandler(); Assert.assertTrue(handler.isEmpty()); handler.addLast(new TestHandler()); Assert.assertFalse(handler.isEmpty()); @@ -60,7 +60,7 @@ public class NettyClientPoolHandlerTest { @Test public void addFirst() { - AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(); + NettyClientPoolHandler handler = new NettyClientPoolHandler(); Assert.assertTrue(handler.isEmpty()); handler.addFirst(new TestHandler()); Assert.assertFalse(handler.isEmpty()); @@ -68,7 +68,7 @@ public class NettyClientPoolHandlerTest { @Test public void testAddLast() { - AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(); + NettyClientPoolHandler handler = new NettyClientPoolHandler(); Assert.assertTrue(handler.isEmpty()); handler.addLast("Test", new TestHandler()); Assert.assertFalse(handler.isEmpty()); @@ -76,7 +76,7 @@ public class NettyClientPoolHandlerTest { @Test public void testAddFirst() { - AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(); + NettyClientPoolHandler handler = new NettyClientPoolHandler(); Assert.assertTrue(handler.isEmpty()); handler.addFirst("Test", new TestHandler()); Assert.assertFalse(handler.isEmpty()); diff --git a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/NettyServerConnectionTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/NettyServerConnectionTest.java index c90fe9b6..1a55a2ae 100644 --- a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/NettyServerConnectionTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/NettyServerConnectionTest.java @@ -25,7 +25,7 @@ public class NettyServerConnectionTest { @Test public void addLast() { - AbstractNettyServerConnection connection = new AbstractNettyServerConnection(); + NettyServerConnection connection = new NettyServerConnection(); Assert.assertTrue(connection.isEmpty()); connection.addLast(new TestHandler()); Assert.assertFalse(connection.isEmpty()); @@ -33,7 +33,7 @@ public class NettyServerConnectionTest { @Test public void addFirst() { - AbstractNettyServerConnection connection = new AbstractNettyServerConnection(); + NettyServerConnection connection = new NettyServerConnection(); Assert.assertTrue(connection.isEmpty()); connection.addFirst(new TestHandler()); Assert.assertFalse(connection.isEmpty()); @@ -41,7 +41,7 @@ public class NettyServerConnectionTest { @Test public void testAddLast() { - AbstractNettyServerConnection connection = new AbstractNettyServerConnection(); + NettyServerConnection connection = new NettyServerConnection(); Assert.assertTrue(connection.isEmpty()); connection.addLast("Test", new TestHandler()); Assert.assertFalse(connection.isEmpty()); @@ -49,7 +49,7 @@ public class NettyServerConnectionTest { @Test public void testAddFirst() { - AbstractNettyServerConnection connection = new AbstractNettyServerConnection(); + NettyServerConnection connection = new NettyServerConnection(); Assert.assertTrue(connection.isEmpty()); connection.addFirst("Test", new TestHandler()); Assert.assertFalse(connection.isEmpty()); diff --git a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/RPCServerTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/RPCServerTest.java index 95d44d94..e2f76240 100644 --- a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/RPCServerTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/server/RPCServerTest.java @@ -17,30 +17,30 @@ package cn.hippo4j.rpc.server; -import cn.hippo4j.rpc.discovery.ServerPort; -import cn.hippo4j.rpc.handler.NettyServerTakeHandler; import cn.hippo4j.rpc.discovery.DefaultInstance; import cn.hippo4j.rpc.discovery.Instance; +import cn.hippo4j.rpc.discovery.ServerPort; +import cn.hippo4j.rpc.handler.NettyServerTakeHandler; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import org.junit.Assert; import org.junit.Test; import java.io.IOException; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class RPCServerTest { public static ServerPort port = new TestServerPort(); + public static ServerPort portTest = new ServerPortTest(); @Test public void bind() throws IOException { Instance instance = new DefaultInstance(); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); - ServerConnection connection = new AbstractNettyServerConnection(handler); + ServerConnection connection = new NettyServerConnection(handler); RPCServer rpcServer = new RPCServer(connection, port); - CompletableFuture.runAsync(rpcServer::bind); + rpcServer.bind(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { @@ -59,22 +59,34 @@ public class RPCServerTest { EventLoopGroup leader = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); - ServerConnection connection = new AbstractNettyServerConnection(leader, worker, handler); - RPCServer rpcServer = new RPCServer(connection, port); - CompletableFuture.runAsync(rpcServer::bind); + ServerConnection connection = new NettyServerConnection(leader, worker, handler); + RPCServer rpcServer = new RPCServer(connection, portTest); + rpcServer.bind(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } + boolean active = rpcServer.isActive(); + Assert.assertTrue(active); rpcServer.close(); + boolean serverActive = rpcServer.isActive(); + Assert.assertFalse(serverActive); } static class TestServerPort implements ServerPort { @Override public int getPort() { - return 8888; + return 8893; + } + } + + static class ServerPortTest implements ServerPort { + + @Override + public int getPort() { + return 8894; } } } \ No newline at end of file diff --git a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolHolderTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolHolderTest.java index ab3fb0b6..b367b1d9 100644 --- a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolHolderTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolHolderTest.java @@ -18,7 +18,7 @@ package cn.hippo4j.rpc.support; import cn.hippo4j.rpc.discovery.ServerPort; -import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler; +import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; @@ -27,6 +27,8 @@ import io.netty.channel.socket.nio.NioSocketChannel; import org.junit.Assert; import org.junit.Test; +import java.net.InetSocketAddress; + public class NettyConnectPoolHolderTest { String host = "127.0.0.1"; @@ -38,34 +40,37 @@ public class NettyConnectPoolHolderTest { @Test public void createPool() { - AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); - NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, handler); - NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port); + NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort()); + NettyConnectPool pool = new NettyConnectPool(address, maxCount, timeout, group, cls, handler); + NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(address); Assert.assertEquals(pool, connectPool); NettyConnectPoolHolder.clear(); - NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port); + NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(address); Assert.assertNull(connectPool1); } @Test public void testGetPool() { - AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); - NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler); - NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port); + NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort()); + NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(address, timeout, group, handler); + NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(address); Assert.assertEquals(connectPool1, connectPool); NettyConnectPoolHolder.clear(); - NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(host, port); + NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(address); Assert.assertNull(connectPool2); } @Test public void remove() { - AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); - NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler); - NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port); + NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort()); + NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(address, timeout, group, handler); + NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(address); Assert.assertEquals(connectPool1, connectPool); - NettyConnectPoolHolder.remove(host, port); - NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(host, port); + NettyConnectPoolHolder.remove(address); + NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(address); Assert.assertNull(connectPool2); } @@ -73,7 +78,7 @@ public class NettyConnectPoolHolderTest { @Override public int getPort() { - return 8888; + return 8895; } } } \ No newline at end of file diff --git a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolTest.java index 1e0f8026..2806e40a 100644 --- a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyConnectPoolTest.java @@ -20,10 +20,10 @@ package cn.hippo4j.rpc.support; import cn.hippo4j.rpc.discovery.DefaultInstance; import cn.hippo4j.rpc.discovery.Instance; import cn.hippo4j.rpc.discovery.ServerPort; -import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler; +import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import cn.hippo4j.rpc.handler.NettyServerTakeHandler; -import cn.hippo4j.rpc.server.AbstractNettyServerConnection; +import cn.hippo4j.rpc.server.NettyServerConnection; import cn.hippo4j.rpc.server.RPCServer; import cn.hippo4j.rpc.server.ServerConnection; import io.netty.channel.Channel; @@ -35,7 +35,7 @@ import org.junit.Assert; import org.junit.Test; import java.io.IOException; -import java.util.concurrent.CompletableFuture; +import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; public class NettyConnectPoolTest { @@ -52,17 +52,18 @@ public class NettyConnectPoolTest { // The mode connection was denied when the server was started on the specified port Instance instance = new DefaultInstance(); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); - ServerConnection connection = new AbstractNettyServerConnection(handler); + ServerConnection connection = new NettyServerConnection(handler); RPCServer rpcServer = new RPCServer(connection, port); - CompletableFuture.runAsync(rpcServer::bind); + rpcServer.bind(); // Given the delay in starting the server, wait here try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } - AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); - NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler); + InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort()); + NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + NettyConnectPool pool = new NettyConnectPool(address, maxCount, timeout, group, cls, poolHandler); Channel acquire = pool.acquire(timeout); Assert.assertNotNull(acquire); pool.release(acquire); @@ -74,17 +75,18 @@ public class NettyConnectPoolTest { // The mode connection was denied when the server was started on the specified port Instance instance = new DefaultInstance(); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); - ServerConnection connection = new AbstractNettyServerConnection(handler); + ServerConnection connection = new NettyServerConnection(handler); RPCServer rpcServer = new RPCServer(connection, port); - CompletableFuture.runAsync(rpcServer::bind); + rpcServer.bind(); // Given the delay in starting the server, wait here try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } - AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); - NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler); + InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort()); + NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + NettyConnectPool pool = new NettyConnectPool(address, maxCount, timeout, group, cls, poolHandler); Future acquire = pool.acquire(); Assert.assertNotNull(acquire); rpcServer.close(); @@ -95,18 +97,18 @@ public class NettyConnectPoolTest { // The mode connection was denied when the server was started on the specified port Instance instance = new DefaultInstance(); NettyServerTakeHandler handler = new NettyServerTakeHandler(instance); - ServerConnection connection = new AbstractNettyServerConnection(handler); + ServerConnection connection = new NettyServerConnection(handler); RPCServer rpcServer = new RPCServer(connection, port); - CompletableFuture.runAsync(rpcServer::bind); + rpcServer.bind(); // Given the delay in starting the server, wait here try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } - - AbstractNettyClientPoolHandler poolHandler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); - NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler); + InetSocketAddress address = InetSocketAddress.createUnresolved(host, port.getPort()); + NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + NettyConnectPool pool = new NettyConnectPool(address, maxCount, timeout, group, cls, poolHandler); Channel acquire = pool.acquire(timeout); Assert.assertNotNull(acquire); pool.release(acquire); @@ -118,7 +120,7 @@ public class NettyConnectPoolTest { @Override public int getPort() { - return 8888; + return 8890; } } } \ No newline at end of file diff --git a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyProxyCenterTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyProxyCenterTest.java index b6623248..b75cef3b 100644 --- a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyProxyCenterTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyProxyCenterTest.java @@ -19,26 +19,36 @@ package cn.hippo4j.rpc.support; import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.rpc.discovery.ServerPort; -import cn.hippo4j.rpc.handler.AbstractNettyClientPoolHandler; +import cn.hippo4j.rpc.handler.NettyClientPoolHandler; import cn.hippo4j.rpc.handler.NettyClientTakeHandler; import org.junit.Assert; import org.junit.Test; +import java.net.InetSocketAddress; + public class NettyProxyCenterTest { ServerPort port = new TestServerPort(); @Test public void getProxy() { - AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); - ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", port, handler); + InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port.getPort()); + NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, address, handler); + Assert.assertNotNull(localhost); + } + + @Test + public void createProxy() { + ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost:8894"); Assert.assertNotNull(localhost); } @Test(expected = IllegalException.class) public void getProxyTest() { - AbstractNettyClientPoolHandler handler = new AbstractNettyClientPoolHandler(new NettyClientTakeHandler()); - ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", port, handler); + InetSocketAddress address = InetSocketAddress.createUnresolved("localhost", port.getPort()); + NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler()); + ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, address, handler); Assert.assertNotNull(localhost); } @@ -55,7 +65,7 @@ public class NettyProxyCenterTest { @Override public int getPort() { - return 8888; + return 8894; } } } \ No newline at end of file diff --git a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyServerSupportTest.java b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyServerSupportTest.java index c8531f33..5f66f865 100644 --- a/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyServerSupportTest.java +++ b/hippo4j-rpc/src/test/java/cn/hippo4j/rpc/support/NettyServerSupportTest.java @@ -22,15 +22,14 @@ import org.junit.Assert; import org.junit.Test; import java.io.IOException; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class NettyServerSupportTest { @Test public void bind() throws IOException { - NettyServerSupport support = new NettyServerSupport(() -> 8890, InstanceServerLoader.class); - CompletableFuture.runAsync(support::bind); + NettyServerSupport support = new NettyServerSupport(() -> 8891, InstanceServerLoader.class); + support.bind(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { diff --git a/hippo4j-rpc/src/test/resources/META-INF/services/cn.hippo4j.rpc.discovery.InstanceServerLoader b/hippo4j-rpc/src/test/resources/META-INF/services/cn.hippo4j.rpc.discovery.InstanceServerLoader index 3818d6c7..3014d7e6 100644 --- a/hippo4j-rpc/src/test/resources/META-INF/services/cn.hippo4j.rpc.discovery.InstanceServerLoader +++ b/hippo4j-rpc/src/test/resources/META-INF/services/cn.hippo4j.rpc.discovery.InstanceServerLoader @@ -1 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + cn.hippo4j.rpc.discovery.InstanceServerLoaderImpl \ No newline at end of file