add new model hippo4j-rpc and Transfer rpc code to hippo4j-rpc (#912)

* fix : add new model hippo4j-rpc (#812)

* fix : fix : Transfer code to hippo4j-rpc (#812)

* fix : Add set multiple ChannelHandler(#812)

* fix : Code format modification
pull/913/head
pizihao 2 years ago committed by GitHub
parent f19ec20c57
commit ed19c983f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-all</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-rpc</artifactId>
<packaging>pom</packaging>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
</properties>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-common</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
</dependencies>
</project>

@ -15,10 +15,10 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
package cn.hippo4j.rpc.client;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import java.io.Closeable;
@ -31,4 +31,12 @@ public interface Client extends Closeable {
* Start the client and try to send and receive data
*/
Response connection(Request request);
/**
* Check whether the client is active
*
* @return Whether active
*/
boolean isActive();
}

@ -15,11 +15,11 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
package cn.hippo4j.rpc.client;
import cn.hippo4j.config.rpc.handler.Connection;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.rpc.handler.Connection;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
/**
* Applicable to client connections

@ -15,21 +15,23 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
package cn.hippo4j.rpc.client;
import cn.hippo4j.config.rpc.exception.TimeOutException;
import cn.hippo4j.config.rpc.process.ActivePostProcess;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.config.rpc.support.NettyConnectPool;
import cn.hippo4j.config.rpc.support.NettyConnectPoolHolder;
import cn.hippo4j.config.rpc.support.ResultHolder;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.exception.TimeOutException;
import cn.hippo4j.rpc.process.ActivePostProcess;
import cn.hippo4j.rpc.process.ActiveProcessChain;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.support.NettyConnectPool;
import cn.hippo4j.rpc.support.NettyConnectPoolHolder;
import cn.hippo4j.rpc.support.ResultHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
@ -46,47 +48,50 @@ public class NettyClientConnection implements ClientConnection {
Integer port;
// Obtain the connection timeout period. The default value is 30s
long timeout = 30000L;
Channel channel;
EventLoopGroup worker = new NioEventLoopGroup();
List<ActivePostProcess> activeProcesses;
ChannelFuture future;
ActiveProcessChain activeProcessChain;
NettyConnectPool connectionPool;
ChannelFuture future;
Channel channel;
public NettyClientConnection(String host, int port,
List<ActivePostProcess> activeProcesses) {
List<ActivePostProcess> activeProcesses,
ChannelPoolHandler handler) {
Assert.notNull(worker);
this.host = host;
this.port = port;
this.activeProcesses = activeProcesses;
this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker);
this.activeProcessChain = new ActiveProcessChain(activeProcesses);
this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker, handler);
}
public NettyClientConnection(String host, int port) {
this(host, port, new LinkedList<>());
public NettyClientConnection(String host, int port, ChannelPoolHandler handler) {
this(host, port, new LinkedList<>(), handler);
}
@Override
public Response connect(Request request) {
preHandlers(request);
activeProcessChain.applyPreHandle(request);
this.channel = connectionPool.acquire(timeout);
Response response = null;
try {
String key = request.getKey();
this.future = channel.writeAndFlush(request);
log.info("Call successful, target address is {}:{}, request key is {}", host, port, key);
// Wait for execution to complete
ResultHolder.put(key, Thread.currentThread());
ResultHolder.putThread(key, Thread.currentThread());
LockSupport.parkNanos(timeout() * 1000000);
Response response = ResultHolder.get(key);
response = ResultHolder.get(key);
if (response == null) {
throw new TimeOutException("Timeout waiting for server-side response");
}
postHandlers(request, response);
activeProcessChain.applyPostHandle(request, response);
log.info("The response from {}:{} was received successfully with the response key {}.", host, port, key);
return response;
} catch (Exception ex) {
afterCompletions(request, null, ex);
activeProcessChain.afterCompletion(request, response, ex);
throw new IllegalException(ex);
} finally {
activeProcessChain.afterCompletion(request, response, null);
connectionPool.release(this.channel);
}
}
@ -111,22 +116,8 @@ public class NettyClientConnection implements ClientConnection {
this.channel.close();
}
private void preHandlers(Request request) {
for (ActivePostProcess process : activeProcesses) {
process.preHandler(request);
}
}
private void postHandlers(Request request, Response response) {
for (ActivePostProcess process : activeProcesses) {
process.postHandler(request, response);
}
}
private void afterCompletions(Request request, Response response, Exception e) {
for (ActivePostProcess process : activeProcesses) {
process.afterCompletion(request, response, e);
}
@Override
public boolean isActive() {
return channel.isActive();
}
}

@ -15,10 +15,10 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
package cn.hippo4j.rpc.client;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import java.io.IOException;
@ -38,6 +38,11 @@ public class RPCClient implements Client {
return clientConnection.connect(request);
}
@Override
public boolean isActive() {
return clientConnection.isActive();
}
/**
* Close the client and release all connections.
*

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.coder;
package cn.hippo4j.rpc.coder;
import java.io.IOException;
import java.io.ObjectOutputStream;

@ -15,9 +15,9 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.coder;
package cn.hippo4j.rpc.coder;
import cn.hippo4j.config.rpc.exception.CoderException;
import cn.hippo4j.rpc.exception.CoderException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.serialization.ClassResolver;

@ -15,9 +15,9 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.coder;
package cn.hippo4j.rpc.coder;
import cn.hippo4j.config.rpc.exception.CoderException;
import cn.hippo4j.rpc.exception.CoderException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.discovery;
package cn.hippo4j.rpc.discovery;
import java.net.InetSocketAddress;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.exception;
package cn.hippo4j.rpc.exception;
/**
* During decoding and encoding, if an exception occurs, an exception of type {@link CoderException} is thrown,

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.exception;
package cn.hippo4j.rpc.exception;
/**
* If an exception occurs during the connection between the server and the client, an exception of type

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.exception;
package cn.hippo4j.rpc.exception;
/**
* If there is a timeout between the server and the client, you will get a {@link TimeOutException},

@ -15,33 +15,26 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
package cn.hippo4j.rpc.handler;
import cn.hippo4j.config.rpc.exception.ConnectionException;
import cn.hippo4j.config.rpc.support.ResultHolder;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.support.ResultHolder;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Interconnect with the netty mediation layer
* the abstract base of {@link ConnectHandler} and {@link ChannelInboundHandlerAdapter}
*/
public class NettyClientTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
Response response = (Response) msg;
handler(response);
ctx.flush();
} catch (Exception e) {
ctx.close();
throw new IllegalException(e);
}
}
public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler {
/**
* Manual disconnection is used here in case the server and client are disconnected due to a sudden exception
*
* @param ctx the context
* @param cause the throwable
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
@ -53,9 +46,15 @@ public class NettyClientTakeHandler extends ChannelInboundHandlerAdapter impleme
}
}
/**
* This is a generic process that puts in the result and wakes up the thread
*
* @param response response
*/
@Override
public void handler(Response response) {
ResultHolder.put(response.getKey(), response);
ResultHolder.wake(response.getKey());
}
}

@ -15,15 +15,16 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
package cn.hippo4j.rpc.handler;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
/**
* The handler in each connection, where the specific behavior of the connection
* must be specified, such as serialization and parsing, requesting and receiving
* requests, and so on
* requests, and so on<br>
*
*/
public interface ConnectHandler {
@ -32,12 +33,13 @@ public interface ConnectHandler {
*
* @param request request
*/
default Response handler(Request request) {
default Response sendHandler(Request request) {
return null;
}
/**
* Processing after receiving Response
* Processing after receiving Response<br>
* This is mainly for subsequent processing of the results
*
* @param response response
*/

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
package cn.hippo4j.rpc.handler;
import java.io.Closeable;
@ -24,4 +24,18 @@ import java.io.Closeable;
*/
public interface Connection extends Closeable {
/**
* Gets the state of the connection, which is interpreted differently by different programs<br>
* <p>
* Client: Active connection indicates that a connection is being maintained with the server.
* Inactive connection indicates that no connection is being established with the server<br>
* <p>
* Server: The active connection indicates that the server has been started, is receiving ports,
* and can obtain requests at any time. The inactive connection indicates that the server has been
* shut down and the ports have been released
*
* @return Whether active
*/
boolean isActive();
}

@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* Manage the Handler used in the processing.<br>
* The Handler must be able to exist multiple times and be invoked once in a single execution
*/
public interface HandlerManager<T> {
/**
* Add handler to the end of the Handler chain
*
* @param name name
* @param handler handler
*/
HandlerManager<T> addLast(String name, T handler);
/**
* Add handler to the head of the Handler chain
*
* @param name name
* @param handler handler
*/
HandlerManager<T> addFirst(String name, T handler);
/**
* Add handler to the end of the Handler chain, without specifying a name
*
* @param handler handler
*/
HandlerManager<T> addLast(T handler);
/**
* Adds handler to the head of the Handler chain, without specifying a name
*
* @param handler handler
*/
HandlerManager<T> addFirst(T handler);
/**
* Create a handler
*
* @param order order
* @param handler Handler
* @param name Handler name
* @return HandlerEntity
*/
default HandlerEntity<T> getHandlerEntity(long order, T handler, String name) {
return new HandlerEntity<>(order, handler, name);
}
@Data
@AllArgsConstructor
class HandlerEntity<T> implements Comparable<HandlerEntity<T>> {
/**
* order, The Handler with a larger value is executed after the Handler with a smaller value
*/
long order;
/**
* handler
*/
T handler;
/**
* A high level summary of handler functionality
*/
String name;
@Override
public int compareTo(HandlerEntity<T> o) {
return (int) (this.getOrder() - o.getOrder());
}
}
}

@ -15,22 +15,57 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
package cn.hippo4j.rpc.handler;
import cn.hippo4j.config.rpc.coder.NettyDecoder;
import cn.hippo4j.config.rpc.coder.NettyEncoder;
import cn.hippo4j.rpc.coder.NettyDecoder;
import cn.hippo4j.rpc.coder.NettyEncoder;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* Processing by the client connection pool handler to clean the buffer and define new connection properties
*/
@Slf4j
public class NettyClientPoolHandler implements ChannelPoolHandler {
public class NettyClientPoolHandler extends NettyHandlerManager implements ChannelPoolHandler {
public NettyClientPoolHandler(List<ChannelHandler> handlers) {
super(handlers);
}
public NettyClientPoolHandler(ChannelHandler... handlers) {
super(handlers);
}
public NettyClientPoolHandler() {
super();
}
public NettyClientPoolHandler addLast(String name, ChannelHandler handler) {
super.addLast(name, handler);
return this;
}
public NettyClientPoolHandler addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler);
return this;
}
public NettyClientPoolHandler addLast(ChannelHandler handler) {
super.addLast(handler);
return this;
}
public NettyClientPoolHandler addFirst(ChannelHandler handler) {
super.addFirst(handler);
return this;
}
@Override
public void channelReleased(Channel ch) {
@ -50,6 +85,15 @@ public class NettyClientPoolHandler implements ChannelPoolHandler {
.setTcpNoDelay(false);
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettyEncoder());
ch.pipeline().addLast(new NettyClientTakeHandler());
this.handlers.stream()
.sorted()
.forEach(h -> {
if (h.getName() == null) {
ch.pipeline().addLast(h.getHandler());
} else {
ch.pipeline().addLast(h.getName(), h.getHandler());
}
});
}
}

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.response.Response;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
/**
* Interconnect with the netty mediation layer
*/
@ChannelHandler.Sharable
public class NettyClientTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
Response response = (Response) msg;
handler(response);
ctx.flush();
} catch (Exception e) {
ctx.close();
throw new IllegalException(e);
}
}
}

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.toolkit.Assert;
import io.netty.channel.ChannelHandler;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
/**
* Processor manager for ChannelHandler in netty
*/
public abstract class NettyHandlerManager implements HandlerManager<ChannelHandler> {
protected final List<HandlerEntity<ChannelHandler>> handlers;
AtomicLong firstIndex = new AtomicLong(-1);
AtomicLong lastIndex = new AtomicLong(0);
protected NettyHandlerManager(List<ChannelHandler> handlers) {
this.handlers = handlers.stream()
.filter(Objects::nonNull)
.map(c -> getHandlerEntity(lastIndex.getAndIncrement(), c, null))
.collect(Collectors.toList());
}
protected NettyHandlerManager(ChannelHandler... handlers) {
this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList());
}
protected NettyHandlerManager() {
this.handlers = new LinkedList<>();
}
/**
* {@inheritDoc}
*
* @param name name
* @param handler handler
* @return NettyHandlerManager
*/
public NettyHandlerManager addLast(String name, ChannelHandler handler) {
Assert.notNull(handler);
this.handlers.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, name));
return this;
}
/**
* {@inheritDoc}
*
* @param name name
* @param handler handler
* @return NettyHandlerManager
*/
public NettyHandlerManager addFirst(String name, ChannelHandler handler) {
Assert.notNull(handler);
this.handlers.add(getHandlerEntity(firstIndex.getAndIncrement(), handler, name));
return this;
}
/**
* {@inheritDoc}
*
* @param handler handler
* @return NettyHandlerManager
*/
public NettyHandlerManager addLast(ChannelHandler handler) {
Assert.notNull(handler);
this.handlers.add(getHandlerEntity(lastIndex.getAndIncrement(), handler, null));
return this;
}
/**
* {@inheritDoc}
*
* @param handler handler
* @return NettyHandlerManager
*/
public NettyHandlerManager addFirst(ChannelHandler handler) {
Assert.notNull(handler);
this.handlers.add(getHandlerEntity(firstIndex.getAndDecrement(), handler, null));
return this;
}
}

