fix : Adjust the directory structure and add the supporting classes on the server side

pull/945/head
pizihao 3 years ago
parent 59cb0b28c3
commit d2e569d188

@ -17,8 +17,8 @@
package cn.hippo4j.rpc.client; package cn.hippo4j.rpc.client;
import cn.hippo4j.rpc.request.Request; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.response.Response; import cn.hippo4j.rpc.model.Response;
import java.io.Closeable; import java.io.Closeable;

@ -18,8 +18,8 @@
package cn.hippo4j.rpc.client; package cn.hippo4j.rpc.client;
import cn.hippo4j.rpc.handler.Connection; import cn.hippo4j.rpc.handler.Connection;
import cn.hippo4j.rpc.request.Request; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.response.Response; import cn.hippo4j.rpc.model.Response;
/** /**
* Applicable to client connections * Applicable to client connections

@ -19,11 +19,12 @@ package cn.hippo4j.rpc.client;
import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.TimeOutException; import cn.hippo4j.rpc.exception.TimeOutException;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.process.ActivePostProcess; import cn.hippo4j.rpc.process.ActivePostProcess;
import cn.hippo4j.rpc.process.ActiveProcessChain; import cn.hippo4j.rpc.process.ActiveProcessChain;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.support.NettyConnectPool; import cn.hippo4j.rpc.support.NettyConnectPool;
import cn.hippo4j.rpc.support.NettyConnectPoolHolder; import cn.hippo4j.rpc.support.NettyConnectPoolHolder;
import cn.hippo4j.rpc.support.ResultHolder; import cn.hippo4j.rpc.support.ResultHolder;
@ -45,7 +46,7 @@ import java.util.concurrent.locks.LockSupport;
public class NettyClientConnection implements ClientConnection { public class NettyClientConnection implements ClientConnection {
String host; String host;
Integer port; ServerPort port;
// Obtain the connection timeout period. The default value is 30s // Obtain the connection timeout period. The default value is 30s
long timeout = 30000L; long timeout = 30000L;
EventLoopGroup worker = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup();
@ -54,7 +55,7 @@ public class NettyClientConnection implements ClientConnection {
ChannelFuture future; ChannelFuture future;
Channel channel; Channel channel;
public NettyClientConnection(String host, int port, public NettyClientConnection(String host, ServerPort port,
List<ActivePostProcess> activeProcesses, List<ActivePostProcess> activeProcesses,
ChannelPoolHandler handler) { ChannelPoolHandler handler) {
Assert.notNull(worker); Assert.notNull(worker);
@ -64,7 +65,7 @@ public class NettyClientConnection implements ClientConnection {
this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker, handler); this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker, handler);
} }
public NettyClientConnection(String host, int port, ChannelPoolHandler handler) { public NettyClientConnection(String host, ServerPort port, ChannelPoolHandler handler) {
this(host, port, new LinkedList<>(), handler); this(host, port, new LinkedList<>(), handler);
} }
@ -76,7 +77,7 @@ public class NettyClientConnection implements ClientConnection {
try { try {
String key = request.getKey(); String key = request.getKey();
this.future = channel.writeAndFlush(request); this.future = channel.writeAndFlush(request);
log.info("Call successful, target address is {}:{}, request key is {}", host, port, key); log.info("Call successful, target address is {}:{}, request key is {}", host, port.getPort(), key);
// Wait for execution to complete // Wait for execution to complete
ResultHolder.putThread(key, Thread.currentThread()); ResultHolder.putThread(key, Thread.currentThread());
LockSupport.parkNanos(timeout() * 1000000); LockSupport.parkNanos(timeout() * 1000000);
@ -85,7 +86,7 @@ public class NettyClientConnection implements ClientConnection {
throw new TimeOutException("Timeout waiting for server-side response"); throw new TimeOutException("Timeout waiting for server-side response");
} }
activeProcessChain.applyPostHandle(request, response); activeProcessChain.applyPostHandle(request, response);
log.info("The response from {}:{} was received successfully with the response key {}.", host, port, key); log.info("The response from {}:{} was received successfully with the response key {}.", host, port.getPort(), key);
return response; return response;
} catch (Exception ex) { } catch (Exception ex) {
activeProcessChain.afterCompletion(request, response, ex); activeProcessChain.afterCompletion(request, response, ex);

@ -17,8 +17,8 @@
package cn.hippo4j.rpc.client; package cn.hippo4j.rpc.client;
import cn.hippo4j.rpc.request.Request; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.response.Response; import cn.hippo4j.rpc.model.Response;
import java.io.IOException; import java.io.IOException;

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.discovery;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;

@ -15,19 +15,28 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.discovery;
import cn.hippo4j.common.toolkit.ReflectUtil; import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.common.web.exception.IllegalException;
import java.util.Iterator;
import java.util.ServiceLoader;
/** /**
* Simply creating an instance of a class by its name and its specific type, * You simply create an instance of a class based on its name and specific type.
* and then throwing an exception if it is an interface, is not elegant * Load through the ServiceLoader first. If the load fails, load directly through the instantiation.
* If it is an interface, throw an exception. This is not elegant implementation
*/ */
public class DefaultInstance implements Instance { public class DefaultInstance implements Instance {
@Override @Override
public Object getInstance(Class<?> cls) { public Object getInstance(Class<?> cls) {
ServiceLoader<?> load = ServiceLoader.load(cls);
Iterator<?> iterator = load.iterator();
if (iterator.hasNext()) {
return iterator.next();
}
return ReflectUtil.createInstance(cls); return ReflectUtil.createInstance(cls);
} }

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.discovery;
/** /**
* Instance interface to get an instance * Instance interface to get an instance

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.discovery;
/**
*
*/
public interface ServerPort {
/**
*
* @return
*/
int getPort();
}

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.discovery;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;

@ -18,7 +18,7 @@
package cn.hippo4j.rpc.handler; package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.exception.ConnectionException; import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.response.Response; import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.support.ResultHolder; import cn.hippo4j.rpc.support.ResultHolder;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;

@ -17,8 +17,8 @@
package cn.hippo4j.rpc.handler; package cn.hippo4j.rpc.handler;
import cn.hippo4j.rpc.request.Request; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.response.Response; import cn.hippo4j.rpc.model.Response;
/** /**
* The handler in each connection, where the specific behavior of the connection * The handler in each connection, where the specific behavior of the connection

@ -18,7 +18,7 @@
package cn.hippo4j.rpc.handler; package cn.hippo4j.rpc.handler;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Getter;
/** /**
* Manage the Handler used in the processing.<br> * Manage the Handler used in the processing.<br>
@ -56,6 +56,13 @@ public interface HandlerManager<T> {
*/ */
HandlerManager<T> addFirst(T handler); HandlerManager<T> addFirst(T handler);
/**
* Whether handler exists
*
* @return Whether handler exists
*/
boolean isEmpty();
/** /**
* Create a handler * Create a handler
* *
@ -68,7 +75,7 @@ public interface HandlerManager<T> {
return new HandlerEntity<>(order, handler, name); return new HandlerEntity<>(order, handler, name);
} }
@Data @Getter
@AllArgsConstructor @AllArgsConstructor
class HandlerEntity<T> implements Comparable<HandlerEntity<T>> { class HandlerEntity<T> implements Comparable<HandlerEntity<T>> {

@ -47,21 +47,25 @@ public class NettyClientPoolHandler extends NettyHandlerManager implements Chann
super(); super();
} }
@Override
public NettyClientPoolHandler addLast(String name, ChannelHandler handler) { public NettyClientPoolHandler addLast(String name, ChannelHandler handler) {
super.addLast(name, handler); super.addLast(name, handler);
return this; return this;
} }
@Override
public NettyClientPoolHandler addFirst(String name, ChannelHandler handler) { public NettyClientPoolHandler addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler); super.addFirst(name, handler);
return this; return this;
} }
@Override
public NettyClientPoolHandler addLast(ChannelHandler handler) { public NettyClientPoolHandler addLast(ChannelHandler handler) {
super.addLast(handler); super.addLast(handler);
return this; return this;
} }
@Override
public NettyClientPoolHandler addFirst(ChannelHandler handler) { public NettyClientPoolHandler addFirst(ChannelHandler handler) {
super.addFirst(handler); super.addFirst(handler);
return this; return this;
@ -85,7 +89,7 @@ public class NettyClientPoolHandler extends NettyHandlerManager implements Chann
.setTcpNoDelay(false); .setTcpNoDelay(false);
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettyEncoder()); ch.pipeline().addLast(new NettyEncoder());
this.handlers.stream() this.handlerEntities.stream()
.sorted() .sorted()
.forEach(h -> { .forEach(h -> {
if (h.getName() == null) { if (h.getName() == null) {

@ -18,7 +18,7 @@
package cn.hippo4j.rpc.handler; package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.response.Response; import cn.hippo4j.rpc.model.Response;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;

@ -29,25 +29,31 @@ import java.util.stream.Collectors;
*/ */
public abstract class NettyHandlerManager implements HandlerManager<ChannelHandler> { public abstract class NettyHandlerManager implements HandlerManager<ChannelHandler> {
protected final List<HandlerEntity<ChannelHandler>> handlers; protected final List<HandlerEntity<ChannelHandler>> handlerEntities;
AtomicLong firstIndex = new AtomicLong(-1); AtomicLong firstIndex = new AtomicLong(-1);
AtomicLong lastIndex = new AtomicLong(0); AtomicLong lastIndex = new AtomicLong(0);
protected NettyHandlerManager(List<ChannelHandler> handlers) { protected NettyHandlerManager(List<ChannelHandler> handlerEntities) {
this.handlers = handlers.stream() Assert.notNull(handlerEntities);
this.handlerEntities = handlerEntities.stream()
.filter(Objects::nonNull) .filter(Objects::nonNull)
.map(c -> getHandlerEntity(lastIndex.getAndIncrement(), c, null)) .map(c -> getHandlerEntity(lastIndex.getAndIncrement(), c, null))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
protected NettyHandlerManager(ChannelHandler... handlers) { protected NettyHandlerManager(ChannelHandler... handlerEntities) {
this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList()); this(handlerEntities != null ? Arrays.asList(handlerEntities) : Collections.emptyList());
} }
protected NettyHandlerManager() { protected NettyHandlerManager() {
this.handlers = new LinkedList<>(); this.handlerEntities = new LinkedList<>();
}
@Override
public boolean isEmpty() {
return handlerEntities.isEmpty();
} }
/** /**
@ -59,7 +65,7 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
*/ */
public NettyHandlerManager addLast(String name, ChannelHandler handler) { public NettyHandlerManager addLast(String name, ChannelHandler handler) {
Assert.notNull(handler); Assert.notNull(handler);
this.handlers.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, name)); this.handlerEntities.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, name));
return this; return this;
} }
@ -72,7 +78,7 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
*/ */
public NettyHandlerManager addFirst(String name, ChannelHandler handler) { public NettyHandlerManager addFirst(String name, ChannelHandler handler) {
Assert.notNull(handler); Assert.notNull(handler);
this.handlers.add(getHandlerEntity(firstIndex.getAndIncrement(), handler, name)); this.handlerEntities.add(getHandlerEntity(firstIndex.getAndIncrement(), handler, name));
return this; return this;
} }
@ -84,7 +90,7 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
*/ */
public NettyHandlerManager addLast(ChannelHandler handler) { public NettyHandlerManager addLast(ChannelHandler handler) {
Assert.notNull(handler); Assert.notNull(handler);
this.handlers.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, null)); this.handlerEntities.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, null));
return this; return this;
} }
@ -96,7 +102,7 @@ public abstract class NettyHandlerManager implements HandlerManager<ChannelHandl
*/ */
public NettyHandlerManager addFirst(ChannelHandler handler) { public NettyHandlerManager addFirst(ChannelHandler handler) {
Assert.notNull(handler); Assert.notNull(handler);
this.handlers.add(getHandlerEntity(firstIndex.getAndDecrement(), handler, null)); this.handlerEntities.add(getHandlerEntity(firstIndex.getAndDecrement(), handler, null));
return this; return this;
} }
} }

