Server-side and client-side model of rpc mode call based on netty (#880)

* fix : add toolkit

* feat : Implement rpc calls through netty, implement server side and client side respectively, the underlying network connection and pipeline context mechanism depend on netty(#812)

* fix : Modifying the comment Format (#812)
pull/899/head
pizihao 2 years ago committed by GitHub
parent 4831cd6ecb
commit b73725f159
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -26,7 +26,7 @@ import java.lang.management.MemoryUsage;
/**
* memory util<br>
* the obtained information is not invalid, after a long wait, obtain it again
* the obtained information is not real time effective, after a long wait, please get it again
*
* @author liuwenhao
*/

@ -250,4 +250,18 @@ public class ReflectUtil {
throw new IllegalException(e);
}
}
/**
* get instance
*
* @param cls the class
* @return new Instance
*/
public static Object createInstance(Class<?> cls) {
try {
return cls.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new IllegalException(e);
}
}
}

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import java.io.Closeable;
/**
* the client for RPC, Explain the role of the client in the request
*/
public interface Client extends Closeable {
/**
* Start the client and try to send and receive data
*/
Response connection(Request request);
}

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
import cn.hippo4j.config.rpc.handler.Connection;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
/**
* Applicable to client connections
*/
public interface ClientConnection extends Connection {
/**
* Establish a connection and process
*
* @param request Request information
*/
Response connect(Request request);
/**
* Get timeout, ms
*/
long timeout();
/**
* SET timeout, ms
*/
void setTimeout(long timeout);
}

@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
import cn.hippo4j.config.rpc.exception.TimeOutException;
import cn.hippo4j.config.rpc.process.ActivePostProcess;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.config.rpc.support.NettyConnectPool;
import cn.hippo4j.config.rpc.support.NettyConnectPoolHolder;
import cn.hippo4j.config.rpc.support.ResultHolder;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.web.exception.IllegalException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
/**
* Client implemented using netty
*/
@Slf4j
public class NettyClientConnection implements ClientConnection {
String host;
Integer port;
// Obtain the connection timeout period. The default value is 30s
long timeout = 30000L;
Channel channel;
EventLoopGroup worker = new NioEventLoopGroup();
List<ActivePostProcess> activeProcesses;
ChannelFuture future;
NettyConnectPool connectionPool;
public NettyClientConnection(String host, int port,
List<ActivePostProcess> activeProcesses) {
Assert.notNull(worker);
this.host = host;
this.port = port;
this.activeProcesses = activeProcesses;
this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker);
}
public NettyClientConnection(String host, int port) {
this(host, port, new LinkedList<>());
}
@Override
public Response connect(Request request) {
preHandlers(request);
this.channel = connectionPool.acquire(timeout);
try {
String key = request.getKey();
this.future = channel.writeAndFlush(request);
log.info("Call successful, target address is {}:{}, request key is {}", host, port, key);
// Wait for execution to complete
ResultHolder.put(key, Thread.currentThread());
LockSupport.parkNanos(timeout() * 1000000);
Response response = ResultHolder.get(key);
if (response == null) {
throw new TimeOutException("Timeout waiting for server-side response");
}
postHandlers(request, response);
log.info("The response from {}:{} was received successfully with the response key {}.", host, port, key);
return response;
} catch (Exception ex) {
afterCompletions(request, null, ex);
throw new IllegalException(ex);
} finally {
connectionPool.release(this.channel);
}
}
@Override
public long timeout() {
return timeout;
}
@Override
public void setTimeout(long timeout) {
this.timeout = timeout;
}
@Override
public void close() {
if (this.channel == null) {
return;
}
worker.shutdownGracefully();
this.future.channel().close();
this.channel.close();
}
private void preHandlers(Request request) {
for (ActivePostProcess process : activeProcesses) {
process.preHandler(request);
}
}
private void postHandlers(Request request, Response response) {
for (ActivePostProcess process : activeProcesses) {
process.postHandler(request, response);
}
}
private void afterCompletions(Request request, Response response, Exception e) {
for (ActivePostProcess process : activeProcesses) {
process.afterCompletion(request, response, e);
}
}
}

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import java.io.IOException;
/**
* The client, which provides a closing mechanism, maintains a persistent connection if not closed
*/
public class RPCClient implements Client {
ClientConnection clientConnection;
public RPCClient(ClientConnection clientConnection) {
this.clientConnection = clientConnection;
}
@Override
public Response connection(Request request) {
return clientConnection.connect(request);
}
/**
* Close the client and release all connections.
*
* @throws IOException exception
*/
@Override
public void close() throws IOException {
clientConnection.close();
}
}

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.coder;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.OutputStream;
/**
* object OutputStream
*/
public class CompactObjectOutputStream extends ObjectOutputStream {
static final int TYPE_FAT_DESCRIPTOR = 0;
static final int TYPE_THIN_DESCRIPTOR = 1;
public CompactObjectOutputStream(OutputStream out) throws IOException {
super(out);
}
@Override
protected void writeStreamHeader() throws IOException {
writeByte(STREAM_VERSION);
}
@Override
protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException {
Class<?> clazz = desc.forClass();
if (clazz.isPrimitive() || clazz.isArray() || clazz.isInterface() || desc.getSerialVersionUID() == 0) {
write(TYPE_FAT_DESCRIPTOR);
super.writeClassDescriptor(desc);
} else {
write(TYPE_THIN_DESCRIPTOR);
writeUTF(desc.getName());
}
}
}

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.coder;
import cn.hippo4j.config.rpc.exception.CoderException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
/**
* According to the decoder for java objects implemented by ObjectDecoder,
* it is necessary to ensure that the transmitted objects can be serialized
*/
public class NettyDecoder extends ObjectDecoder {
public NettyDecoder(ClassResolver classResolver) {
super(classResolver);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) {
ByteBuf byteBuf = in.retainedDuplicate();
try {
Object o = super.decode(ctx, in);
if (o == null) {
return byteBuf;
} else {
return o;
}
} catch (Exception e) {
throw new CoderException("The encoding is abnormal, which may be caused by the failure of the transfer object to be deserialized");
}
}
}

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.coder;
import cn.hippo4j.config.rpc.exception.CoderException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.ObjectOutputStream;
import java.io.Serializable;
/**
* this is a encoder, For custom gluing and unpacking<br>
* {@link io.netty.handler.codec.serialization.ObjectEncoder}
*/
public class NettyEncoder extends MessageToByteEncoder<Serializable> {
private static final byte[] BYTE = new byte[4];
@Override
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception {
int startIndex = out.writerIndex();
try (ByteBufOutputStream outPut = new ByteBufOutputStream(out)) {
outPut.write(BYTE);
try (ObjectOutputStream outputStream = new CompactObjectOutputStream(outPut)) {
outputStream.writeObject(msg);
outputStream.flush();
}
} catch (Exception e) {
throw new CoderException("The encoding is abnormal, which may be caused by the transfer object being unable to be serialized");
}
int endIndex = out.writerIndex();
out.setInt(startIndex, endIndex - startIndex - 4);
}
}

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.discovery;
import java.net.InetSocketAddress;
/**
* The adaptation layer of different service centers is used to know
* the host of different services through the registration center
*/
public interface DiscoveryAdapter {
/**
* get InetSocketAddress served in the registry
*
* @param name server name
* @return InetSocketAddress
*/
InetSocketAddress getSocketAddress(String name);
}

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.exception;
/**
* During decoding and encoding, if an exception occurs, an exception of type {@link CoderException} is thrown,
* which is not different from a {@link RuntimeException}, but is more explicit about the type of exception
*/
public class CoderException extends RuntimeException {
private static final long serialVersionUID = 8247610319171014183L;
public CoderException() {
super();
}
public CoderException(String message) {
super(message);
}
public CoderException(Throwable e) {
super(e.getMessage(), e);
}
public CoderException(String message, Throwable throwable) {
super(message, throwable);
}
public CoderException(String message, Throwable throwable, boolean enableSuppression, boolean writableStackTrace) {
super(message, throwable, enableSuppression, writableStackTrace);
}
}

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.exception;
/**
* If an exception occurs during the connection between the server and the client, an exception of type
* {@link ConnectionException} is thrown, which is not different from {@link RuntimeException}, but is more explicit
* about the type of exception
*/
public class ConnectionException extends RuntimeException {
private static final long serialVersionUID = 8247610319171014183L;
public ConnectionException() {
super();
}
public ConnectionException(String message) {
super(message);
}
public ConnectionException(Throwable e) {
super(e.getMessage(), e);
}
public ConnectionException(String message, Throwable throwable) {
super(message, throwable);
}
public ConnectionException(String message, Throwable throwable, boolean enableSuppression, boolean writableStackTrace) {
super(message, throwable, enableSuppression, writableStackTrace);
}
}

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.exception;
/**
* If there is a timeout between the server and the client, you will get a {@link TimeOutException},
* which is not different from {@link RuntimeException}, but it will be more explicit about the type of exception, right
*/
public class TimeOutException extends RuntimeException {
private static final long serialVersionUID = 8247610319171014183L;
public TimeOutException() {
super();
}
public TimeOutException(String message) {
super(message);
}
public TimeOutException(Throwable e) {
super(e.getMessage(), e);
}
public TimeOutException(String message, Throwable throwable) {
super(message, throwable);
}
public TimeOutException(String message, Throwable throwable, boolean enableSuppression, boolean writableStackTrace) {
super(message, throwable, enableSuppression, writableStackTrace);
}
}

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
/**
* The handler in each connection, where the specific behavior of the connection
* must be specified, such as serialization and parsing, requesting and receiving
* requests, and so on
*/
public interface ConnectHandler {
/**
* Processing after receiving the request
*
* @param request request
*/
default Response handler(Request request) {
return null;
}
/**
* Processing after receiving Response
*
* @param response response
*/
default void handler(Response response) {
//
}
}

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
import java.io.Closeable;
/**
* Represents a network request connection and provides IO layer support
*/
public interface Connection extends Closeable {
}

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
import cn.hippo4j.config.rpc.coder.NettyDecoder;
import cn.hippo4j.config.rpc.coder.NettyEncoder;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import lombok.extern.slf4j.Slf4j;
/**
* Processing by the client connection pool handler to clean the buffer and define new connection properties
*/
@Slf4j
public class NettyClientPoolHandler implements ChannelPoolHandler {
@Override
public void channelReleased(Channel ch) {
ch.writeAndFlush(Unpooled.EMPTY_BUFFER);
log.info("The connection buffer has been emptied of data");
}
@Override
public void channelAcquired(Channel ch) {
// NO SOMETHING
}
@Override
public void channelCreated(Channel ch) {
NioSocketChannel channel = (NioSocketChannel) ch;
channel.config()
.setTcpNoDelay(false);
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettyEncoder());
ch.pipeline().addLast(new NettyClientTakeHandler());
}
}

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
import cn.hippo4j.config.rpc.exception.ConnectionException;
import cn.hippo4j.config.rpc.support.ResultHolder;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.common.web.exception.IllegalException;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Interconnect with the netty mediation layer
*/
public class NettyClientTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
Response response = (Response) msg;
handler(response);
ctx.flush();
} catch (Exception e) {
ctx.close();
throw new IllegalException(e);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
Channel channel = ctx.channel();
if (channel.isActive()) {
ctx.close();
} else {
throw new ConnectionException(cause);
}
}
@Override
public void handler(Response response) {
ResultHolder.put(response.getKey(), response);
ResultHolder.wake(response.getKey());
}
}