@ -15,20 +15,19 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
package cn.hippo4j.rpc.handler;
import cn.hippo4j.config.rpc.exception.ConnectionException;
import cn.hippo4j.config.rpc.process.ActivePostProcess;
import cn.hippo4j.config.rpc.response.DefaultResponse;
import cn.hippo4j.config.rpc.support.ClassRegistry;
import cn.hippo4j.config.rpc.support.Instance;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.ReflectUtil;
import io.netty.channel.Channel;
import cn.hippo4j.rpc.process.ActivePostProcess;
import cn.hippo4j.rpc.process.ActiveProcessChain;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.DefaultResponse;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.support.ClassRegistry;
import cn.hippo4j.rpc.support.Instance;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;
import java.util.LinkedList;
@ -37,13 +36,14 @@ import java.util.List;
/**
* netty adaptation layer
*/
public class NettyServerTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler {
@ChannelHandler.Sharable
public class NettyServerTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler {
List<ActivePostProcess> processes;
ActiveProcessChain activeProcessChain;
Instance instance;
public NettyServerTakeHandler(List<ActivePostProcess> processes, Instance instance) {
this.processes = processes;
this.activeProcessChain = new ActiveProcessChain(processes);
this.instance = instance;
}
@ -57,58 +57,30 @@ public class NettyServerTakeHandler extends ChannelInboundHandlerAdapter impleme
return;
}
Request request = (Request) msg;
Response response = handler(request);
Response response = sendHandler(request);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
if (channel.isActive()) {
ctx.close();
} else {
throw new ConnectionException(cause);
}
}
@Override
public Response handler(Request request) {
if (!preHandlers(request)) {
public Response sendHandler(Request request) {
if (!activeProcessChain.applyPreHandle(request)) {
return null;
}
Response response = null;
try {
Class<?> cls = ClassRegistry.get(request.getClassName());
Method method = ReflectUtil.getMethodByName(cls, request.getMethodName(), request.getParameterTypes());
Assert.notNull(method);
Object invoke = ReflectUtil.invoke(instance.getInstance(cls), method, request.getParameters());
Response response = new DefaultResponse(request.getKey(), invoke.getClass(), invoke);
postHandlers(request, response);
response = new DefaultResponse(request.getKey(), invoke.getClass(), invoke);
activeProcessChain.applyPostHandle(request, response);
return response;
} catch (Exception e) {
Response response = new DefaultResponse(request.getKey(), e, e.getMessage());
afterCompletions(request, response, e);
response = new DefaultResponse(request.getKey(), e, e.getMessage());
activeProcessChain.afterCompletion(request, response, e);
return response;
}
}
private boolean preHandlers(Request request) {
for (ActivePostProcess process : processes) {
if (!process.preHandler(request)) {
return false;
}
}
return true;
}
private void postHandlers(Request request, Response response) {
for (ActivePostProcess process : processes) {
process.postHandler(request, response);
}
}
private void afterCompletions(Request request, Response response, Exception e) {
for (ActivePostProcess process : processes) {
process.afterCompletion(request, response, e);
} finally {
activeProcessChain.afterCompletion(request, response, null);
}
}

@ -15,10 +15,10 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.process;
package cn.hippo4j.rpc.process;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
/**
* Callback while the connection is in progress

@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.process;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* Processor chain for easier processing of processors in different scenarios<br>
* reference resources: spring HandlerExecutionChain
*
* @see ActivePostProcess
*/
@Slf4j
public final class ActiveProcessChain {
/**
* A collection of processors that will be applied to their assigned programs.
* Processors will perform different actions on different occasions for both the server and the client,
* but the execution period of that action must be the same
*/
List<ActivePostProcess> processes;
/**
* index <br>
* that identifies where the {@link ActivePostProcess#preHandler(Request)} processing is performed<br>
* This allows for the fact that some processors will add shutable operations to the class<br>
* eg: {@link java.io.Closeable}, The {@link ActivePostProcess#afterCompletion(Request, Response, Exception)}
* operation is not performed after an exception if the preprocessor is not executed
*/
int index = -1;
public ActiveProcessChain(List<ActivePostProcess> processes) {
this.processes = processes;
}
public ActiveProcessChain(ActivePostProcess... processes) {
this((processes != null ? Arrays.asList(processes) : Collections.emptyList()));
}
/**
* Apply postHandle methods of registered processes.
*/
public boolean applyPreHandle(Request request) {
for (int i = 0; i < this.processes.size(); i++) {
ActivePostProcess handle = processes.get(i);
if (!handle.preHandler(request)) {
afterCompletion(request, null, null);
return false;
}
this.index = i;
}
return true;
}
/**
* Apply postHandle methods of registered processes.
*/
public void applyPostHandle(Request request, Response response) {
for (int i = processes.size() - 1; i >= 0; i--) {
ActivePostProcess handle = processes.get(i);
handle.postHandler(request, response);
}
}
/**
* Trigger afterCompletion callbacks on the mapped ActivePostProcess.
* Will just invoke afterCompletion for all interceptors whose preHandle invocation
* has successfully completed and returned true.
*/
public void afterCompletion(Request request, Response response, Exception ex) {
for (int i = this.index; i >= 0; i--) {
ActivePostProcess handle = processes.get(i);
try {
handle.afterCompletion(request, response, ex);
} catch (Throwable e) {
log.error("HandlerInterceptor.afterCompletion threw exception", e);
}
}
}
}

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.request;
package cn.hippo4j.rpc.request;
import java.io.IOException;
import java.io.ObjectInputStream;
@ -94,7 +94,7 @@ public final class DefaultRequest implements Request {
if (parameters == null) {
return;
}
// 序列化属性 parameters
// Serialization parameters
for (Object parameter : parameters) {
s.writeObject(parameter);
}
@ -110,7 +110,7 @@ public final class DefaultRequest implements Request {
if (parameterTypes == null) {
return;
}
// 反序列化属性 parameters
// Deserialization parameters
int length = parameterTypes.length;
Object[] a = new Object[length];
for (int i = 0; i < length; i++) {

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.request;
package cn.hippo4j.rpc.request;
import java.io.Serializable;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.response;
package cn.hippo4j.rpc.response;
import java.io.IOException;
import java.io.ObjectInputStream;
@ -105,7 +105,7 @@ public class DefaultResponse implements Response {
if (obj == null) {
return;
}
// 序列化属性 obj
// Serialization obj
s.writeObject(this.obj);
}
@ -116,7 +116,7 @@ public class DefaultResponse implements Response {
*/
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
// 反序列化属性 obj
// Deserialization obj
this.obj = s.readObject();
}
}

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.response;
package cn.hippo4j.rpc.response;
import java.io.Serializable;

@ -15,14 +15,12 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
package cn.hippo4j.rpc.server;
import cn.hippo4j.config.rpc.coder.NettyDecoder;
import cn.hippo4j.config.rpc.coder.NettyEncoder;
import cn.hippo4j.config.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.config.rpc.process.ActivePostProcess;
import cn.hippo4j.config.rpc.support.Instance;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.rpc.coder.NettyDecoder;
import cn.hippo4j.rpc.coder.NettyEncoder;
import cn.hippo4j.rpc.handler.NettyHandlerManager;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
@ -31,44 +29,42 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* adapter to the netty server
*/
@Slf4j
public class NettyServerConnection implements ServerConnection {
public class NettyServerConnection extends NettyHandlerManager implements ServerConnection {
Integer port;
EventLoopGroup leader;
EventLoopGroup worker;
Class<? extends ServerChannel> socketChannelCls = NioServerSocketChannel.class;
List<ActivePostProcess> processes;
Instance instance;
ChannelFuture future;
Channel channel;
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ActivePostProcess> processes, Instance instance) {
Assert.notNull(processes);
Assert.notNull(instance);
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ChannelHandler> handlers) {
super(handlers);
Assert.notNull(handlers);
Assert.notNull(leader);
Assert.notNull(worker);
this.leader = leader;
this.worker = worker;
this.processes = processes;
this.instance = instance;
}
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, Instance instance) {
this(leader, worker, new LinkedList<>(), instance);
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, ChannelHandler... handlers) {
this(leader, worker, (handlers != null ? Arrays.asList(handlers) : Collections.emptyList()));
}
public NettyServerConnection(List<ActivePostProcess> processes, Instance instance) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), processes, instance);
public NettyServerConnection(ChannelHandler... handlers) {
this(handlers != null ? Arrays.asList(handlers) : Collections.emptyList());
}
public NettyServerConnection(Instance instance) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), new LinkedList<>(), instance);
public NettyServerConnection(List<ChannelHandler> handlers) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), handlers);
}
@Override
@ -83,11 +79,20 @@ public class NettyServerConnection implements ServerConnection {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettyEncoder());
ch.pipeline().addLast(new NettyServerTakeHandler(processes, instance));
handlers.stream()
.sorted()
.forEach(h -> {
if (h.getName() == null) {
ch.pipeline().addLast(h.getHandler());
} else {
ch.pipeline().addLast(h.getName(), h.getHandler());
}
});
}
});
try {
this.future = server.bind(port);
this.channel = this.future.channel();
log.info("The server is started and can receive requests. The listening port is {}", port);
this.port = port;
this.future.channel().closeFuture().sync();
@ -106,4 +111,29 @@ public class NettyServerConnection implements ServerConnection {
this.future.channel().close();
log.info("The server is shut down and no more requests are received. The release port is {}", port);
}
@Override
public boolean isActive() {
return channel.isActive();
}
public NettyServerConnection addLast(String name, ChannelHandler handler) {
super.addLast(name, handler);
return this;
}
public NettyServerConnection addFirst(String name, ChannelHandler handler) {
super.addFirst(name, handler);
return this;
}
public NettyServerConnection addLast(ChannelHandler handler) {
super.addLast(handler);
return this;
}
public NettyServerConnection addFirst(ChannelHandler handler) {
super.addFirst(handler);
return this;
}
}

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
package cn.hippo4j.rpc.server;
import java.io.IOException;
@ -37,6 +37,11 @@ public class RPCServer implements Server {
serverConnection.bind(port);
}
@Override
public boolean isActive() {
return serverConnection.isActive();
}
/**
* Shut down the server and release the port
*/

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
package cn.hippo4j.rpc.server;
import java.io.Closeable;
@ -30,4 +30,11 @@ public interface Server extends Closeable {
*/
void bind();
/**
* Check whether the server is active
*
* @return Whether active
*/
boolean isActive();
}