@ -21,11 +21,11 @@ import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.ReflectUtil; import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.rpc.process.ActivePostProcess; import cn.hippo4j.rpc.process.ActivePostProcess;
import cn.hippo4j.rpc.process.ActiveProcessChain; import cn.hippo4j.rpc.process.ActiveProcessChain;
import cn.hippo4j.rpc.request.Request; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.response.DefaultResponse; import cn.hippo4j.rpc.model.DefaultResponse;
import cn.hippo4j.rpc.response.Response; import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.support.ClassRegistry; import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.support.Instance; import cn.hippo4j.rpc.discovery.Instance;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.rpc.request; package cn.hippo4j.rpc.model;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.rpc.response; package cn.hippo4j.rpc.model;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.rpc.request; package cn.hippo4j.rpc.model;
import java.io.Serializable; import java.io.Serializable;

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.rpc.response; package cn.hippo4j.rpc.model;
import java.io.Serializable; import java.io.Serializable;

@ -17,8 +17,8 @@
package cn.hippo4j.rpc.process; package cn.hippo4j.rpc.process;
import cn.hippo4j.rpc.request.Request; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.response.Response; import cn.hippo4j.rpc.model.Response;
/** /**
* Callback while the connection is in progress * Callback while the connection is in progress

@ -17,8 +17,8 @@
package cn.hippo4j.rpc.process; package cn.hippo4j.rpc.process;
import cn.hippo4j.rpc.request.Request; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.response.Response; import cn.hippo4j.rpc.model.Response;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.Arrays; import java.util.Arrays;

@ -20,6 +20,7 @@ package cn.hippo4j.rpc.server;
import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.rpc.coder.NettyDecoder; import cn.hippo4j.rpc.coder.NettyDecoder;
import cn.hippo4j.rpc.coder.NettyEncoder; import cn.hippo4j.rpc.coder.NettyEncoder;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.NettyHandlerManager; import cn.hippo4j.rpc.handler.NettyHandlerManager;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*; import io.netty.channel.*;
@ -39,7 +40,7 @@ import java.util.List;
@Slf4j @Slf4j
public class NettyServerConnection extends NettyHandlerManager implements ServerConnection { public class NettyServerConnection extends NettyHandlerManager implements ServerConnection {
Integer port; ServerPort port;
EventLoopGroup leader; EventLoopGroup leader;
EventLoopGroup worker; EventLoopGroup worker;
Class<? extends ServerChannel> socketChannelCls = NioServerSocketChannel.class; Class<? extends ServerChannel> socketChannelCls = NioServerSocketChannel.class;
@ -48,7 +49,6 @@ public class NettyServerConnection extends NettyHandlerManager implements Server
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ChannelHandler> handlers) { public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ChannelHandler> handlers) {
super(handlers); super(handlers);
Assert.notNull(handlers);
Assert.notNull(leader); Assert.notNull(leader);
Assert.notNull(worker); Assert.notNull(worker);
this.leader = leader; this.leader = leader;
@ -68,7 +68,7 @@ public class NettyServerConnection extends NettyHandlerManager implements Server
} }
@Override @Override
public void bind(int port) { public void bind(ServerPort port) {
ServerBootstrap server = new ServerBootstrap(); ServerBootstrap server = new ServerBootstrap();
server.group(leader, worker) server.group(leader, worker)
.channel(socketChannelCls) .channel(socketChannelCls)
@ -79,7 +79,7 @@ public class NettyServerConnection extends NettyHandlerManager implements Server
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettyEncoder()); ch.pipeline().addLast(new NettyEncoder());
handlers.stream() handlerEntities.stream()
.sorted() .sorted()
.forEach(h -> { .forEach(h -> {
if (h.getName() == null) { if (h.getName() == null) {
@ -91,7 +91,7 @@ public class NettyServerConnection extends NettyHandlerManager implements Server
} }
}); });
try { try {
this.future = server.bind(port); this.future = server.bind(port.getPort());
this.channel = this.future.channel(); this.channel = this.future.channel();
log.info("The server is started and can receive requests. The listening port is {}", port); log.info("The server is started and can receive requests. The listening port is {}", port);
this.port = port; this.port = port;
@ -117,21 +117,25 @@ public class NettyServerConnection extends NettyHandlerManager implements Server
return channel.isActive(); return channel.isActive();
} }
@Override
public NettyServerConnection addLast(String name, ChannelHandler handler) { public NettyServerConnection addLast(String name, ChannelHandler handler) {
super.addLast(name, handler); super.addLast(name, handler);
return this; return this;
} }
@Override
public NettyServerConnection addFirst(String name, ChannelHandler handler) { public NettyServerConnection addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler); super.addFirst(name, handler);
return this; return this;
} }
@Override
public NettyServerConnection addLast(ChannelHandler handler) { public NettyServerConnection addLast(ChannelHandler handler) {
super.addLast(handler); super.addLast(handler);
return this; return this;
} }
@Override
public NettyServerConnection addFirst(ChannelHandler handler) { public NettyServerConnection addFirst(ChannelHandler handler) {
super.addFirst(handler); super.addFirst(handler);
return this; return this;

@ -17,6 +17,8 @@
package cn.hippo4j.rpc.server; package cn.hippo4j.rpc.server;
import cn.hippo4j.rpc.discovery.ServerPort;
import java.io.IOException; import java.io.IOException;
/** /**
@ -24,10 +26,10 @@ import java.io.IOException;
*/ */
public class RPCServer implements Server { public class RPCServer implements Server {
int port; ServerPort port;
ServerConnection serverConnection; ServerConnection serverConnection;
public RPCServer(int port, ServerConnection serverConnection) { public RPCServer(ServerPort port, ServerConnection serverConnection) {
this.port = port; this.port = port;
this.serverConnection = serverConnection; this.serverConnection = serverConnection;
} }

@ -17,6 +17,7 @@
package cn.hippo4j.rpc.server; package cn.hippo4j.rpc.server;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.Connection; import cn.hippo4j.rpc.handler.Connection;
/** /**
@ -27,6 +28,6 @@ public interface ServerConnection extends Connection {
/** /**
* Bind ports and process them * Bind ports and process them
*/ */
void bind(int port); void bind(ServerPort port);
} }