@ -0,0 +1,115 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.handler;
import cn.hippo4j.config.rpc.exception.ConnectionException;
import cn.hippo4j.config.rpc.process.ActivePostProcess;
import cn.hippo4j.config.rpc.response.DefaultResponse;
import cn.hippo4j.config.rpc.support.ClassRegistry;
import cn.hippo4j.config.rpc.support.Instance;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.common.toolkit.ReflectUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.lang.reflect.Method;
import java.util.LinkedList;
import java.util.List;
/**
* netty adaptation layer
*/
public class NettyServerTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler {
List<ActivePostProcess> processes;
Instance instance;
public NettyServerTakeHandler(List<ActivePostProcess> processes, Instance instance) {
this.processes = processes;
this.instance = instance;
}
public NettyServerTakeHandler(Instance instance) {
this(new LinkedList<>(), instance);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (!(msg instanceof Request)) {
return;
}
Request request = (Request) msg;
Response response = handler(request);
ctx.writeAndFlush(response);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Channel channel = ctx.channel();
if (channel.isActive()) {
ctx.close();
} else {
throw new ConnectionException(cause);
}
}
@Override
public Response handler(Request request) {
if (!preHandlers(request)) {
return null;
}
try {
Class<?> cls = ClassRegistry.get(request.getClassName());
Method method = ReflectUtil.getMethodByName(cls, request.getMethodName(), request.getParameterTypes());
Assert.notNull(method);
Object invoke = ReflectUtil.invoke(instance.getInstance(cls), method, request.getParameters());
Response response = new DefaultResponse(request.getKey(), invoke.getClass(), invoke);
postHandlers(request, response);
return response;
} catch (Exception e) {
Response response = new DefaultResponse(request.getKey(), e, e.getMessage());
afterCompletions(request, response, e);
return response;
}
}
private boolean preHandlers(Request request) {
for (ActivePostProcess process : processes) {
if (!process.preHandler(request)) {
return false;
}
}
return true;
}
private void postHandlers(Request request, Response response) {
for (ActivePostProcess process : processes) {
process.postHandler(request, response);
}
}
private void afterCompletions(Request request, Response response, Exception e) {
for (ActivePostProcess process : processes) {
process.afterCompletion(request, response, e);
}
}
}

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.process;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
/**
* Callback while the connection is in progress
*/
public interface ActivePostProcess {
/**
* Client: After establishing a connection and before passing parameters<br>
* Server: Receives parameters and performs pre-call operations<br>
*
* @param request request
* @return Whether to continue the execution. If it is a client, the returned value does not affect subsequent execution
*/
default boolean preHandler(Request request) {
return true;
}
/**
* Client: Action after receiving a response<br>
* Server: performs the operation after the call<br>
*
* @param request request
* @param response response
*/
default void postHandler(Request request, Response response) {
// NO SOMETHING
}
/**
* Called when an exception or resource is cleaned
*
* @param request request
* @param response response
* @param e Exception
*/
default void afterCompletion(Request request, Response response, Exception e) {
// NO SOMETHING
}
}

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.request;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Objects;
/**
* default request<br>
* Use the fully qualified name key of the interface and override equals and hashCode
*/
public final class DefaultRequest implements Request {
String key;
String className;
String methodName;
Class<?>[] parameterTypes;
transient Object[] parameters;
public DefaultRequest(String key, String className, String methodName, Class<?>[] parameterTypes, Object[] parameters) {
this.key = key;
this.className = className;
this.methodName = methodName;
this.parameterTypes = parameterTypes;
this.parameters = parameters;
}
@Override
public String getKey() {
return key;
}
@Override
public String getClassName() {
return className;
}
@Override
public String getMethodName() {
return methodName;
}
@Override
public Class<?>[] getParameterTypes() {
return parameterTypes;
}
@Override
public Object[] getParameters() {
return parameters;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
DefaultRequest that = (DefaultRequest) o;
return Objects.equals(key, that.key)
&& Objects.equals(className, that.className)
&& Objects.equals(methodName, that.methodName);
}
@Override
public int hashCode() {
return Objects.hash(key, className, methodName);
}
/**
* Redefine the behavior of serialization, that is, re-acquire the initially serialized
* data from the stream and re-serialize it. Simple serialization will result in the
* loss of the field identified by transient.
*/
private void writeObject(ObjectOutputStream s) throws IOException {
s.defaultWriteObject();
if (parameters == null) {
return;
}
// 序列化属性 parameters
for (Object parameter : parameters) {
s.writeObject(parameter);
}
}
/**
* Redefine the deserialization behavior, and sequentially deserialize the data specified during
* serialization, because there is data that is not deserialized during initial deserialization,
* such as fields defined by transient
*/
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
if (parameterTypes == null) {
return;
}
// 反序列化属性 parameters
int length = parameterTypes.length;
Object[] a = new Object[length];
for (int i = 0; i < length; i++) {
a[i] = s.readObject();
}
this.parameters = a;
}
}

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.request;
import java.io.Serializable;
/**
* request
*/
public interface Request extends Serializable {
/**
* The unique identity of the current request
*/
String getKey();
/**
* The Class name of the current request
*/
String getClassName();
/**
* The Method name of the current request
*/
String getMethodName();
/**
* The parameter type of the current request
*/
Class<?>[] getParameterTypes();
/**
* The parameters of the current request
*/
Object[] getParameters();
}

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.response;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Objects;
/**
* default request<br>
* Use the fully qualified name key of the interface and override equals and hashCode
*/
public class DefaultResponse implements Response {
String key;
Class<?> cls;
transient Object obj;
Throwable throwable;
String errMsg;
public DefaultResponse(String key, Class<?> cls, Object obj, Throwable throwable, String errMsg) {
this.key = key;
this.cls = cls;
this.obj = obj;
this.throwable = throwable;
this.errMsg = errMsg;
}
public DefaultResponse(String key, Throwable throwable, String errMsg) {
this(key, null, null, throwable, errMsg);
}
public DefaultResponse(String key, Class<?> cls, Object obj) {
this(key, cls, obj, null, null);
}
@Override
public String getKey() {
return key;
}
@Override
public Class<?> getCls() {
return cls;
}
@Override
public Object getObj() {
return obj;
}
@Override
public Throwable getThrowable() {
return throwable;
}
@Override
public String getErrMsg() {
return errMsg;
}
@Override
public boolean isErr() {
return throwable != null || errMsg != null;
}
@Override
public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
DefaultResponse that = (DefaultResponse) o;
return Objects.equals(key, that.key) && Objects.equals(cls, that.cls);
}
@Override
public int hashCode() {
return Objects.hash(key, cls);
}
/**
* Redefine the behavior of serialization, that is, re-acquire the initially serialized
* data from the stream and re-serialize it. Simple serialization will result in the
* loss of the field identified by transient.
*/
private void writeObject(ObjectOutputStream s) throws IOException {
s.defaultWriteObject();
if (obj == null) {
return;
}
// 序列化属性 obj
s.writeObject(this.obj);
}
/**
* Redefine the deserialization behavior, and sequentially deserialize the data specified during
* serialization, because there is data that is not deserialized during initial deserialization,
* such as fields defined by transient
*/
private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
s.defaultReadObject();
// 反序列化属性 obj
this.obj = s.readObject();
}
}

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.response;
import java.io.Serializable;
/**
* Response
*/
public interface Response extends Serializable {
/**
* The unique identity of the current Response
*/
String getKey();
/**
* The class of the current Response, The target of deserialization
*/
Class<?> getCls();
/**
* The results of this request can be obtained, The source of deserialization
*/
Object getObj();
/**
* The Throwable of the current Response
*/
Throwable getThrowable();
/**
* the error message
*/
String getErrMsg();
/**
* Whether the current request has an error
*/
boolean isErr();
}