@ -15,9 +15,9 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
package cn.hippo4j.rpc.server;
import cn.hippo4j.config.rpc.handler.Connection;
import cn.hippo4j.rpc.handler.Connection;
/**
* This applies to server-side connections

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.web.exception.IllegalException;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
/**
* Instance interface to get an instance

@ -15,10 +15,9 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import cn.hippo4j.config.rpc.exception.ConnectionException;
import cn.hippo4j.config.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.exception.ConnectionException;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
@ -41,14 +40,15 @@ public class NettyConnectPool {
ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE;
FixedChannelPool.AcquireTimeoutAction acquireTimeoutAction = FixedChannelPool.AcquireTimeoutAction.NEW;
int maxPendingAcquires = Integer.MAX_VALUE;
ChannelPoolHandler handler = new NettyClientPoolHandler();
ChannelPoolHandler handler;
ChannelPool pool;
String host;
int port;
public NettyConnectPool(String host, int port, int maxConnect,
long timeout, EventLoopGroup worker,
Class<? extends Channel> socketChannelCls) {
Class<? extends Channel> socketChannelCls,
ChannelPoolHandler handler) {
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port);
Bootstrap bootstrap = new Bootstrap()
.group(worker)
@ -56,6 +56,7 @@ public class NettyConnectPool {
.remoteAddress(socketAddress);
this.host = host;
this.port = port;
this.handler = handler;
this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction,
timeout, maxConnect, maxPendingAcquires, true, true);
log.info("The connection pool is established with the connection target {}:{}", host, port);

@ -15,10 +15,11 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.AccessLevel;
@ -38,11 +39,13 @@ public class NettyConnectPoolHolder {
static Map<String, NettyConnectPool> connectPoolMap = new ConcurrentHashMap<>();
private static NettyConnectPool initPool(String host, int port,
long timeout, EventLoopGroup worker) {
long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) {
return new NettyConnectPool(
host, port, maxConnect,
timeout, worker,
NioSocketChannel.class);
NioSocketChannel.class,
handler);
}
private static String getKey(String host, int port) {
@ -75,19 +78,22 @@ public class NettyConnectPoolHolder {
/**
* Gets a connection pool, and if there is no connectPoolMapping, creates one with the values provided and joins the connectPoolMapping
*
* @param host the host
* @param port the port
* @param timeout timeout
* @param worker Special {@link EventExecutorGroup} which allows registering {@link Channel}s
* that get processed for later selection during the event loop.
* @param host the host
* @param port the port
* @param timeout timeout
* @param worker Special {@link EventExecutorGroup} which allows registering {@link Channel}s
* that get processed for later selection during the event loop.
* @param handler the chandler for netty
* @return Map to the connection pool
*/
public static synchronized NettyConnectPool getPool(String host, int port,
long timeout, EventLoopGroup worker) {
// This cannot use the computeIfAbsent method directly here because put is already used in init.
// Details refer to https://bugs.openjdk.java.net/browse/JDK-8062841
long timeout, EventLoopGroup worker,
ChannelPoolHandler handler) {
/*
* this cannot use the computeIfAbsent method directly here because put is already used in init. Details refer to https://bugs.openjdk.java.net/browse/JDK-8062841
*/
NettyConnectPool pool = getPool(host, port);
return pool == null ? initPool(host, port, timeout, worker) : pool;
return pool == null ? initPool(host, port, timeout, worker, handler) : pool;
}
/**

@ -15,14 +15,15 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import cn.hippo4j.config.rpc.request.DefaultRequest;
import cn.hippo4j.config.rpc.client.NettyClientConnection;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.common.toolkit.IdUtil;
import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.client.NettyClientConnection;
import cn.hippo4j.rpc.request.DefaultRequest;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import io.netty.channel.pool.ChannelPoolHandler;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@ -41,16 +42,17 @@ public class NettyProxyCenter {
static Map<Class<?>, Object> map = new HashMap<>();
/**
* PRC
* A proxy object for PRC is obtained through an interface
*
* @param cls
* @param host
* @param port
* @param <T>
* @return
* @param cls The interface type
* @param host Request the address
* @param port port
* @param <T> Object type
* @param handler the pool handler for netty
* @return Proxy objects
*/
public static <T> T getProxy(Class<T> cls, String host, int port) {
NettyClientConnection connection = new NettyClientConnection(host, port);
public static <T> T getProxy(Class<T> cls, String host, int port, ChannelPoolHandler handler) {
NettyClientConnection connection = new NettyClientConnection(host, port, handler);
return getProxy(connection, cls, host, port);
}

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
@ -56,7 +56,7 @@ public class ResultHolder {
* @param key Request and response keys
* @param t The Thread
*/
public static void put(String key, Thread t) {
public static void putThread(String key, Thread t) {
log.debug("Write thread, waiting to wake up");
threadMap.put(key, t);
}

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.config.ApplicationContextHolder;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
package cn.hippo4j.rpc.client;
public class CallManager {
@ -23,4 +23,8 @@ public class CallManager {
return 1;
}
public int callTest(Integer a, Integer b) {
return a + b;
}
}

@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.client;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.request.DefaultRequest;
import cn.hippo4j.rpc.request.Request;
import cn.hippo4j.rpc.response.Response;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection;
import cn.hippo4j.rpc.support.ClassRegistry;
import cn.hippo4j.rpc.support.DefaultInstance;
import cn.hippo4j.rpc.support.Instance;
import io.netty.channel.pool.ChannelPoolHandler;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class RPCClientTest {
String host = "localhost";
int port = 8888;
int portTest = 8889;
@Test
public void connection() throws IOException {
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(host, port, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
for (int i = 0; i < 100; i++) {
Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive();
Assert.assertTrue(active);
Assert.assertEquals(response.getObj(), 1);
}
rpcClient.close();
rpcServer.close();
}
/**
* This test case can be overridden under the handler and coder packages
*/
@Test
public void connectionTest() throws IOException {
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(portTest, connection);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
ChannelPoolHandler channelPoolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyClientConnection clientConnection = new NettyClientConnection(host, portTest, channelPoolHandler);
RPCClient rpcClient = new RPCClient(clientConnection);
Class<?>[] classes = new Class<?>[2];
classes[0] = Integer.class;
classes[1] = Integer.class;
Object[] objects = new Object[2];
objects[0] = 1;
objects[1] = 2;
Request request = new DefaultRequest("127.0.0.18889", className, "callTest", classes, objects);
Response response = rpcClient.connection(request);
boolean active = rpcClient.isActive();
Assert.assertTrue(active);
Assert.assertEquals(response.getObj(), 3);
rpcClient.close();
rpcServer.close();
}
}

@ -15,12 +15,14 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
package cn.hippo4j.rpc.server;
import cn.hippo4j.config.rpc.support.DefaultInstance;
import cn.hippo4j.config.rpc.support.Instance;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.support.DefaultInstance;
import cn.hippo4j.rpc.support.Instance;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
@ -34,7 +36,8 @@ public class RPCServerTest {
@Test
public void bind() throws IOException {
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
try {
@ -42,7 +45,11 @@ public class RPCServerTest {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
boolean active = rpcServer.isActive();
Assert.assertTrue(active);
rpcServer.close();
boolean serverActive = rpcServer.isActive();
Assert.assertFalse(serverActive);
}
@Test
@ -50,7 +57,8 @@ public class RPCServerTest {
Instance instance = new DefaultInstance();
EventLoopGroup leader = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerConnection connection = new NettyServerConnection(leader, worker, instance);
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(leader, worker, handler);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
try {

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import org.junit.Assert;
import org.junit.Test;

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import lombok.AllArgsConstructor;
import lombok.Getter;
@ -38,7 +38,7 @@ public class DefaultInstanceTest {
@Test
public void testGetInstance() {
String className = "cn.hippo4j.config.rpc.support.DefaultInstanceTest$InstanceModel";
String className = "cn.hippo4j.rpc.support.DefaultInstanceTest$InstanceModel";
Object instanceInstance = instance.getInstance(className);
Assert.assertNotNull(instanceInstance);
Assert.assertEquals(className, instanceInstance.getClass().getName());

@ -15,8 +15,10 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@ -35,7 +37,8 @@ public class NettyConnectPoolHolderTest {
@Test
public void createPool() {
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls);
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, handler);
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(pool, connectPool);
NettyConnectPoolHolder.clear();
@ -45,7 +48,8 @@ public class NettyConnectPoolHolderTest {
@Test
public void testGetPool() {
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group);
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(connectPool1, connectPool);
NettyConnectPoolHolder.clear();
@ -55,7 +59,8 @@ public class NettyConnectPoolHolderTest {
@Test
public void remove() {
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group);
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group, handler);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(connectPool1, connectPool);
NettyConnectPoolHolder.remove(host, port);

@ -15,11 +15,14 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import cn.hippo4j.config.rpc.server.NettyServerConnection;
import cn.hippo4j.config.rpc.server.RPCServer;
import cn.hippo4j.config.rpc.server.ServerConnection;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import cn.hippo4j.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.rpc.server.NettyServerConnection;
import cn.hippo4j.rpc.server.RPCServer;
import cn.hippo4j.rpc.server.ServerConnection;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
@ -45,7 +48,8 @@ public class NettyConnectPoolTest {
public void acquire() throws IOException {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here
@ -54,8 +58,8 @@ public class NettyConnectPoolTest {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls);
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire);
pool.release(acquire);
@ -66,7 +70,8 @@ public class NettyConnectPoolTest {
public void testAcquire() throws IOException {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here
@ -75,8 +80,8 @@ public class NettyConnectPoolTest {
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls);
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler);
Future<Channel> acquire = pool.acquire();
Assert.assertNotNull(acquire);
rpcServer.close();
@ -86,7 +91,8 @@ public class NettyConnectPoolTest {
public void close() throws IOException {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
NettyServerTakeHandler handler = new NettyServerTakeHandler(instance);
ServerConnection connection = new NettyServerConnection(handler);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here
@ -96,7 +102,8 @@ public class NettyConnectPoolTest {
throw new RuntimeException(e);
}
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls);
NettyClientPoolHandler poolHandler = new NettyClientPoolHandler(new NettyClientTakeHandler());
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls, poolHandler);
Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire);
pool.release(acquire);

@ -15,9 +15,12 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.web.exception.IllegalException;
import cn.hippo4j.rpc.exception.ConnectionException;
import cn.hippo4j.rpc.handler.NettyClientPoolHandler;
import cn.hippo4j.rpc.handler.NettyClientTakeHandler;
import org.junit.Assert;
import org.junit.Test;
@ -25,17 +28,29 @@ public class NettyProxyCenterTest {
@Test
public void getProxy() {
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888);
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888, handler);
Assert.assertNotNull(localhost);
}
@Test(expected = IllegalException.class)
public void getProxyTest() {
ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", 8888);
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", 8888, handler);
Assert.assertNotNull(localhost);
}
@Test(expected = ConnectionException.class)
public void getProxyTestCall() {
NettyClientPoolHandler handler = new NettyClientPoolHandler(new NettyClientTakeHandler());
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888, handler);
localhost.hello();
Assert.assertNotNull(localhost);
}
interface ProxyInterface {
void hello();
}
static class ProxyClass {

@ -15,12 +15,17 @@
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.IdUtil;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
public class ResultHolderTest {
@Test
@ -39,4 +44,21 @@ public class ResultHolderTest {
Assert.assertEquals(r1, o1);
Assert.assertEquals(r2, o2);
}
@Test
public void testThread() throws InterruptedException {
AtomicInteger a = new AtomicInteger();
String s1 = IdUtil.simpleUUID();
String o1 = s1 + "1";
CompletableFuture.runAsync(() -> {
ResultHolder.putThread(o1, Thread.currentThread());
LockSupport.park();
a.set(1);
});
Assert.assertEquals(0, a.get());
TimeUnit.SECONDS.sleep(1);
ResultHolder.wake(o1);
TimeUnit.SECONDS.sleep(1);
Assert.assertEquals(1, a.get());
}
}

@ -1,68 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
import cn.hippo4j.config.rpc.request.DefaultRequest;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.config.rpc.server.NettyServerConnection;
import cn.hippo4j.config.rpc.server.RPCServer;
import cn.hippo4j.config.rpc.server.ServerConnection;
import cn.hippo4j.config.rpc.support.DefaultInstance;
import cn.hippo4j.config.rpc.support.Instance;
import cn.hippo4j.config.rpc.support.ClassRegistry;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class RPCClientTest {
String host = "localhost";
int port = 8888;
@Test
public void connection() throws IOException {
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NettyClientConnection clientConnection = new NettyClientConnection(host, port);
RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
for (int i = 0; i < 100; i++) {
Response response = rpcClient.connection(request);
Assert.assertEquals(response.getObj(), 1);
}
rpcClient.close();
rpcServer.close();
}
}

@ -38,6 +38,7 @@
<module>hippo4j-example</module>
<module>hippo4j-message</module>
<module>hippo4j-monitor</module>
<module>hippo4j-rpc</module>
<module>hippo4j-server</module>
<module>hippo4j-spring-boot</module>
</modules>

Loading…
Cancel
Save