@ -17,6 +17,7 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.exception.ConnectionException; import cn.hippo4j.rpc.exception.ConnectionException;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -43,13 +44,13 @@ public class NettyConnectPool {
ChannelPoolHandler handler; ChannelPoolHandler handler;
ChannelPool pool; ChannelPool pool;
String host; String host;
int port; ServerPort port;
public NettyConnectPool(String host, int port, int maxConnect, public NettyConnectPool(String host, ServerPort port, int maxConnect,
long timeout, EventLoopGroup worker, long timeout, EventLoopGroup worker,
Class<? extends Channel> socketChannelCls, Class<? extends Channel> socketChannelCls,
ChannelPoolHandler handler) { ChannelPoolHandler handler) {
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port); InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port.getPort());
Bootstrap bootstrap = new Bootstrap() Bootstrap bootstrap = new Bootstrap()
.group(worker) .group(worker)
.channel(socketChannelCls) .channel(socketChannelCls)
@ -59,7 +60,7 @@ public class NettyConnectPool {
this.handler = handler; this.handler = handler;
this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction, this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction,
timeout, maxConnect, maxPendingAcquires, true, true); timeout, maxConnect, maxPendingAcquires, true, true);
log.info("The connection pool is established with the connection target {}:{}", host, port); log.info("The connection pool is established with the connection target {}:{}", host, port.getPort());
NettyConnectPoolHolder.createPool(host, port, this); NettyConnectPoolHolder.createPool(host, port, this);
} }