@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
import cn.hippo4j.config.rpc.coder.NettyDecoder;
import cn.hippo4j.config.rpc.coder.NettyEncoder;
import cn.hippo4j.config.rpc.handler.NettyServerTakeHandler;
import cn.hippo4j.config.rpc.process.ActivePostProcess;
import cn.hippo4j.config.rpc.support.Instance;
import cn.hippo4j.common.toolkit.Assert;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedList;
import java.util.List;
/**
* adapter to the netty server
*/
@Slf4j
public class NettyServerConnection implements ServerConnection {
Integer port;
EventLoopGroup leader;
EventLoopGroup worker;
Class<? extends ServerChannel> socketChannelCls = NioServerSocketChannel.class;
List<ActivePostProcess> processes;
Instance instance;
ChannelFuture future;
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List<ActivePostProcess> processes, Instance instance) {
Assert.notNull(processes);
Assert.notNull(instance);
Assert.notNull(leader);
Assert.notNull(worker);
this.leader = leader;
this.worker = worker;
this.processes = processes;
this.instance = instance;
}
public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, Instance instance) {
this(leader, worker, new LinkedList<>(), instance);
}
public NettyServerConnection(List<ActivePostProcess> processes, Instance instance) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), processes, instance);
}
public NettyServerConnection(Instance instance) {
this(new NioEventLoopGroup(), new NioEventLoopGroup(), new LinkedList<>(), instance);
}
@Override
public void bind(int port) {
ServerBootstrap server = new ServerBootstrap();
server.group(leader, worker)
.channel(socketChannelCls)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettyEncoder());
ch.pipeline().addLast(new NettyServerTakeHandler(processes, instance));
}
});
try {
this.future = server.bind(port);
log.info("The server is started and can receive requests. The listening port is {}", port);
this.port = port;
this.future.channel().closeFuture().sync();
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
@Override
public void close() {
if (port == null) {
return;
}
leader.shutdownGracefully();
worker.shutdownGracefully();
this.future.channel().close();
log.info("The server is shut down and no more requests are received. The release port is {}", port);
}
}

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
import java.io.IOException;
/**
* Server Implementation
*/
public class RPCServer implements Server {
int port;
ServerConnection serverConnection;
public RPCServer(int port, ServerConnection serverConnection) {
this.port = port;
this.serverConnection = serverConnection;
}
@Override
public void bind() {
serverConnection.bind(port);
}
/**
* Shut down the server and release the port
*/
@Override
public void close() throws IOException {
serverConnection.close();
}
}

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
import java.io.Closeable;
/**
* the service for RPC, Explain the role of the service in the request
*/
public interface Server extends Closeable {
/**
* Start the server. Attempt to listen on the port and receive the request.<br>
* If the port being processed is already bound, an exception is thrown
*/
void bind();
}

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
import cn.hippo4j.config.rpc.handler.Connection;
/**
* This applies to server-side connections
*/
public interface ServerConnection extends Connection {
/**
* Bind ports and process them
*/
void bind(int port);
}

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* the registration center for Client and Server
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ClassRegistry {
private static final Map<String, Class<?>> serverRegister = new ConcurrentHashMap<>();
/**
* get a Obj in Registry center <br>
*
* @param s key
* @return t element
*/
public static Class<?> get(String s) {
return serverRegister.get(s);
}
/**
* add the element to Registry Table <br>
* if the key already exists, failure, and return before the value of the key. <br>
* if success return the element
*
* @param s key
* @param cls element
* @return final mapped value
*/
public static Class<?> set(String s, Class<?> cls) {
return serverRegister.putIfAbsent(s, cls);
}
/**
* add the element to Registry Table <br>
* if the key already exists, failure, replace it
*
* @param s key
* @param cls element
*/
public static Class<?> put(String s, Class<?> cls) {
return serverRegister.put(s, cls);
}
/**
* clear
*/
public static void clear() {
serverRegister.clear();
}
}

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.web.exception.IllegalException;
/**
* Simply creating an instance of a class by its name and its specific type,
* and then throwing an exception if it is an interface, is not elegant
*/
public class DefaultInstance implements Instance {
@Override
public Object getInstance(Class<?> cls) {
return ReflectUtil.createInstance(cls);
}
@Override
public Object getInstance(String name) {
try {
Class<?> cls = Class.forName(name);
return getInstance(cls);
} catch (ClassNotFoundException e) {
throw new IllegalException(e);
}
}
}

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
/**
* Instance interface to get an instance
*/
public interface Instance {
/**
* get a instance
*
* @param cls Class object
* @return Information about instances created or found
*/
Object getInstance(Class<?> cls);
/**
* Gets an instance of a class with a recognizable identity,
* which can be the fully qualified name of class. It can also be a unique name in a container
*
* @param name Identifying name
* @return Information about instances created or found
*/
Object getInstance(String name);
}

