fix : Simplify the RPC module and remove redundant design (#1187) (#812)

pull/1206/head
pizihao 2 years ago
parent 619b2da733
commit d705ed5fd7

@ -22,8 +22,6 @@ import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.exception.TimeOutException;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.process.ActivePostProcess;
import cn.hippo4j.rpc.process.ActiveProcessChain;
import cn.hippo4j.rpc.support.NettyConnectPool;
import cn.hippo4j.rpc.support.NettyConnectPoolHolder;
import cn.hippo4j.rpc.support.ResultHolder;
@ -35,8 +33,6 @@ import io.netty.channel.pool.ChannelPoolHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.LockSupport;
@ -54,30 +50,22 @@ public class NettyClientConnection implements ClientConnection {
*/
long timeout = 30000L;
EventLoopGroup worker = new NioEventLoopGroup();
ActiveProcessChain activeProcessChain;
NettyConnectPool connectionPool;
ChannelFuture future;
Channel channel;
public NettyClientConnection(InetSocketAddress address,
List<ActivePostProcess> activeProcesses,
ChannelPoolHandler handler) {
Assert.notNull(worker);
this.address = address;
this.activeProcessChain = new ActiveProcessChain(activeProcesses);
this.connectionPool = NettyConnectPoolHolder.getPool(address, timeout, worker, handler);
}
public NettyClientConnection(InetSocketAddress address, ChannelPoolHandler handler) {
this(address, new LinkedList<>(), handler);
}
@Override
public Response connect(Request request) {
activeProcessChain.applyPreHandle(request);
this.channel = connectionPool.acquire(timeout);
boolean debugEnabled = log.isDebugEnabled();
Response response = null;
Response response;
try {
String key = request.getKey();
this.future = channel.writeAndFlush(request);
@ -91,16 +79,13 @@ public class NettyClientConnection implements ClientConnection {
if (response == null) {
throw new TimeOutException("Timeout waiting for server-side response");
}
activeProcessChain.applyPostHandle(request, response);
if (debugEnabled) {
log.debug("The response from {}:{} was received successfully with the response key {}.", address.getHostName(), address.getPort(), key);
}
return response;
} catch (Exception ex) {
activeProcessChain.afterCompletion(request, response, ex);
throw new IllegalException(ex);
} finally {
activeProcessChain.afterCompletion(request, response, null);
connectionPool.release(this.channel);
}
}

@ -83,28 +83,4 @@ public abstract class AbstractNettyHandlerManager implements HandlerManager<Chan
this.handlerEntities.add(getHandlerEntity(firstIndex.getAndIncrement(), handler, name));
return this;
}
/**
* {@inheritDoc}
*
* @param handler handler
* @return NettyHandlerManager
*/
public AbstractNettyHandlerManager addLast(ChannelHandler handler) {
Assert.notNull(handler);
this.handlerEntities.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, null));
return this;
}
/**
* {@inheritDoc}
*
* @param handler handler
* @return NettyHandlerManager
*/
public AbstractNettyHandlerManager addFirst(ChannelHandler handler) {
Assert.notNull(handler);
this.handlerEntities.add(getHandlerEntity(firstIndex.getAndDecrement(), handler, null));
return this;
}
}