@ -17,6 +17,7 @@
package cn.hippo4j.rpc.support; package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.discovery.ServerPort;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.ChannelPoolHandler;
@ -38,7 +39,7 @@ public class NettyConnectPoolHolder {
static Map<String, NettyConnectPool> connectPoolMap = new ConcurrentHashMap<>(); static Map<String, NettyConnectPool> connectPoolMap = new ConcurrentHashMap<>();
private static NettyConnectPool initPool(String host, int port, private static NettyConnectPool initPool(String host, ServerPort port,
long timeout, EventLoopGroup worker, long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) { ChannelPoolHandler handler) {
return new NettyConnectPool( return new NettyConnectPool(
@ -48,8 +49,8 @@ public class NettyConnectPoolHolder {
handler); handler);
} }
private static String getKey(String host, int port) { private static String getKey(String host, ServerPort port) {
return host + ":" + port; return host + ":" + port.getPort();
} }
/** /**
@ -60,7 +61,7 @@ public class NettyConnectPoolHolder {
* @param port the port * @param port the port
* @param pool This parameter applies only to the connection pool of netty * @param pool This parameter applies only to the connection pool of netty
*/ */
public static void createPool(String host, int port, NettyConnectPool pool) { public static void createPool(String host, ServerPort port, NettyConnectPool pool) {
connectPoolMap.put(getKey(host, port), pool); connectPoolMap.put(getKey(host, port), pool);
} }
@ -71,7 +72,7 @@ public class NettyConnectPoolHolder {
* @param port the port * @param port the port
* @return Map to the connection pool * @return Map to the connection pool
*/ */
public static NettyConnectPool getPool(String host, int port) { public static NettyConnectPool getPool(String host, ServerPort port) {
return connectPoolMap.get(getKey(host, port)); return connectPoolMap.get(getKey(host, port));
} }
@ -86,7 +87,7 @@ public class NettyConnectPoolHolder {
* @param handler the chandler for netty * @param handler the chandler for netty
* @return Map to the connection pool * @return Map to the connection pool
*/ */
public static synchronized NettyConnectPool getPool(String host, int port, public static synchronized NettyConnectPool getPool(String host, ServerPort port,
long timeout, EventLoopGroup worker, long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) { ChannelPoolHandler handler) {
/* /*
@ -102,7 +103,7 @@ public class NettyConnectPoolHolder {
* @param host host * @param host host
* @param port port * @param port port
*/ */
public static void remove(String host, int port) { public static void remove(String host, ServerPort port) {
connectPoolMap.remove(getKey(host, port)); connectPoolMap.remove(getKey(host, port));
} }

@ -20,9 +20,10 @@ package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.IdUtil; import cn.hippo4j.common.toolkit.IdUtil;
import cn.hippo4j.common.web.exception.IllegalException; import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.client.NettyClientConnection; import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.request.DefaultRequest; import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.request.Request; import cn.hippo4j.rpc.model.DefaultRequest;
import cn.hippo4j.rpc.response.Response; import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.ChannelPoolHandler;
import lombok.AccessLevel; import lombok.AccessLevel;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@ -51,13 +52,13 @@ public class NettyProxyCenter {
* @param handler the pool handler for netty * @param handler the pool handler for netty
* @return Proxy objects * @return Proxy objects
*/ */
public static <T> T getProxy(Class<T> cls, String host, int port, ChannelPoolHandler handler) { public static <T> T getProxy(Class<T> cls, String host, ServerPort port, ChannelPoolHandler handler) {
NettyClientConnection connection = new NettyClientConnection(host, port, handler); NettyClientConnection connection = new NettyClientConnection(host, port, handler);
return getProxy(connection, cls, host, port); return getProxy(connection, cls, host, port);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public static <T> T getProxy(NettyClientConnection connection, Class<T> cls, String host, int port) { public static <T> T getProxy(NettyClientConnection connection, Class<T> cls, String host, ServerPort port) {
boolean b = cls.isInterface(); boolean b = cls.isInterface();
if (!b) { if (!b) {
throw new IllegalException(cls.getName() + "is not a Interface"); throw new IllegalException(cls.getName() + "is not a Interface");

@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.discovery.ClassRegistry;
import cn.hippo4j.rpc.discovery.DefaultInstance;
import cn.hippo4j.rpc.discovery.ServerPort;
import cn.hippo4j.rpc.handler.HandlerManager;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.Server;
import io.netty.channel.ChannelHandler;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* This is a server-side build class that allows you to quickly prepare data on the server side and start the server side.<br>
* <p>
* The composite pattern is adopted, which means that it is itself a server-side implementation, so it is stateless.
*/
public class NettyServerSupport implements Server {
/**
* The interface that the server side can call,
* All the methods in the interface are brokered during initialization
*/
List<Class<?>> classes;
/**
* Extract the port number of the web container,
* which is the port information exposed by the server
*/
ServerPort serverPort;
/**
* ChannelHandler
*/
HandlerManager<ChannelHandler> handlerManager;
Server server;
public NettyServerSupport(ServerPort serverPort, Class<?>... classes) {
this(serverPort, new NettyServerConnection(), classes);
}
public NettyServerSupport(ServerPort serverPort, List<Class<?>> classes) {
this(serverPort, new NettyServerConnection(), classes);
}
public NettyServerSupport(ServerPort serverPort, HandlerManager<ChannelHandler> handlerManager, Class<?>... classes) {
this(serverPort, handlerManager, classes != null ? Arrays.asList(classes) : Collections.emptyList());
}
public NettyServerSupport(ServerPort serverPort, HandlerManager<ChannelHandler> handlerManager, List<Class<?>> classes) {
this.classes = classes;
this.serverPort = serverPort;
this.handlerManager = handlerManager;
initServer();
}
/**
* Initializes the entire server side, which includes interface registration, processors, and ports.<br>
* Only interfaces are registered during registration. Classes and abstract classes are not registered.
* If no processor is available, a default processor is provided
*/
private void initServer() {
// Register the interface that can be invoked
classes.stream().filter(Class::isInterface)
.forEach(cls -> ClassRegistry.put(cls.getName(), cls));
NettyServerConnection connection = (handlerManager instanceof NettyServerConnection)
? (NettyServerConnection) handlerManager
: new NettyServerConnection();
// Assign a default handler if no handler exists
if (connection.isEmpty()) {
connection.addLast(new NettyServerTakeHandler(new DefaultInstance()));
}
server = new RPCServer(serverPort, connection);
}
@Override
public void bind() {
server.bind();
}
@Override
public boolean isActive() {
return server.isActive();
}
@Override
public void close() throws IOException {
server.close();
}
}
Loading…
Cancel
Save