@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import cn.hippo4j.config.rpc.exception.ConnectionException;
import cn.hippo4j.config.rpc.handler.NettyClientPoolHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelHealthChecker;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
/**
* This parameter applies only to the connection pool of netty
*/
@Slf4j
public class NettyConnectPool {
ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE;
FixedChannelPool.AcquireTimeoutAction acquireTimeoutAction = FixedChannelPool.AcquireTimeoutAction.NEW;
int maxPendingAcquires = Integer.MAX_VALUE;
ChannelPoolHandler handler = new NettyClientPoolHandler();
ChannelPool pool;
String host;
int port;
public NettyConnectPool(String host, int port, int maxConnect,
long timeout, EventLoopGroup worker,
Class<? extends Channel> socketChannelCls) {
InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port);
Bootstrap bootstrap = new Bootstrap()
.group(worker)
.channel(socketChannelCls)
.remoteAddress(socketAddress);
this.host = host;
this.port = port;
this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction,
timeout, maxConnect, maxPendingAcquires, true, true);
log.info("The connection pool is established with the connection target {}:{}", host, port);
NettyConnectPoolHolder.createPool(host, port, this);
}
public Channel acquire(long timeoutMillis) {
try {
Future<Channel> fch = pool.acquire();
return fch.get(timeoutMillis, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new ConnectionException("Failed to get the connection", e);
}
}
public Future<Channel> acquire() {
try {
return pool.acquire();
} catch (Exception e) {
throw new ConnectionException("Failed to get the connection", e);
}
}
public void release(Channel channel) {
try {
if (channel != null) {
pool.release(channel);
}
} catch (Exception e) {
throw new ConnectionException("Failed to release the connection", e);
}
}
public void close() {
try {
pool.close();
NettyConnectPoolHolder.remove(host, port);
} catch (Exception e) {
throw new ConnectionException("Failed to close the connection pool", e);
}
}
}

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.EventExecutorGroup;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* To avoid creating multiple connection pools for the same host:port, save all connection pools of the client
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NettyConnectPoolHolder {
static int maxConnect = 64;
static Map<String, NettyConnectPool> connectPoolMap = new ConcurrentHashMap<>();
private static NettyConnectPool initPool(String host, int port,
long timeout, EventLoopGroup worker) {
return new NettyConnectPool(
host, port, maxConnect,
timeout, worker,
NioSocketChannel.class);
}
private static String getKey(String host, int port) {
return host + ":" + port;
}
/**
* The connection pool connectPoolMapping may already exist before the connection pool
* connectPoolMapping is established. In this case, the connection pool is directly overwritten
*
* @param host the host
* @param port the port
* @param pool This parameter applies only to the connection pool of netty
*/
public static void createPool(String host, int port, NettyConnectPool pool) {
connectPoolMap.put(getKey(host, port), pool);
}
/**
* Gets a connection pool, or null if there is no corresponding connectPoolMapping
*
* @param host the host
* @param port the port
* @return Map to the connection pool
*/
public static NettyConnectPool getPool(String host, int port) {
return connectPoolMap.get(getKey(host, port));
}
/**
* Gets a connection pool, and if there is no connectPoolMapping, creates one with the values provided and joins the connectPoolMapping
*
* @param host the host
* @param port the port
* @param timeout timeout
* @param worker Special {@link EventExecutorGroup} which allows registering {@link Channel}s
* that get processed for later selection during the event loop.
* @return Map to the connection pool
*/
public static synchronized NettyConnectPool getPool(String host, int port,
long timeout, EventLoopGroup worker) {
/*
* this cannot use the computeIfAbsent method directly here because put is already used in init.
* Details refer to https://bugs.openjdk.java.net/browse/JDK-8062841
*/
NettyConnectPool pool = getPool(host, port);
return pool == null ? initPool(host, port, timeout, worker) : pool;
}
/**
* Disconnect a connection connectPoolMapping. This must take effect at the same time as the connection pool is closed
*
* @param host host
* @param port port
*/
public static void remove(String host, int port) {
connectPoolMap.remove(getKey(host, port));
}
/**
* clear
*/
public static void clear() {
connectPoolMap.clear();
}
}

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import cn.hippo4j.config.rpc.request.DefaultRequest;
import cn.hippo4j.config.rpc.client.NettyClientConnection;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.common.toolkit.IdUtil;
import cn.hippo4j.common.web.exception.IllegalException;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
/**
* Add a proxy for the request, {@link Proxy} and {@link InvocationHandler}
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NettyProxyCenter {
// cache
static Map<Class<?>, Object> map = new HashMap<>();
/**
* PRC
*
* @param cls
* @param host
* @param port
* @param <T>
* @return
*/
public static <T> T getProxy(Class<T> cls, String host, int port) {
NettyClientConnection connection = new NettyClientConnection(host, port);
return getProxy(connection, cls, host, port);
}
@SuppressWarnings("unchecked")
public static <T> T getProxy(NettyClientConnection connection, Class<T> cls, String host, int port) {
boolean b = cls.isInterface();
if (!b) {
throw new IllegalException(cls.getName() + "is not a Interface");
}
Object o = map.get(cls);
if (o != null) {
return (T) o;
}
T obj = (T) Proxy.newProxyInstance(
cls.getClassLoader(),
new Class[]{cls},
(proxy, method, args) -> {
String clsName = cls.getName();
String methodName = method.getName();
String key = host + port + clsName + methodName + IdUtil.simpleUUID();
Class<?>[] parameterTypes = method.getParameterTypes();
Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, args);
Response response = connection.connect(request);
if (response == null) {
return null;
}
if (response.isErr()) {
throw new IllegalException(response.getErrMsg(), response.getThrowable());
}
return response.getObj();
});
map.put(cls, obj);
return obj;
}
}