@ -46,20 +46,6 @@ public interface HandlerManager<T> {
*/
HandlerManager<T> addFirst(String name, T handler);
/**
* Add handler to the end of the Handler chain, without specifying a name
*
* @param handler handler
*/
HandlerManager<T> addLast(T handler);
/**
* Adds handler to the head of the Handler chain, without specifying a name
*
* @param handler handler
*/
HandlerManager<T> addFirst(T handler);
/**
* Whether handler exists
*

@ -62,18 +62,6 @@ public class NettyClientPoolHandler extends AbstractNettyHandlerManager implemen
return this;
}
@Override
public NettyClientPoolHandler addLast(ChannelHandler handler) {
super.addLast(handler);
return this;
}
@Override
public NettyClientPoolHandler addFirst(ChannelHandler handler) {
super.addFirst(handler);
return this;
}
@Override
public void channelReleased(Channel ch) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER);

@ -24,14 +24,10 @@ import cn.hippo4j.rpc.discovery.Instance;
import cn.hippo4j.rpc.model.DefaultResponse;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import cn.hippo4j.rpc.process.ActivePostProcess;
import cn.hippo4j.rpc.process.ActiveProcessChain;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
/**
* netty adaptation layer
@ -41,16 +37,10 @@ import java.util.List;
@ChannelHandler.Sharable
public class NettyServerTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler {
ActiveProcessChain activeProcessChain;
Instance instance;
public NettyServerTakeHandler(List<ActivePostProcess> processes, Instance instance) {
this.activeProcessChain = new ActiveProcessChain(processes);
this.instance = instance;
}
public NettyServerTakeHandler(Instance instance) {
this(new LinkedList<>(), instance);
this.instance = instance;
}
@Override
@ -65,24 +55,17 @@ public class NettyServerTakeHandler extends AbstractNettyTakeHandler implements
@Override
public Response sendHandler(Request request) {
if (!activeProcessChain.applyPreHandle(request)) {
return null;
}
Response response = null;
Response response;
try {
Class<?> cls = ClassRegistry.get(request.getClassName());
Method method = ReflectUtil.getMethodByName(cls, request.getMethodName(), request.getParameterTypes());
Assert.notNull(method);
Object invoke = ReflectUtil.invoke(instance.getInstance(cls), method, request.getParameters());
response = new DefaultResponse(request.getKey(), invoke.getClass(), invoke);
activeProcessChain.applyPostHandle(request, response);
return response;
} catch (Exception e) {
response = new DefaultResponse(request.getKey(), e, e.getMessage());
activeProcessChain.afterCompletion(request, response, e);
return response;
} finally {
activeProcessChain.afterCompletion(request, response, null);
}
}

@ -1,62 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.process;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
/**
* Callback while the connection is in progress
*
* @since 1.5.1
*/
public interface ActivePostProcess {
/**
* Client: After establishing a connection and before passing parameters<br>
* Server: Receives parameters and performs pre-call operations<br>
*
* @param request request
* @return Whether to continue the execution. If it is a client, the returned value does not affect subsequent execution
*/
default boolean preHandler(Request request) {
return true;
}
/**
* Client: Action after receiving a response<br>
* Server: performs the operation after the call<br>
*
* @param request request
* @param response response
*/
default void postHandler(Request request, Response response) {
// NO SOMETHING
}
/**
* Called when an exception or resource is cleaned
*
* @param request request
* @param response response
* @param e Exception
*/
default void afterCompletion(Request request, Response response, Exception e) {
// NO SOMETHING
}
}

@ -1,105 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.process;
import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Processor chain for easier processing of processors in different scenarios<br>
* reference resources: spring HandlerExecutionChain
*
* @see ActivePostProcess
* @since 1.5.1
*/
@Slf4j
public final class ActiveProcessChain {
/**
* A collection of processors that will be applied to their assigned programs.
* Processors will perform different actions on different occasions for both the server and the client,
* but the execution period of that action must be the same
*/
List<ActivePostProcess> processes;
/**
* index <br>
* that identifies where the {@link ActivePostProcess#preHandler(Request)} processing is performed<br>
* This allows for the fact that some processors will add shutable operations to the class<br>
* eg: {@link java.io.Closeable}, The {@link ActivePostProcess#afterCompletion(Request, Response, Exception)}
* operation is not performed after an exception if the preprocessor is not executed
*/
int index = -1;
public ActiveProcessChain(List<ActivePostProcess> processes) {
this.processes = processes;
}
public ActiveProcessChain(ActivePostProcess... processes) {
this((processes != null ? Arrays.asList(processes) : Collections.emptyList()));
}
/**
* Apply postHandle methods of registered processes.
*/
public boolean applyPreHandle(Request request) {
for (int i = 0; i < this.processes.size(); i++) {
ActivePostProcess handle = processes.get(i);
if (!handle.preHandler(request)) {
afterCompletion(request, null, null);
return false;
}
this.index = i;
}
return true;
}
/**
* Apply postHandle methods of registered processes.
*/
public void applyPostHandle(Request request, Response response) {
for (int i = processes.size() - 1; i >= 0; i--) {
ActivePostProcess handle = processes.get(i);
handle.postHandler(request, response);
}
}
/**
* Trigger afterCompletion callbacks on the mapped ActivePostProcess.
* Will just invoke afterCompletion for all interceptors whose preHandle invocation
* has successfully completed and returned true.
*/
public void afterCompletion(Request request, Response response, Exception ex) {
for (int i = this.index; i >= 0; i--) {
ActivePostProcess handle = processes.get(i);
try {
handle.afterCompletion(request, response, ex);
} catch (Throwable e) {
if (log.isErrorEnabled()) {
log.error("HandlerInterceptor.afterCompletion threw exception", e);
}
}
}
}
}

@ -72,6 +72,10 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
@Override
public void bind(ServerPort port) {
int serverPort = port.getPort();
if (serverPort < 0 || serverPort > 65535) {
throw new ConnectionException("The port number " + serverPort + " is outside 0~65535, which is not a legal port number");
}
ServerBootstrap server = new ServerBootstrap();
server.group(leader, worker)
.channel(socketChannelCls)
@ -79,7 +83,7 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
protected void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new NettyEncoder());
pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
@ -95,10 +99,10 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
}
});
try {
this.future = server.bind(port.getPort()).sync();
this.future = server.bind(serverPort).sync();
this.channel = this.future.channel();
if (log.isDebugEnabled()) {
log.debug("The server is started and can receive requests. The listening port is {}", port.getPort());
log.debug("The server is started and can receive requests. The listening port is {}", serverPort);
}
this.port = port;
this.future.channel().closeFuture().sync();
@ -142,16 +146,4 @@ public class NettyServerConnection extends AbstractNettyHandlerManager implement
return this;
}
@Override
public NettyServerConnection addLast(ChannelHandler handler) {
super.addLast(handler);
return this;
}
@Override
public NettyServerConnection addFirst(ChannelHandler handler) {
super.addFirst(handler);
return this;
}
}

@ -75,7 +75,7 @@ public final class NettyClientSupport {
? (NettyClientPoolHandler) handlerManager
: new NettyClientPoolHandler();
if (handler.isEmpty()) {
handler.addFirst(new NettyClientTakeHandler());
handler.addFirst(null, new NettyClientTakeHandler());
}
NettyClientConnection connection = new NettyClientConnection(address, handler);
return new RPCClient(connection);

@ -96,7 +96,7 @@ public class NettyServerSupport implements Server {
: new NettyServerConnection();
// Assign a default handler if no handler exists
if (connection.isEmpty()) {
connection.addFirst(new NettyServerTakeHandler(new DefaultInstance()));
connection.addFirst(null, new NettyServerTakeHandler(new DefaultInstance()));
}
server = new RPCServer(connection, serverPort);
}

Loading…
Cancel
Save