diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/Client.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/Client.java index 81957ce5..ad7f1271 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/Client.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/Client.java @@ -17,8 +17,8 @@ package cn.hippo4j.rpc.client; -import cn.hippo4j.rpc.request.Request; -import cn.hippo4j.rpc.response.Response; +import cn.hippo4j.rpc.model.Request; +import cn.hippo4j.rpc.model.Response; import java.io.Closeable; diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/ClientConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/ClientConnection.java index 8d04a213..43791228 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/ClientConnection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/ClientConnection.java @@ -18,8 +18,8 @@ package cn.hippo4j.rpc.client; import cn.hippo4j.rpc.handler.Connection; -import cn.hippo4j.rpc.request.Request; -import cn.hippo4j.rpc.response.Response; +import cn.hippo4j.rpc.model.Request; +import cn.hippo4j.rpc.model.Response; /** * Applicable to client connections diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java index a725ed6a..88c3bc93 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/NettyClientConnection.java @@ -19,11 +19,12 @@ package cn.hippo4j.rpc.client; import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.web.exception.IllegalException; +import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.exception.TimeOutException; +import cn.hippo4j.rpc.model.Request; +import cn.hippo4j.rpc.model.Response; import cn.hippo4j.rpc.process.ActivePostProcess; import cn.hippo4j.rpc.process.ActiveProcessChain; -import cn.hippo4j.rpc.request.Request; -import cn.hippo4j.rpc.response.Response; import cn.hippo4j.rpc.support.NettyConnectPool; import cn.hippo4j.rpc.support.NettyConnectPoolHolder; import cn.hippo4j.rpc.support.ResultHolder; @@ -45,7 +46,7 @@ import java.util.concurrent.locks.LockSupport; public class NettyClientConnection implements ClientConnection { String host; - Integer port; + ServerPort port; // Obtain the connection timeout period. The default value is 30s long timeout = 30000L; EventLoopGroup worker = new NioEventLoopGroup(); @@ -54,7 +55,7 @@ public class NettyClientConnection implements ClientConnection { ChannelFuture future; Channel channel; - public NettyClientConnection(String host, int port, + public NettyClientConnection(String host, ServerPort port, List activeProcesses, ChannelPoolHandler handler) { Assert.notNull(worker); @@ -64,7 +65,7 @@ public class NettyClientConnection implements ClientConnection { this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker, handler); } - public NettyClientConnection(String host, int port, ChannelPoolHandler handler) { + public NettyClientConnection(String host, ServerPort port, ChannelPoolHandler handler) { this(host, port, new LinkedList<>(), handler); } @@ -76,7 +77,7 @@ public class NettyClientConnection implements ClientConnection { try { String key = request.getKey(); this.future = channel.writeAndFlush(request); - log.info("Call successful, target address is {}:{}, request key is {}", host, port, key); + log.info("Call successful, target address is {}:{}, request key is {}", host, port.getPort(), key); // Wait for execution to complete ResultHolder.putThread(key, Thread.currentThread()); LockSupport.parkNanos(timeout() * 1000000); @@ -85,7 +86,7 @@ public class NettyClientConnection implements ClientConnection { throw new TimeOutException("Timeout waiting for server-side response"); } activeProcessChain.applyPostHandle(request, response); - log.info("The response from {}:{} was received successfully with the response key {}.", host, port, key); + log.info("The response from {}:{} was received successfully with the response key {}.", host, port.getPort(), key); return response; } catch (Exception ex) { activeProcessChain.afterCompletion(request, response, ex); diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/RPCClient.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/RPCClient.java index e3094076..abc3ed9e 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/RPCClient.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/client/RPCClient.java @@ -17,8 +17,8 @@ package cn.hippo4j.rpc.client; -import cn.hippo4j.rpc.request.Request; -import cn.hippo4j.rpc.response.Response; +import cn.hippo4j.rpc.model.Request; +import cn.hippo4j.rpc.model.Response; import java.io.IOException; diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClassRegistry.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/ClassRegistry.java similarity index 98% rename from hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClassRegistry.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/ClassRegistry.java index ebcc86f6..5af58c87 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClassRegistry.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/ClassRegistry.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.rpc.support; +package cn.hippo4j.rpc.discovery; import lombok.AccessLevel; import lombok.NoArgsConstructor; diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/DefaultInstance.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/DefaultInstance.java similarity index 71% rename from hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/DefaultInstance.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/DefaultInstance.java index c6cf9a6c..4c6d6741 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/DefaultInstance.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/DefaultInstance.java @@ -15,19 +15,28 @@ * limitations under the License. */ -package cn.hippo4j.rpc.support; +package cn.hippo4j.rpc.discovery; import cn.hippo4j.common.toolkit.ReflectUtil; import cn.hippo4j.common.web.exception.IllegalException; +import java.util.Iterator; +import java.util.ServiceLoader; + /** - * Simply creating an instance of a class by its name and its specific type, - * and then throwing an exception if it is an interface, is not elegant + * You simply create an instance of a class based on its name and specific type. + * Load through the ServiceLoader first. If the load fails, load directly through the instantiation. + * If it is an interface, throw an exception. This is not elegant implementation */ public class DefaultInstance implements Instance { @Override public Object getInstance(Class cls) { + ServiceLoader load = ServiceLoader.load(cls); + Iterator iterator = load.iterator(); + if (iterator.hasNext()) { + return iterator.next(); + } return ReflectUtil.createInstance(cls); } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/Instance.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/Instance.java similarity index 97% rename from hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/Instance.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/Instance.java index 840dff3a..aab7163c 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/Instance.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/Instance.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.rpc.support; +package cn.hippo4j.rpc.discovery; /** * Instance interface to get an instance diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/ServerPort.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/ServerPort.java new file mode 100644 index 00000000..b53edb0d --- /dev/null +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/ServerPort.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.rpc.discovery; + +/** + * + */ +public interface ServerPort { + + /** + * + * @return + */ + int getPort(); + +} diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/SpringContextInstance.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/SpringContextInstance.java similarity index 97% rename from hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/SpringContextInstance.java rename to hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/SpringContextInstance.java index a0d2db7a..5cbc79c7 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/SpringContextInstance.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/discovery/SpringContextInstance.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.rpc.support; +package cn.hippo4j.rpc.discovery; import cn.hippo4j.common.config.ApplicationContextHolder; diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java index 08fac1db..9bdf49c2 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/AbstractNettyTakeHandler.java @@ -18,7 +18,7 @@ package cn.hippo4j.rpc.handler; import cn.hippo4j.rpc.exception.ConnectionException; -import cn.hippo4j.rpc.response.Response; +import cn.hippo4j.rpc.model.Response; import cn.hippo4j.rpc.support.ResultHolder; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/ConnectHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/ConnectHandler.java index 21e36fb9..f12a846e 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/ConnectHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/ConnectHandler.java @@ -17,8 +17,8 @@ package cn.hippo4j.rpc.handler; -import cn.hippo4j.rpc.request.Request; -import cn.hippo4j.rpc.response.Response; +import cn.hippo4j.rpc.model.Request; +import cn.hippo4j.rpc.model.Response; /** * The handler in each connection, where the specific behavior of the connection diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java index 7d0f3d29..f4ebb4b9 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java @@ -18,7 +18,7 @@ package cn.hippo4j.rpc.handler; import lombok.AllArgsConstructor; -import lombok.Data; +import lombok.Getter; /** * Manage the Handler used in the processing.
@@ -56,6 +56,13 @@ public interface HandlerManager { */ HandlerManager addFirst(T handler); + /** + * Whether handler exists + * + * @return Whether handler exists + */ + boolean isEmpty(); + /** * Create a handler * @@ -68,7 +75,7 @@ public interface HandlerManager { return new HandlerEntity<>(order, handler, name); } - @Data + @Getter @AllArgsConstructor class HandlerEntity implements Comparable> { diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java index b9e38d36..58fd002a 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientPoolHandler.java @@ -47,21 +47,25 @@ public class NettyClientPoolHandler extends NettyHandlerManager implements Chann super(); } + @Override public NettyClientPoolHandler addLast(String name, ChannelHandler handler) { super.addLast(name, handler); return this; } + @Override public NettyClientPoolHandler addFirst(String name, ChannelHandler handler) { super.addFirst(name, handler); return this; } + @Override public NettyClientPoolHandler addLast(ChannelHandler handler) { super.addLast(handler); return this; } + @Override public NettyClientPoolHandler addFirst(ChannelHandler handler) { super.addFirst(handler); return this; @@ -85,7 +89,7 @@ public class NettyClientPoolHandler extends NettyHandlerManager implements Chann .setTcpNoDelay(false); ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); ch.pipeline().addLast(new NettyEncoder()); - this.handlers.stream() + this.handlerEntities.stream() .sorted() .forEach(h -> { if (h.getName() == null) { diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientTakeHandler.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientTakeHandler.java index b2cd5d6f..1841d7f9 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientTakeHandler.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyClientTakeHandler.java @@ -18,7 +18,7 @@ package cn.hippo4j.rpc.handler; import cn.hippo4j.common.web.exception.IllegalException; -import cn.hippo4j.rpc.response.Response; +import cn.hippo4j.rpc.model.Response; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyHandlerManager.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyHandlerManager.java index 0649dde3..18b7b5d5 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyHandlerManager.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/NettyHandlerManager.java @@ -29,25 +29,31 @@ import java.util.stream.Collectors; */ public abstract class NettyHandlerManager implements HandlerManager { - protected final List> handlers; + protected final List> handlerEntities; AtomicLong firstIndex = new AtomicLong(-1); AtomicLong lastIndex = new AtomicLong(0); - protected NettyHandlerManager(List handlers) { - this.handlers = handlers.stream() + protected NettyHandlerManager(List handlerEntities) { + Assert.notNull(handlerEntities); + this.handlerEntities = handlerEntities.stream() .filter(Objects::nonNull) .map(c -> getHandlerEntity(lastIndex.getAndIncrement(), c, null)) .collect(Collectors.toList()); } - protected NettyHandlerManager(ChannelHandler... handlers) { - this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList()); + protected NettyHandlerManager(ChannelHandler... handlerEntities) { + this(handlerEntities != null ? Arrays.asList(handlerEntities) : Collections.emptyList()); } protected NettyHandlerManager() { - this.handlers = new LinkedList<>(); + this.handlerEntities = new LinkedList<>(); + } + + @Override + public boolean isEmpty() { + return handlerEntities.isEmpty(); } /** @@ -59,7 +65,7 @@ public abstract class NettyHandlerManager implements HandlerManager socketChannelCls = NioServerSocketChannel.class; @@ -48,7 +49,6 @@ public class NettyServerConnection extends NettyHandlerManager implements Server public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List handlers) { super(handlers); - Assert.notNull(handlers); Assert.notNull(leader); Assert.notNull(worker); this.leader = leader; @@ -68,7 +68,7 @@ public class NettyServerConnection extends NettyHandlerManager implements Server } @Override - public void bind(int port) { + public void bind(ServerPort port) { ServerBootstrap server = new ServerBootstrap(); server.group(leader, worker) .channel(socketChannelCls) @@ -79,7 +79,7 @@ public class NettyServerConnection extends NettyHandlerManager implements Server protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); ch.pipeline().addLast(new NettyEncoder()); - handlers.stream() + handlerEntities.stream() .sorted() .forEach(h -> { if (h.getName() == null) { @@ -91,7 +91,7 @@ public class NettyServerConnection extends NettyHandlerManager implements Server } }); try { - this.future = server.bind(port); + this.future = server.bind(port.getPort()); this.channel = this.future.channel(); log.info("The server is started and can receive requests. The listening port is {}", port); this.port = port; @@ -117,21 +117,25 @@ public class NettyServerConnection extends NettyHandlerManager implements Server return channel.isActive(); } + @Override public NettyServerConnection addLast(String name, ChannelHandler handler) { super.addLast(name, handler); return this; } + @Override public NettyServerConnection addFirst(String name, ChannelHandler handler) { super.addFirst(name, handler); return this; } + @Override public NettyServerConnection addLast(ChannelHandler handler) { super.addLast(handler); return this; } + @Override public NettyServerConnection addFirst(ChannelHandler handler) { super.addFirst(handler); return this; diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java index abb5f6ff..bc3bcc43 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/RPCServer.java @@ -17,6 +17,8 @@ package cn.hippo4j.rpc.server; +import cn.hippo4j.rpc.discovery.ServerPort; + import java.io.IOException; /** @@ -24,10 +26,10 @@ import java.io.IOException; */ public class RPCServer implements Server { - int port; + ServerPort port; ServerConnection serverConnection; - public RPCServer(int port, ServerConnection serverConnection) { + public RPCServer(ServerPort port, ServerConnection serverConnection) { this.port = port; this.serverConnection = serverConnection; } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/ServerConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/ServerConnection.java index fcb5a9e1..37edfc62 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/ServerConnection.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/ServerConnection.java @@ -17,6 +17,7 @@ package cn.hippo4j.rpc.server; +import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.handler.Connection; /** @@ -27,6 +28,6 @@ public interface ServerConnection extends Connection { /** * Bind ports and process them */ - void bind(int port); + void bind(ServerPort port); } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java index a34159e9..a8406d52 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java @@ -17,6 +17,7 @@ package cn.hippo4j.rpc.support; +import cn.hippo4j.rpc.discovery.ServerPort; import cn.hippo4j.rpc.exception.ConnectionException; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; @@ -43,13 +44,13 @@ public class NettyConnectPool { ChannelPoolHandler handler; ChannelPool pool; String host; - int port; + ServerPort port; - public NettyConnectPool(String host, int port, int maxConnect, + public NettyConnectPool(String host, ServerPort port, int maxConnect, long timeout, EventLoopGroup worker, Class socketChannelCls, ChannelPoolHandler handler) { - InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port); + InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port.getPort()); Bootstrap bootstrap = new Bootstrap() .group(worker) .channel(socketChannelCls) @@ -59,7 +60,7 @@ public class NettyConnectPool { this.handler = handler; this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction, timeout, maxConnect, maxPendingAcquires, true, true); - log.info("The connection pool is established with the connection target {}:{}", host, port); + log.info("The connection pool is established with the connection target {}:{}", host, port.getPort()); NettyConnectPoolHolder.createPool(host, port, this); } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java index e143948f..32475363 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPoolHolder.java @@ -17,6 +17,7 @@ package cn.hippo4j.rpc.support; +import cn.hippo4j.rpc.discovery.ServerPort; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.pool.ChannelPoolHandler; @@ -38,7 +39,7 @@ public class NettyConnectPoolHolder { static Map connectPoolMap = new ConcurrentHashMap<>(); - private static NettyConnectPool initPool(String host, int port, + private static NettyConnectPool initPool(String host, ServerPort port, long timeout, EventLoopGroup worker, ChannelPoolHandler handler) { return new NettyConnectPool( @@ -48,8 +49,8 @@ public class NettyConnectPoolHolder { handler); } - private static String getKey(String host, int port) { - return host + ":" + port; + private static String getKey(String host, ServerPort port) { + return host + ":" + port.getPort(); } /** @@ -60,7 +61,7 @@ public class NettyConnectPoolHolder { * @param port the port * @param pool This parameter applies only to the connection pool of netty */ - public static void createPool(String host, int port, NettyConnectPool pool) { + public static void createPool(String host, ServerPort port, NettyConnectPool pool) { connectPoolMap.put(getKey(host, port), pool); } @@ -71,7 +72,7 @@ public class NettyConnectPoolHolder { * @param port the port * @return Map to the connection pool */ - public static NettyConnectPool getPool(String host, int port) { + public static NettyConnectPool getPool(String host, ServerPort port) { return connectPoolMap.get(getKey(host, port)); } @@ -86,7 +87,7 @@ public class NettyConnectPoolHolder { * @param handler the chandler for netty * @return Map to the connection pool */ - public static synchronized NettyConnectPool getPool(String host, int port, + public static synchronized NettyConnectPool getPool(String host, ServerPort port, long timeout, EventLoopGroup worker, ChannelPoolHandler handler) { /* @@ -102,7 +103,7 @@ public class NettyConnectPoolHolder { * @param host host * @param port port */ - public static void remove(String host, int port) { + public static void remove(String host, ServerPort port) { connectPoolMap.remove(getKey(host, port)); } diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java index e73ef944..690c8571 100644 --- a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyProxyCenter.java @@ -20,9 +20,10 @@ package cn.hippo4j.rpc.support; import cn.hippo4j.common.toolkit.IdUtil; import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.rpc.client.NettyClientConnection; -import cn.hippo4j.rpc.request.DefaultRequest; -import cn.hippo4j.rpc.request.Request; -import cn.hippo4j.rpc.response.Response; +import cn.hippo4j.rpc.discovery.ServerPort; +import cn.hippo4j.rpc.model.DefaultRequest; +import cn.hippo4j.rpc.model.Request; +import cn.hippo4j.rpc.model.Response; import io.netty.channel.pool.ChannelPoolHandler; import lombok.AccessLevel; import lombok.NoArgsConstructor; @@ -51,13 +52,13 @@ public class NettyProxyCenter { * @param handler the pool handler for netty * @return Proxy objects */ - public static T getProxy(Class cls, String host, int port, ChannelPoolHandler handler) { + public static T getProxy(Class cls, String host, ServerPort port, ChannelPoolHandler handler) { NettyClientConnection connection = new NettyClientConnection(host, port, handler); return getProxy(connection, cls, host, port); } @SuppressWarnings("unchecked") - public static T getProxy(NettyClientConnection connection, Class cls, String host, int port) { + public static T getProxy(NettyClientConnection connection, Class cls, String host, ServerPort port) { boolean b = cls.isInterface(); if (!b) { throw new IllegalException(cls.getName() + "is not a Interface"); diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyServerSupport.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyServerSupport.java new file mode 100644 index 00000000..27e0473b --- /dev/null +++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyServerSupport.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.rpc.support; + +import cn.hippo4j.rpc.discovery.ClassRegistry; +import cn.hippo4j.rpc.discovery.DefaultInstance; +import cn.hippo4j.rpc.discovery.ServerPort; +import cn.hippo4j.rpc.handler.HandlerManager; +import cn.hippo4j.rpc.handler.NettyServerTakeHandler; +import cn.hippo4j.rpc.server.NettyServerConnection; +import cn.hippo4j.rpc.server.RPCServer; +import cn.hippo4j.rpc.server.Server; +import io.netty.channel.ChannelHandler; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * This is a server-side build class that allows you to quickly prepare data on the server side and start the server side.
+ *

+ * The composite pattern is adopted, which means that it is itself a server-side implementation, so it is stateless. + */ +public class NettyServerSupport implements Server { + + /** + * The interface that the server side can call, + * All the methods in the interface are brokered during initialization + */ + List> classes; + + /** + * Extract the port number of the web container, + * which is the port information exposed by the server + */ + ServerPort serverPort; + + /** + * ChannelHandler + */ + HandlerManager handlerManager; + + Server server; + + public NettyServerSupport(ServerPort serverPort, Class... classes) { + this(serverPort, new NettyServerConnection(), classes); + } + + public NettyServerSupport(ServerPort serverPort, List> classes) { + this(serverPort, new NettyServerConnection(), classes); + } + + public NettyServerSupport(ServerPort serverPort, HandlerManager handlerManager, Class... classes) { + this(serverPort, handlerManager, classes != null ? Arrays.asList(classes) : Collections.emptyList()); + } + + public NettyServerSupport(ServerPort serverPort, HandlerManager handlerManager, List> classes) { + this.classes = classes; + this.serverPort = serverPort; + this.handlerManager = handlerManager; + initServer(); + } + + /** + * Initializes the entire server side, which includes interface registration, processors, and ports.
+ * Only interfaces are registered during registration. Classes and abstract classes are not registered. + * If no processor is available, a default processor is provided + */ + private void initServer() { + // Register the interface that can be invoked + classes.stream().filter(Class::isInterface) + .forEach(cls -> ClassRegistry.put(cls.getName(), cls)); + NettyServerConnection connection = (handlerManager instanceof NettyServerConnection) + ? (NettyServerConnection) handlerManager + : new NettyServerConnection(); + // Assign a default handler if no handler exists + if (connection.isEmpty()) { + connection.addLast(new NettyServerTakeHandler(new DefaultInstance())); + } + server = new RPCServer(serverPort, connection); + } + + @Override + public void bind() { + server.bind(); + } + + @Override + public boolean isActive() { + return server.isActive(); + } + + @Override + public void close() throws IOException { + server.close(); + } +}