@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.LockSupport;
/**
* The staging results<br>
* The unique remote call can be determined by the key of request and
* response, and the result of the call is stored in the secondary cache,
* which is convenient for the client to use at any time.
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ResultHolder {
private static final Map<String, Object> map = new ConcurrentHashMap<>();
private static final Map<String, Thread> threadMap = new HashMap<>();
/**
* Writes when the client receives a response
*
* @param key Request and response keys
* @param o The result
*/
public static void put(String key, Object o) {
log.debug("Write the result, wake up the thread");
map.put(key, o);
}
/**
* Stores a thread that can be woken up and is in a waiting state
*
* @param key Request and response keys
* @param t The Thread
*/
public static void put(String key, Thread t) {
log.debug("Write thread, waiting to wake up");
threadMap.put(key, t);
}
/**
* Stores a thread that can be woken up and is in a waiting state
*
* @param key Request and response keys
*/
public static synchronized void wake(String key) {
log.debug("The future has been fetched, wake up the thread");
Thread thread = threadMap.remove(key);
LockSupport.unpark(thread);
}
/**
* Called when the client gets the response<br>
* After the result is obtained, the corresponding key is cleared from the cache<br>
* So it's only true when you first get the result
*
* @param key Request and response keys
* @return Response body
*/
@SuppressWarnings("unchecked")
public static <T> T get(String key) {
log.debug("Get the future");
return (T) map.remove(key);
}
}

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import cn.hippo4j.common.config.ApplicationContextHolder;
/**
* Adapter Spring, The requested object is managed by spring
*/
public class SpringContextInstance implements Instance {
@Override
public Object getInstance(Class<?> cls) {
return ApplicationContextHolder.getBean(cls);
}
@Override
public Object getInstance(String name) {
return ApplicationContextHolder.getInstance().getBean(name);
}
}

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
public class CallManager {
public int call() {
return 1;
}
}

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.client;
import cn.hippo4j.config.rpc.request.DefaultRequest;
import cn.hippo4j.config.rpc.request.Request;
import cn.hippo4j.config.rpc.response.Response;
import cn.hippo4j.config.rpc.server.NettyServerConnection;
import cn.hippo4j.config.rpc.server.RPCServer;
import cn.hippo4j.config.rpc.server.ServerConnection;
import cn.hippo4j.config.rpc.support.DefaultInstance;
import cn.hippo4j.config.rpc.support.Instance;
import cn.hippo4j.config.rpc.support.ClassRegistry;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class RPCClientTest {
String host = "localhost";
int port = 8888;
@Test
public void connection() throws IOException {
Class<CallManager> cls = CallManager.class;
String className = cls.getName();
ClassRegistry.put(className, cls);
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NettyClientConnection clientConnection = new NettyClientConnection(host, port);
RPCClient rpcClient = new RPCClient(clientConnection);
Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null);
for (int i = 0; i < 100; i++) {
Response response = rpcClient.connection(request);
Assert.assertEquals(response.getObj(), 1);
}
rpcClient.close();
rpcServer.close();
}
}

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.server;
import cn.hippo4j.config.rpc.support.DefaultInstance;
import cn.hippo4j.config.rpc.support.Instance;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class RPCServerTest {
public static int port = 8888;
@Test
public void bind() throws IOException {
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
rpcServer.close();
}
@Test
public void bindTest() throws IOException {
Instance instance = new DefaultInstance();
EventLoopGroup leader = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
ServerConnection connection = new NettyServerConnection(leader, worker, instance);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
rpcServer.close();
}
}

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import org.junit.Assert;
import org.junit.Test;
public class ClassRegistryTest {
@Test
public void get() {
String getStr = "GetModel";
Class<?> cls = ClassRegistry.get(getStr);
Assert.assertNull(cls);
ClassRegistry.put(getStr, GetModel.class);
Class<?> aClass = ClassRegistry.get(getStr);
Assert.assertNotNull(aClass);
ClassRegistry.clear();
}
@Test
public void set() {
String getStr = "GetModel";
ClassRegistry.set(getStr, GetModel.class);
Class<?> aClass = ClassRegistry.get(getStr);
Assert.assertEquals(aClass, GetModel.class);
ClassRegistry.set(getStr, SetModel.class);
Class<?> aClass1 = ClassRegistry.get(getStr);
Assert.assertEquals(aClass1, GetModel.class);
ClassRegistry.clear();
}
@Test
public void put() {
String getStr = "GetModel";
ClassRegistry.put(getStr, GetModel.class);
Class<?> aClass = ClassRegistry.get(getStr);
Assert.assertEquals(aClass, GetModel.class);
ClassRegistry.put(getStr, SetModel.class);
Class<?> aClass1 = ClassRegistry.get(getStr);
Assert.assertEquals(aClass1, SetModel.class);
ClassRegistry.clear();
}
public static class GetModel {
}
public static class SetModel {
}
}

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.junit.Assert;
import org.junit.Test;
public class DefaultInstanceTest {
Instance instance = new DefaultInstance();
@Test
public void getInstance() {
Class<InstanceModel> cls = InstanceModel.class;
Object instanceInstance = instance.getInstance(cls);
Assert.assertNotNull(instanceInstance);
Assert.assertEquals(cls, instanceInstance.getClass());
}
@Test
public void testGetInstance() {
String className = "cn.hippo4j.config.rpc.support.DefaultInstanceTest$InstanceModel";
Object instanceInstance = instance.getInstance(className);
Assert.assertNotNull(instanceInstance);
Assert.assertEquals(className, instanceInstance.getClass().getName());
}
@Setter
@Getter
@AllArgsConstructor
@NoArgsConstructor
public static class InstanceModel {
String name;
}
}

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.junit.Assert;
import org.junit.Test;
public class NettyConnectPoolHolderTest {
String host = "127.0.0.1";
int port = 8888;
int maxCount = 8;
int timeout = 5000;
EventLoopGroup group = new NioEventLoopGroup();
Class<? extends Channel> cls = NioSocketChannel.class;
@Test
public void createPool() {
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls);
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(pool, connectPool);
NettyConnectPoolHolder.clear();
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertNull(connectPool1);
}
@Test
public void testGetPool() {
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(connectPool1, connectPool);
NettyConnectPoolHolder.clear();
NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertNull(connectPool2);
}
@Test
public void remove() {
NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group);
NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertEquals(connectPool1, connectPool);
NettyConnectPoolHolder.remove(host, port);
NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(host, port);
Assert.assertNull(connectPool2);
}
}

@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import cn.hippo4j.config.rpc.server.NettyServerConnection;
import cn.hippo4j.config.rpc.server.RPCServer;
import cn.hippo4j.config.rpc.server.ServerConnection;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
public class NettyConnectPoolTest {
String host = "127.0.0.1";
int port = 8888;
int maxCount = 64;
int timeout = 5000;
EventLoopGroup group = new NioEventLoopGroup();
Class<? extends Channel> cls = NioSocketChannel.class;
@Test
public void acquire() throws IOException {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls);
Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire);
pool.release(acquire);
rpcServer.close();
}
@Test
public void testAcquire() throws IOException {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls);
Future<Channel> acquire = pool.acquire();
Assert.assertNotNull(acquire);
rpcServer.close();
}
@Test
public void close() throws IOException {
// The mode connection was denied when the server was started on the specified port
Instance instance = new DefaultInstance();
ServerConnection connection = new NettyServerConnection(instance);
RPCServer rpcServer = new RPCServer(port, connection);
CompletableFuture.runAsync(rpcServer::bind);
// Given the delay in starting the server, wait here
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls);
Channel acquire = pool.acquire(timeout);
Assert.assertNotNull(acquire);
pool.release(acquire);
pool.close();
rpcServer.close();
}
}

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import cn.hippo4j.common.web.exception.IllegalException;
import org.junit.Assert;
import org.junit.Test;
public class NettyProxyCenterTest {
@Test
public void getProxy() {
ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888);
Assert.assertNotNull(localhost);
}
@Test(expected = IllegalException.class)
public void getProxyTest() {
ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", 8888);
Assert.assertNotNull(localhost);
}
interface ProxyInterface {
}
static class ProxyClass {
}
}

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.config.rpc.support;
import cn.hippo4j.common.toolkit.IdUtil;
import org.junit.Assert;
import org.junit.Test;
public class ResultHolderTest {
@Test
public void test() {
String s1 = IdUtil.simpleUUID();
String o1 = s1 + "1";
String s2 = IdUtil.simpleUUID();
String o2 = s2 + "2";
ResultHolder.put(s1, o1);
ResultHolder.put(s2, o2);
Object r1 = ResultHolder.get(s1);
Object r2 = ResultHolder.get(s2);
Assert.assertEquals(r1, o1);
Assert.assertEquals(r2, o2);
}
}
Loading…
Cancel
Save