diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/Client.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/Client.java new file mode 100644 index 00000000..f634e0aa --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/Client.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.client; + +import cn.hippo4j.config.rpc.request.Request; +import cn.hippo4j.config.rpc.response.Response; + +import java.io.Closeable; + +/** + * the client for RPC, Explain the role of the client in the request + * + */ +public interface Client extends Closeable { + + /** + * Start the client and try to send and receive data + */ + Response connection(Request request); +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/ClientConnection.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/ClientConnection.java new file mode 100644 index 00000000..a4e1b6ee --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/ClientConnection.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.client; + +import cn.hippo4j.config.rpc.handler.Connection; +import cn.hippo4j.config.rpc.request.Request; +import cn.hippo4j.config.rpc.response.Response; + +/** + * Applicable to client connections + * + */ +public interface ClientConnection extends Connection { + + /** + * Establish a connection and process + * + * @param request Request information + */ + Response connect(Request request); + + /** + * Get timeout, ms + */ + long timeout(); +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/NettyClientConnection.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/NettyClientConnection.java new file mode 100644 index 00000000..f2b385b1 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/NettyClientConnection.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.client; + +import cn.hippo4j.config.rpc.exception.TimeOutException; +import cn.hippo4j.config.rpc.process.ActivePostProcess; +import cn.hippo4j.config.rpc.request.Request; +import cn.hippo4j.config.rpc.response.Response; +import cn.hippo4j.config.rpc.support.NettyConnectPool; +import cn.hippo4j.config.rpc.support.NettyConnectPoolHolder; +import cn.hippo4j.config.rpc.support.ResultHolder; +import cn.hippo4j.common.toolkit.Assert; +import cn.hippo4j.common.web.exception.IllegalException; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import lombok.extern.slf4j.Slf4j; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.locks.LockSupport; + +/** + * Client implemented using netty + */ +@Slf4j +public class NettyClientConnection implements ClientConnection { + + // TODO InetAddress + String host; + Integer port; + // Obtain the connection timeout period. The default value is 3s + long timeout = 30000L; + Channel channel; + EventLoopGroup worker = new NioEventLoopGroup(); + List activeProcesses; + ChannelFuture future; + NettyConnectPool connectionPool; + + public NettyClientConnection(String host, int port, + List activeProcesses) { + Assert.notNull(worker); + this.host = host; + this.port = port; + this.activeProcesses = activeProcesses; + this.connectionPool = NettyConnectPoolHolder.getPool(host, port, timeout, worker); + } + + public NettyClientConnection(String host, int port) { + this(host, port, new LinkedList<>()); + } + + @Override + public Response connect(Request request) { + preHandlers(request); + this.channel = connectionPool.acquire(timeout); + try { + String key = request.getKey(); + this.future = channel.writeAndFlush(request); + log.info("Call successful, target address is {}:{}, request key is {}", host, port, key); + // Wait for execution to complete + ResultHolder.put(key, Thread.currentThread()); + LockSupport.parkNanos(timeout() * 1000000); + Response response = ResultHolder.get(key); + if (response == null) { + throw new TimeOutException("Timeout waiting for server-side response"); + } + postHandlers(request, response); + log.info("The response from {}:{} was received successfully with the response key {}.", host, port, key); + return response; + } catch (Exception ex) { + afterCompletions(request, null, ex); + throw new IllegalException(ex); + } finally { + connectionPool.release(this.channel); + } + } + + @Override + public long timeout() { + return timeout; + } + + @Override + public void close() { + if (this.channel == null) { + return; + } + worker.shutdownGracefully(); + this.future.channel().close(); + this.channel.close(); + } + + private void preHandlers(Request request) { + for (ActivePostProcess process : activeProcesses) { + process.preHandler(request); + } + } + + private void postHandlers(Request request, Response response) { + for (ActivePostProcess process : activeProcesses) { + process.postHandler(request, response); + } + } + + private void afterCompletions(Request request, Response response, Exception e) { + for (ActivePostProcess process : activeProcesses) { + process.afterCompletion(request, response, e); + } + } + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/RPCClient.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/RPCClient.java new file mode 100644 index 00000000..72678da4 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/client/RPCClient.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.client; + +import cn.hippo4j.config.rpc.request.Request; +import cn.hippo4j.config.rpc.response.Response; + +import java.io.IOException; + +/** + * The client, which provides a closing mechanism, maintains a persistent connection if not closed + * + */ +public class RPCClient implements Client { + + ClientConnection clientConnection; + + public RPCClient(ClientConnection clientConnection) { + this.clientConnection = clientConnection; + } + + @Override + public Response connection(Request request) { + return clientConnection.connect(request); + } + + /** + * Close the client and release all connections. + * + * @throws IOException exception + */ + @Override + public void close() throws IOException { + clientConnection.close(); + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/CompactObjectOutputStream.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/CompactObjectOutputStream.java new file mode 100644 index 00000000..a8592ef0 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/CompactObjectOutputStream.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.coder; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; +import java.io.OutputStream; + +/** + * object OutputStream + */ +public class CompactObjectOutputStream extends ObjectOutputStream { + + static final int TYPE_FAT_DESCRIPTOR = 0; + static final int TYPE_THIN_DESCRIPTOR = 1; + + public CompactObjectOutputStream(OutputStream out) throws IOException { + super(out); + } + + @Override + protected void writeStreamHeader() throws IOException { + writeByte(STREAM_VERSION); + } + + @Override + protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException { + Class clazz = desc.forClass(); + if (clazz.isPrimitive() || clazz.isArray() || clazz.isInterface() || desc.getSerialVersionUID() == 0) { + write(TYPE_FAT_DESCRIPTOR); + super.writeClassDescriptor(desc); + } else { + write(TYPE_THIN_DESCRIPTOR); + writeUTF(desc.getName()); + } + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/NettyDecoder.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/NettyDecoder.java new file mode 100644 index 00000000..78ab14aa --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/NettyDecoder.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.coder; + +import cn.hippo4j.config.rpc.exception.CoderException; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.serialization.ClassResolver; +import io.netty.handler.codec.serialization.ObjectDecoder; + +/** + * According to the decoder for java objects implemented by ObjectDecoder, + * it is necessary to ensure that the transmitted objects can be serialized + * + */ +public class NettyDecoder extends ObjectDecoder { + + public NettyDecoder(ClassResolver classResolver) { + super(classResolver); + } + + @Override + protected Object decode(ChannelHandlerContext ctx, ByteBuf in) { + ByteBuf byteBuf = in.retainedDuplicate(); + try { + Object o = super.decode(ctx, in); + if (o == null) { + return byteBuf; + } else { + return o; + } + } catch (Exception e) { + throw new CoderException("The encoding is abnormal, which may be caused by the failure of the transfer object to be deserialized"); + } + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/NettyEncoder.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/NettyEncoder.java new file mode 100644 index 00000000..32c6374e --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/coder/NettyEncoder.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.coder; + +import cn.hippo4j.config.rpc.exception.CoderException; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToByteEncoder; + +import java.io.ObjectOutputStream; +import java.io.Serializable; + +/** + * this is a encoder + * + */ +public class NettyEncoder extends MessageToByteEncoder { + + private static final byte[] BYTE = new byte[4]; + + @Override + protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) throws Exception { + int startIndex = out.writerIndex(); + try (ByteBufOutputStream outPut = new ByteBufOutputStream(out)) { + outPut.write(BYTE); + try (ObjectOutputStream outputStream = new CompactObjectOutputStream(outPut)) { + outputStream.writeObject(msg); + outputStream.flush(); + } + } catch (Exception e) { + throw new CoderException("The encoding is abnormal, which may be caused by the transfer object being unable to be serialized"); + } + int endIndex = out.writerIndex(); + out.setInt(startIndex, endIndex - startIndex - 4); + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/discovery/DiscoveryAdapter.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/discovery/DiscoveryAdapter.java new file mode 100644 index 00000000..0026d0b4 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/discovery/DiscoveryAdapter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.discovery; + +import java.net.InetSocketAddress; + +/** + * The adaptation layer of different service centers is used to know + * the host of different services through the registration center + * + */ +public interface DiscoveryAdapter { + + /** + * get InetSocketAddress served in the registry + * + * @param name server name + * @return InetSocketAddress + */ + InetSocketAddress getSocketAddress(String name); + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/CoderException.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/CoderException.java new file mode 100644 index 00000000..3e5d897c --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/CoderException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.exception; + +/** + * + */ +public class CoderException extends RuntimeException { + + private static final long serialVersionUID = 8247610319171014183L; + + public CoderException() { + super(); + } + + public CoderException(String message) { + super(message); + } + + public CoderException(Throwable e) { + super(e.getMessage(), e); + } + + public CoderException(String message, Throwable throwable) { + super(message, throwable); + } + + public CoderException(String message, Throwable throwable, boolean enableSuppression, boolean writableStackTrace) { + super(message, throwable, enableSuppression, writableStackTrace); + } + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/ConnectionException.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/ConnectionException.java new file mode 100644 index 00000000..7b690b61 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/ConnectionException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.exception; + +/** + * + */ +public class ConnectionException extends RuntimeException { + + private static final long serialVersionUID = 8247610319171014183L; + + public ConnectionException() { + super(); + } + + public ConnectionException(String message) { + super(message); + } + + public ConnectionException(Throwable e) { + super(e.getMessage(), e); + } + + public ConnectionException(String message, Throwable throwable) { + super(message, throwable); + } + + public ConnectionException(String message, Throwable throwable, boolean enableSuppression, boolean writableStackTrace) { + super(message, throwable, enableSuppression, writableStackTrace); + } + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/TimeOutException.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/TimeOutException.java new file mode 100644 index 00000000..a99e6f57 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/exception/TimeOutException.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.exception; + +/** + * + */ +public class TimeOutException extends RuntimeException { + + private static final long serialVersionUID = 8247610319171014183L; + + public TimeOutException() { + super(); + } + + public TimeOutException(String message) { + super(message); + } + + public TimeOutException(Throwable e) { + super(e.getMessage(), e); + } + + public TimeOutException(String message, Throwable throwable) { + super(message, throwable); + } + + public TimeOutException(String message, Throwable throwable, boolean enableSuppression, boolean writableStackTrace) { + super(message, throwable, enableSuppression, writableStackTrace); + } + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/ConnectHandler.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/ConnectHandler.java new file mode 100644 index 00000000..46d71d03 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/ConnectHandler.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.handler; + +import cn.hippo4j.config.rpc.request.Request; +import cn.hippo4j.config.rpc.response.Response; + +/** + * The handler in each connection, where the specific behavior of the connection + * must be specified, such as serialization and parsing, requesting and receiving + * requests, and so on + * + */ +public interface ConnectHandler { + + /** + * Processing after receiving the request + * + * @param request request + */ + default Response handler(Request request) { + return null; + } + + /** + * Processing after receiving Response + * + * @param response response + */ + default void handler(Response response) { + // + } + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/Connection.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/Connection.java new file mode 100644 index 00000000..ce46f8be --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/Connection.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.handler; + +import java.io.Closeable; + +/** + * Represents a network request connection and provides IO layer support + * + */ +public interface Connection extends Closeable { + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyClientPoolHandler.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyClientPoolHandler.java new file mode 100644 index 00000000..d11415f2 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyClientPoolHandler.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.handler; + +import cn.hippo4j.config.rpc.coder.NettyDecoder; +import cn.hippo4j.config.rpc.coder.NettyEncoder; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.pool.ChannelPoolHandler; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.serialization.ClassResolvers; +import lombok.extern.slf4j.Slf4j; + +/** + * Processing by the client connection pool handler to clean the buffer and define new connection properties + * + */ +@Slf4j +public class NettyClientPoolHandler implements ChannelPoolHandler { + + @Override + public void channelReleased(Channel ch) { + ch.writeAndFlush(Unpooled.EMPTY_BUFFER); + log.info("The connection buffer has been emptied of data"); + } + + @Override + public void channelAcquired(Channel ch) { + // NO SOMETHING + } + + @Override + public void channelCreated(Channel ch) { + NioSocketChannel channel = (NioSocketChannel) ch; + channel.config() + .setTcpNoDelay(false); + ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); + ch.pipeline().addLast(new NettyEncoder()); + ch.pipeline().addLast(new NettyClientTakeHandler()); + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyClientTakeHandler.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyClientTakeHandler.java new file mode 100644 index 00000000..f23fec36 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyClientTakeHandler.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.handler; + +import cn.hippo4j.config.rpc.exception.ConnectionException; +import cn.hippo4j.config.rpc.support.ResultHolder; +import cn.hippo4j.config.rpc.response.Response; +import cn.hippo4j.common.web.exception.IllegalException; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +/** + * Interconnect with the netty mediation layer + * + */ +public class NettyClientTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler { + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + try { + Response response = (Response) msg; + handler(response); + ctx.flush(); + } catch (Exception e) { + ctx.close(); + throw new IllegalException(e); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + super.exceptionCaught(ctx, cause); + Channel channel = ctx.channel(); + if (channel.isActive()) { + ctx.close(); + } else { + throw new ConnectionException(cause); + } + } + + @Override + public void handler(Response response) { + ResultHolder.put(response.getKey(), response); + ResultHolder.wake(response.getKey()); + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyServerTakeHandler.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyServerTakeHandler.java new file mode 100644 index 00000000..555ef608 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/handler/NettyServerTakeHandler.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.handler; + +import cn.hippo4j.config.rpc.exception.ConnectionException; +import cn.hippo4j.config.rpc.process.ActivePostProcess; +import cn.hippo4j.config.rpc.response.DefaultResponse; +import cn.hippo4j.config.rpc.support.ClassRegistry; +import cn.hippo4j.config.rpc.support.Instance; +import cn.hippo4j.config.rpc.request.Request; +import cn.hippo4j.config.rpc.response.Response; +import cn.hippo4j.common.toolkit.Assert; +import cn.hippo4j.common.toolkit.ReflectUtil; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import java.lang.reflect.Method; +import java.util.LinkedList; +import java.util.List; + +/** + * netty adaptation layer + * + */ +public class NettyServerTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler { + + List processes; + Instance instance; + + public NettyServerTakeHandler(List processes, Instance instance) { + this.processes = processes; + this.instance = instance; + } + + public NettyServerTakeHandler(Instance instance) { + this(new LinkedList<>(), instance); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + if (!(msg instanceof Request)) { + return; + } + Request request = (Request) msg; + Response response = handler(request); + ctx.writeAndFlush(response); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + Channel channel = ctx.channel(); + if (channel.isActive()) { + ctx.close(); + } else { + throw new ConnectionException(cause); + } + } + + @Override + public Response handler(Request request) { + if (!preHandlers(request)) { + return null; + } + try { + Class cls = ClassRegistry.get(request.getClassName()); + Method method = ReflectUtil.getMethodByName(cls, request.getMethodName(), request.getParameterTypes()); + Assert.notNull(method); + Object invoke = ReflectUtil.invoke(instance.getInstance(cls), method, request.getParameters()); + Response response = new DefaultResponse(request.getKey(), invoke.getClass(), invoke); + postHandlers(request, response); + return response; + } catch (Exception e) { + Response response = new DefaultResponse(request.getKey(), e, e.getMessage()); + afterCompletions(request, response, e); + return response; + } + } + + private boolean preHandlers(Request request) { + for (ActivePostProcess process : processes) { + if (!process.preHandler(request)) { + return false; + } + } + return true; + } + + private void postHandlers(Request request, Response response) { + for (ActivePostProcess process : processes) { + process.postHandler(request, response); + } + } + + private void afterCompletions(Request request, Response response, Exception e) { + for (ActivePostProcess process : processes) { + process.afterCompletion(request, response, e); + } + } + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/process/ActivePostProcess.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/process/ActivePostProcess.java new file mode 100644 index 00000000..3e4078ca --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/process/ActivePostProcess.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.process; + +import cn.hippo4j.config.rpc.request.Request; +import cn.hippo4j.config.rpc.response.Response; + +/** + * Callback while the connection is in progress + * + */ +public interface ActivePostProcess { + + /** + * Client: After establishing a connection and before passing parameters
+ * Server: Receives parameters and performs pre-call operations
+ * + * @param request request + * @return Whether to continue the execution. If it is a client, the returned value does not affect subsequent execution + */ + default boolean preHandler(Request request) { + return true; + } + + /** + * Client: Action after receiving a response
+ * Server: performs the operation after the call
+ * + * @param request request + * @param response response + */ + default void postHandler(Request request, Response response) { + // NO SOMETHING + } + + /** + * Called when an exception or resource is cleaned + * + * @param request request + * @param response response + * @param e Exception + */ + default void afterCompletion(Request request, Response response, Exception e) { + // NO SOMETHING + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/DefaultRequest.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/DefaultRequest.java new file mode 100644 index 00000000..aa215e9c --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/DefaultRequest.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.request; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Objects; + +/** + * default request
+ * Use the fully qualified name key of the interface and override equals and hashCode + * + */ +public final class DefaultRequest implements Request { + + String key; + String className; + String methodName; + Class[] parameterTypes; + transient Object[] parameters; + + public DefaultRequest(String key, String className, String methodName, Class[] parameterTypes, Object[] parameters) { + this.key = key; + this.className = className; + this.methodName = methodName; + this.parameterTypes = parameterTypes; + this.parameters = parameters; + } + + @Override + public String getKey() { + return key; + } + + @Override + public String getClassName() { + return className; + } + + @Override + public String getMethodName() { + return methodName; + } + + @Override + public Class[] getParameterTypes() { + return parameterTypes; + } + + @Override + public Object[] getParameters() { + return parameters; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + DefaultRequest that = (DefaultRequest) o; + return Objects.equals(key, that.key) + && Objects.equals(className, that.className) + && Objects.equals(methodName, that.methodName); + } + + @Override + public int hashCode() { + return Objects.hash(key, className, methodName); + } + + /** + * Redefine the behavior of serialization, that is, re-acquire the initially serialized + * data from the stream and re-serialize it. Simple serialization will result in the + * loss of the field identified by transient. + */ + private void writeObject(ObjectOutputStream s) throws IOException { + s.defaultWriteObject(); + if (parameters == null) { + return; + } + // 序列化属性 parameters + for (Object parameter : parameters) { + s.writeObject(parameter); + } + } + + /** + * Redefine the deserialization behavior, and sequentially deserialize the data specified during + * serialization, because there is data that is not deserialized during initial deserialization, + * such as fields defined by transient + */ + private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { + s.defaultReadObject(); + if (parameterTypes == null) { + return; + } + // 反序列化属性 parameters + int length = parameterTypes.length; + Object[] a = new Object[length]; + for (int i = 0; i < length; i++) { + a[i] = s.readObject(); + } + this.parameters = a; + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/Request.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/Request.java new file mode 100644 index 00000000..919573c3 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/Request.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.request; + +import java.io.Serializable; + +/** + * request + * + */ +public interface Request extends Serializable { + + /** + * The unique identity of the current request + */ + String getKey(); + + /** + * The Class name of the current request + */ + String getClassName(); + + /** + * The Method name of the current request + */ + String getMethodName(); + + /** + * The parameter type of the current request + */ + Class[] getParameterTypes(); + + /** + * The parameters of the current request + */ + Object[] getParameters(); + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/DefaultResponse.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/DefaultResponse.java new file mode 100644 index 00000000..eca6e17d --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/DefaultResponse.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.response; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Objects; + +/** + * default request
+ * Use the fully qualified name key of the interface and override equals and hashCode + * + */ +public class DefaultResponse implements Response { + + String key; + Class cls; + transient Object obj; + Throwable throwable; + String errMsg; + + public DefaultResponse(String key, Class cls, Object obj, Throwable throwable, String errMsg) { + this.key = key; + this.cls = cls; + this.obj = obj; + this.throwable = throwable; + this.errMsg = errMsg; + } + + public DefaultResponse(String key, Throwable throwable, String errMsg) { + this(key, null, null, throwable, errMsg); + } + + public DefaultResponse(String key, Class cls, Object obj) { + this(key, cls, obj, null, null); + } + + @Override + public String getKey() { + return key; + } + + @Override + public Class getCls() { + return cls; + } + + @Override + public Object getObj() { + return obj; + } + + @Override + public Throwable getThrowable() { + return throwable; + } + + @Override + public String getErrMsg() { + return errMsg; + } + + @Override + public boolean isErr() { + return throwable != null || errMsg != null; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + DefaultResponse that = (DefaultResponse) o; + return Objects.equals(key, that.key) && Objects.equals(cls, that.cls); + } + + @Override + public int hashCode() { + return Objects.hash(key, cls); + } + + /** + * Redefine the behavior of serialization, that is, re-acquire the initially serialized + * data from the stream and re-serialize it. Simple serialization will result in the + * loss of the field identified by transient. + */ + private void writeObject(ObjectOutputStream s) throws IOException { + s.defaultWriteObject(); + if (obj == null) { + return; + } + // 序列化属性 obj + s.writeObject(this.obj); + } + + /** + * Redefine the deserialization behavior, and sequentially deserialize the data specified during + * serialization, because there is data that is not deserialized during initial deserialization, + * such as fields defined by transient + */ + private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException { + s.defaultReadObject(); + // 反序列化属性 obj + this.obj = s.readObject(); + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/Response.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/Response.java new file mode 100644 index 00000000..3c06fbaa --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/Response.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.response; + +import java.io.Serializable; + +/** + * Response + */ +public interface Response extends Serializable { + + /** + * The unique identity of the current Response + */ + String getKey(); + + /** + * The class of the current Response, The target of deserialization + */ + Class getCls(); + + /** + * The results of this request can be obtained, The source of deserialization + */ + Object getObj(); + + /** + * The Throwable of the current Response + */ + Throwable getThrowable(); + + /** + * the error message + */ + String getErrMsg(); + + /** + * Whether the current request has an error + */ + boolean isErr(); + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/NettyServerConnection.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/NettyServerConnection.java new file mode 100644 index 00000000..cb240442 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/NettyServerConnection.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.server; + +import cn.hippo4j.config.rpc.coder.NettyDecoder; +import cn.hippo4j.config.rpc.coder.NettyEncoder; +import cn.hippo4j.config.rpc.handler.NettyServerTakeHandler; +import cn.hippo4j.config.rpc.process.ActivePostProcess; +import cn.hippo4j.config.rpc.support.Instance; +import cn.hippo4j.common.toolkit.Assert; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.serialization.ClassResolvers; +import lombok.extern.slf4j.Slf4j; + +import java.util.LinkedList; +import java.util.List; + +/** + * adapter to the netty server + */ +@Slf4j +public class NettyServerConnection implements ServerConnection { + + Integer port; + EventLoopGroup leader; + EventLoopGroup worker; + Class socketChannelCls = NioServerSocketChannel.class; + List processes; + Instance instance; + ChannelFuture future; + + public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List processes, Instance instance) { + Assert.notNull(processes); + Assert.notNull(instance); + Assert.notNull(leader); + Assert.notNull(worker); + this.leader = leader; + this.worker = worker; + this.processes = processes; + this.instance = instance; + } + + public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, Instance instance) { + this(leader, worker, new LinkedList<>(), instance); + } + + public NettyServerConnection(List processes, Instance instance) { + this(new NioEventLoopGroup(), new NioEventLoopGroup(), processes, instance); + } + + public NettyServerConnection(Instance instance) { + this(new NioEventLoopGroup(), new NioEventLoopGroup(), new LinkedList<>(), instance); + } + + @Override + public void bind(int port) { + ServerBootstrap server = new ServerBootstrap(); + server.group(leader, worker) + .channel(socketChannelCls) + .childOption(ChannelOption.TCP_NODELAY, true) + .childHandler(new ChannelInitializer() { + + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new NettyDecoder(ClassResolvers.cacheDisabled(null))); + ch.pipeline().addLast(new NettyEncoder()); + ch.pipeline().addLast(new NettyServerTakeHandler(processes, instance)); + } + }); + try { + this.future = server.bind(port); + log.info("The server is started and can receive requests. The listening port is {}", port); + this.port = port; + this.future.channel().closeFuture().sync(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + + @Override + public void close() { + if (port == null) { + return; + } + leader.shutdownGracefully(); + worker.shutdownGracefully(); + this.future.channel().close(); + log.info("The server is shut down and no more requests are received. The release port is {}", port); + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/RPCServer.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/RPCServer.java new file mode 100644 index 00000000..bcbe4cef --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/RPCServer.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.server; + +import java.io.IOException; + +/** + * Server Implementation + * + */ +public class RPCServer implements Server { + + int port; + ServerConnection serverConnection; + + public RPCServer(int port, ServerConnection serverConnection) { + this.port = port; + this.serverConnection = serverConnection; + } + + @Override + public void bind() { + serverConnection.bind(port); + } + + /** + * Shut down the server and release the port + */ + @Override + public void close() throws IOException { + serverConnection.close(); + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/Server.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/Server.java new file mode 100644 index 00000000..8bd300ff --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/Server.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.server; + +import java.io.Closeable; + +/** + * the service for RPC, Explain the role of the service in the request + * + */ +public interface Server extends Closeable { + + /** + * Start the server. Attempt to listen on the port and receive the request.
+ * If the port being processed is already bound, an exception is thrown + */ + void bind(); + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/ServerConnection.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/ServerConnection.java new file mode 100644 index 00000000..b527de9b --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/ServerConnection.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.server; + +import cn.hippo4j.config.rpc.handler.Connection; + +/** + * This applies to server-side connections + * + */ +public interface ServerConnection extends Connection { + + /** + * Bind ports and process them + */ + void bind(int port); + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ClassRegistry.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ClassRegistry.java new file mode 100644 index 00000000..75264db4 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ClassRegistry.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * the registration center for Client and Server + * + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ClassRegistry { + + private static final Map> serverRegister = new ConcurrentHashMap<>(); + + /** + * get a Obj in Registry center
+ * + * @param s key + * @return t element + */ + public static Class get(String s) { + return serverRegister.get(s); + } + + /** + * add the element to Registry Table
+ * if the key already exists, failure, and return before the value of the key.
+ * if success return the element + * + * @param s key + * @param cls element + * @return final mapped value + */ + public static Class set(String s, Class cls) { + return serverRegister.putIfAbsent(s, cls); + } + + /** + * add the element to Registry Table
+ * if the key already exists, failure, replace it + * + * @param s key + * @param cls element + */ + public static Class put(String s, Class cls) { + return serverRegister.put(s, cls); + } + + /** + * clear + */ + public static void clear() { + serverRegister.clear(); + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/DefaultInstance.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/DefaultInstance.java new file mode 100644 index 00000000..e2592351 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/DefaultInstance.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +import cn.hippo4j.common.toolkit.ReflectUtil; +import cn.hippo4j.common.web.exception.IllegalException; + +/** + * Simply creating an instance of a class by its name and its specific type, + * and then throwing an exception if it is an interface, is not elegant + * + */ +public class DefaultInstance implements Instance { + + @Override + public Object getInstance(Class cls) { + return ReflectUtil.createInstance(cls); + } + + @Override + public Object getInstance(String name) { + try { + Class cls = Class.forName(name); + return getInstance(cls); + } catch (ClassNotFoundException e) { + throw new IllegalException(e); + } + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/Instance.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/Instance.java new file mode 100644 index 00000000..1f058e54 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/Instance.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +/** + * Instance interface to get an instance + * + */ +public interface Instance { + + /** + * get a instance + * + * @param cls Class object + * @return Information about instances created or found + */ + Object getInstance(Class cls); + + /** + * Gets an instance of a class with a recognizable identity, + * which can be the fully qualified name of class. It can also be a unique name in a container + * + * @param name Identifying name + * @return Information about instances created or found + */ + Object getInstance(String name); + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPool.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPool.java new file mode 100644 index 00000000..f0d09299 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPool.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +import cn.hippo4j.config.rpc.exception.ConnectionException; +import cn.hippo4j.config.rpc.handler.NettyClientPoolHandler; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.pool.ChannelHealthChecker; +import io.netty.channel.pool.ChannelPool; +import io.netty.channel.pool.ChannelPoolHandler; +import io.netty.channel.pool.FixedChannelPool; +import io.netty.util.concurrent.Future; +import lombok.extern.slf4j.Slf4j; + +import java.net.InetSocketAddress; +import java.util.concurrent.TimeUnit; + +/** + * This parameter applies only to the connection pool of netty + * + */ +@Slf4j +public class NettyConnectPool { + + ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE; + FixedChannelPool.AcquireTimeoutAction acquireTimeoutAction = FixedChannelPool.AcquireTimeoutAction.NEW; + int maxPendingAcquires = Integer.MAX_VALUE; + ChannelPoolHandler handler = new NettyClientPoolHandler(); + ChannelPool pool; + String host; + int port; + + public NettyConnectPool(String host, int port, int maxConnect, + long timeout, EventLoopGroup worker, + Class socketChannelCls) { + InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port); + Bootstrap bootstrap = new Bootstrap() + .group(worker) + .channel(socketChannelCls) + .remoteAddress(socketAddress); + this.host = host; + this.port = port; + this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction, + timeout, maxConnect, maxPendingAcquires, true, true); + log.info("The connection pool is established with the connection target {}:{}", host, port); + NettyConnectPoolHolder.createPool(host, port, this); + } + + public Channel acquire(long timeoutMillis) { + try { + Future fch = pool.acquire(); + return fch.get(timeoutMillis, TimeUnit.MILLISECONDS); + } catch (Exception e) { + throw new ConnectionException("Failed to get the connection", e); + } + } + + public Future acquire() { + try { + return pool.acquire(); + } catch (Exception e) { + throw new ConnectionException("Failed to get the connection", e); + } + } + + public void release(Channel channel) { + try { + if (channel != null) { + pool.release(channel); + } + } catch (Exception e) { + throw new ConnectionException("Failed to release the connection", e); + } + } + + public void close() { + try { + pool.close(); + NettyConnectPoolHolder.remove(host, port); + } catch (Exception e) { + throw new ConnectionException("Failed to close the connection pool", e); + } + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPoolHolder.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPoolHolder.java new file mode 100644 index 00000000..9a1ca731 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPoolHolder.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.EventExecutorGroup; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * To avoid creating multiple connection pools for the same host:port, save all connection pools of the client + * + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class NettyConnectPoolHolder { + + static int maxConnect = 64; + + // TODO InetSocketAddress + static Map map = new ConcurrentHashMap<>(); + + private static NettyConnectPool initPool(String host, int port, + long timeout, EventLoopGroup worker) { + return new NettyConnectPool( + host, port, maxConnect, + timeout, worker, + NioSocketChannel.class); + } + + private static String getKey(String host, int port) { + return host + ":" + port; + } + + /** + * The connection pool mapping may already exist before the connection pool + * mapping is established. In this case, the connection pool is directly overwritten + * + * @param host the host + * @param port the port + * @param pool This parameter applies only to the connection pool of netty + */ + public static void createPool(String host, int port, NettyConnectPool pool) { + map.put(getKey(host, port), pool); + } + + /** + * Gets a connection pool, or null if there is no corresponding mapping + * + * @param host the host + * @param port the port + * @return Map to the connection pool + */ + public static NettyConnectPool getPool(String host, int port) { + return map.get(getKey(host, port)); + } + + /** + * Gets a connection pool, and if there is no mapping, creates one with the values provided and joins the mapping + * + * @param host the host + * @param port the port + * @param timeout timeout + * @param worker Special {@link EventExecutorGroup} which allows registering {@link Channel}s + * that get processed for later selection during the event loop. + * @return Map to the connection pool + */ + public static synchronized NettyConnectPool getPool(String host, int port, + long timeout, EventLoopGroup worker) { + /* + * this cannot use the computeIfAbsent method directly here because put is already used in init. + * Details refer to https://bugs.openjdk.java.net/browse/JDK-8062841 + */ + NettyConnectPool pool = getPool(host, port); + return pool == null ? initPool(host, port, timeout, worker) : pool; + } + + /** + * Disconnect a connection mapping. This must take effect at the same time as the connection pool is closed + * + * @param host host + * @param port port + */ + public static void remove(String host, int port) { + map.remove(getKey(host, port)); + } + + /** + * clear + */ + public static void clear() { + map.clear(); + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyProxyCenter.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyProxyCenter.java new file mode 100644 index 00000000..21ec3283 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyProxyCenter.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +import cn.hippo4j.config.rpc.request.DefaultRequest; +import cn.hippo4j.config.rpc.client.NettyClientConnection; +import cn.hippo4j.config.rpc.request.Request; +import cn.hippo4j.config.rpc.response.Response; +import cn.hippo4j.common.toolkit.IdUtil; +import cn.hippo4j.common.web.exception.IllegalException; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.Proxy; +import java.util.HashMap; +import java.util.Map; + +/** + * Add a proxy for the request, {@link Proxy} and {@link InvocationHandler} + * + */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class NettyProxyCenter { + + static Map, Object> map = new HashMap<>(); + + /** + * 通过一个接口得到一个适用于PRC的代理对象 + * + * @param cls 接口类型 + * @param host 请求地址 + * @param port 端口 + * @param 对象类型 + * @return 代理对象 + */ + public static T getProxy(Class cls, String host, int port) { + NettyClientConnection connection = new NettyClientConnection(host, port); + return getProxy(connection, cls, host, port); + } + + @SuppressWarnings("unchecked") + public static T getProxy(NettyClientConnection connection, Class cls, String host, int port) { + boolean b = cls.isInterface(); + if (!b) { + throw new IllegalException(cls.getName() + "is not a Interface"); + } + Object o = map.get(cls); + if (o != null) { + return (T) o; + } + T obj = (T) Proxy.newProxyInstance( + cls.getClassLoader(), + new Class[]{cls}, + (proxy, method, args) -> { + String clsName = cls.getName(); + String methodName = method.getName(); + String key = host + port + clsName + methodName + IdUtil.simpleUUID(); + Class[] parameterTypes = method.getParameterTypes(); + Request request = new DefaultRequest(key, clsName, methodName, parameterTypes, args); + Response response = connection.connect(request); + if (response == null) { + return null; + } + if (response.isErr()) { + throw new IllegalException(response.getErrMsg(), response.getThrowable()); + } + return response.getObj(); + }); + map.put(cls, obj); + return obj; + } +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ResultHolder.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ResultHolder.java new file mode 100644 index 00000000..1ac3fc97 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ResultHolder.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.LockSupport; + +/** + * The staging results
+ * The unique remote call can be determined by the key of request and + * response, and the result of the call is stored in the secondary cache, + * which is convenient for the client to use at any time. + * + */ +@Slf4j +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class ResultHolder { + + private static final Map map = new ConcurrentHashMap<>(); + private static final Map threadMap = new HashMap<>(); + + /** + * Writes when the client receives a response + * + * @param key Request and response keys + * @param o The result + */ + public static void put(String key, Object o) { + log.debug("Write the result, wake up the thread"); + map.put(key, o); + } + + /** + * Stores a thread that can be woken up and is in a waiting state + * + * @param key Request and response keys + * @param t The Thread + */ + public static void put(String key, Thread t) { + log.debug("Write thread, waiting to wake up"); + threadMap.put(key, t); + } + + /** + * Stores a thread that can be woken up and is in a waiting state + * + * @param key Request and response keys + */ + public static synchronized void wake(String key) { + log.debug("The future has been fetched, wake up the thread"); + Thread thread = threadMap.remove(key); + LockSupport.unpark(thread); + } + + /** + * Called when the client gets the response
+ * After the result is obtained, the corresponding key is cleared from the cache
+ * So it's only true when you first get the result + * + * @param key Request and response keys + * @return Response body + */ + @SuppressWarnings("unchecked") + public static T get(String key) { + log.debug("Get the future"); + return (T) map.remove(key); + } + +} diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/SpringContextInstance.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/SpringContextInstance.java new file mode 100644 index 00000000..e6d547d2 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/SpringContextInstance.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +import cn.hippo4j.common.config.ApplicationContextHolder; + +/** + * Adapter Spring, The requested object is managed by spring + * + */ +public class SpringContextInstance implements Instance { + + @Override + public Object getInstance(Class cls) { + return ApplicationContextHolder.getBean(cls); + } + + @Override + public Object getInstance(String name) { + return ApplicationContextHolder.getInstance().getBean(name); + } + +} diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/client/CallManager.java b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/client/CallManager.java new file mode 100644 index 00000000..d3fae3ae --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/client/CallManager.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.client; + +public class CallManager { + + public int call() { + return 1; + } + +} diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/client/RPCClientTest.java b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/client/RPCClientTest.java new file mode 100644 index 00000000..54c2633e --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/client/RPCClientTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.client; + +import cn.hippo4j.config.rpc.request.DefaultRequest; +import cn.hippo4j.config.rpc.request.Request; +import cn.hippo4j.config.rpc.response.Response; +import cn.hippo4j.config.rpc.server.NettyServerConnection; +import cn.hippo4j.config.rpc.server.RPCServer; +import cn.hippo4j.config.rpc.server.ServerConnection; +import cn.hippo4j.config.rpc.support.DefaultInstance; +import cn.hippo4j.config.rpc.support.Instance; +import cn.hippo4j.config.rpc.support.ClassRegistry; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class RPCClientTest { + + String host = "localhost"; + int port = 8888; + + @Test + public void connection() throws IOException { + + Class cls = CallManager.class; + String className = cls.getName(); + ClassRegistry.put(className, cls); + // The mode connection was denied when the server was started on the specified port + Instance instance = new DefaultInstance(); + ServerConnection connection = new NettyServerConnection(instance); + RPCServer rpcServer = new RPCServer(port, connection); + CompletableFuture.runAsync(rpcServer::bind); + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + NettyClientConnection clientConnection = new NettyClientConnection(host, port); + RPCClient rpcClient = new RPCClient(clientConnection); + Request request = new DefaultRequest("127.0.0.18888", className, "call", null, null); + for (int i = 0; i < 100; i++) { + Response response = rpcClient.connection(request); + Assert.assertEquals(response.getObj(), 1); + } + rpcClient.close(); + rpcServer.close(); + } + +} \ No newline at end of file diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/server/RPCServerTest.java b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/server/RPCServerTest.java new file mode 100644 index 00000000..726ca910 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/server/RPCServerTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.server; + +import cn.hippo4j.config.rpc.support.DefaultInstance; +import cn.hippo4j.config.rpc.support.Instance; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class RPCServerTest { + + public static int port = 8888; + + @Test + public void bind() throws IOException { + Instance instance = new DefaultInstance(); + ServerConnection connection = new NettyServerConnection(instance); + RPCServer rpcServer = new RPCServer(port, connection); + CompletableFuture.runAsync(rpcServer::bind); + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + rpcServer.close(); + } + + @Test + public void bindTest() throws IOException { + Instance instance = new DefaultInstance(); + EventLoopGroup leader = new NioEventLoopGroup(); + EventLoopGroup worker = new NioEventLoopGroup(); + ServerConnection connection = new NettyServerConnection(leader, worker, instance); + RPCServer rpcServer = new RPCServer(port, connection); + CompletableFuture.runAsync(rpcServer::bind); + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + rpcServer.close(); + } + +} \ No newline at end of file diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/ClassRegistryTest.java b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/ClassRegistryTest.java new file mode 100644 index 00000000..403a7114 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/ClassRegistryTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +import org.junit.Assert; +import org.junit.Test; + +public class ClassRegistryTest { + + @Test + public void get() { + String getStr = "GetModel"; + Class cls = ClassRegistry.get(getStr); + Assert.assertNull(cls); + ClassRegistry.put(getStr, GetModel.class); + Class aClass = ClassRegistry.get(getStr); + Assert.assertNotNull(aClass); + ClassRegistry.clear(); + } + + @Test + public void set() { + String getStr = "GetModel"; + ClassRegistry.set(getStr, GetModel.class); + Class aClass = ClassRegistry.get(getStr); + Assert.assertEquals(aClass, GetModel.class); + ClassRegistry.set(getStr, SetModel.class); + Class aClass1 = ClassRegistry.get(getStr); + Assert.assertEquals(aClass1, GetModel.class); + ClassRegistry.clear(); + } + + @Test + public void put() { + String getStr = "GetModel"; + ClassRegistry.put(getStr, GetModel.class); + Class aClass = ClassRegistry.get(getStr); + Assert.assertEquals(aClass, GetModel.class); + ClassRegistry.put(getStr, SetModel.class); + Class aClass1 = ClassRegistry.get(getStr); + Assert.assertEquals(aClass1, SetModel.class); + ClassRegistry.clear(); + } + + public static class GetModel { + + } + + public static class SetModel { + + } +} \ No newline at end of file diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/DefaultInstanceTest.java b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/DefaultInstanceTest.java new file mode 100644 index 00000000..6d070446 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/DefaultInstanceTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.junit.Assert; +import org.junit.Test; + +public class DefaultInstanceTest { + + Instance instance = new DefaultInstance(); + + @Test + public void getInstance() { + Class cls = InstanceModel.class; + Object instanceInstance = instance.getInstance(cls); + Assert.assertNotNull(instanceInstance); + Assert.assertEquals(cls, instanceInstance.getClass()); + } + + @Test + public void testGetInstance() { + String className = "cn.hippo4j.config.rpc.support.DefaultInstanceTest$InstanceModel"; + Object instanceInstance = instance.getInstance(className); + Assert.assertNotNull(instanceInstance); + Assert.assertEquals(className, instanceInstance.getClass().getName()); + } + + @Setter + @Getter + @AllArgsConstructor + @NoArgsConstructor + public static class InstanceModel { + + String name; + } +} \ No newline at end of file diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyConnectPoolHolderTest.java b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyConnectPoolHolderTest.java new file mode 100644 index 00000000..f510ecd5 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyConnectPoolHolderTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import org.junit.Assert; +import org.junit.Test; + +public class NettyConnectPoolHolderTest { + + String host = "127.0.0.1"; + int port = 8888; + int maxCount = 8; + int timeout = 5000; + EventLoopGroup group = new NioEventLoopGroup(); + Class cls = NioSocketChannel.class; + + @Test + public void createPool() { + NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls); + NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port); + Assert.assertEquals(pool, connectPool); + NettyConnectPoolHolder.clear(); + NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port); + Assert.assertNull(connectPool1); + } + + @Test + public void testGetPool() { + NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group); + NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port); + Assert.assertEquals(connectPool1, connectPool); + NettyConnectPoolHolder.clear(); + NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(host, port); + Assert.assertNull(connectPool2); + } + + @Test + public void remove() { + NettyConnectPool connectPool = NettyConnectPoolHolder.getPool(host, port, timeout, group); + NettyConnectPool connectPool1 = NettyConnectPoolHolder.getPool(host, port); + Assert.assertEquals(connectPool1, connectPool); + NettyConnectPoolHolder.remove(host, port); + NettyConnectPool connectPool2 = NettyConnectPoolHolder.getPool(host, port); + Assert.assertNull(connectPool2); + } +} \ No newline at end of file diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyConnectPoolTest.java b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyConnectPoolTest.java new file mode 100644 index 00000000..622e575d --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyConnectPoolTest.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +import cn.hippo4j.config.rpc.server.NettyServerConnection; +import cn.hippo4j.config.rpc.server.RPCServer; +import cn.hippo4j.config.rpc.server.ServerConnection; +import io.netty.channel.Channel; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.concurrent.Future; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +public class NettyConnectPoolTest { + + String host = "127.0.0.1"; + int port = 8888; + int maxCount = 64; + int timeout = 5000; + EventLoopGroup group = new NioEventLoopGroup(); + Class cls = NioSocketChannel.class; + + @Test + public void acquire() throws IOException { + // The mode connection was denied when the server was started on the specified port + Instance instance = new DefaultInstance(); + ServerConnection connection = new NettyServerConnection(instance); + RPCServer rpcServer = new RPCServer(port, connection); + CompletableFuture.runAsync(rpcServer::bind); + // Given the delay in starting the server, wait here + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls); + Channel acquire = pool.acquire(timeout); + Assert.assertNotNull(acquire); + pool.release(acquire); + rpcServer.close(); + } + + @Test + public void testAcquire() throws IOException { + // The mode connection was denied when the server was started on the specified port + Instance instance = new DefaultInstance(); + ServerConnection connection = new NettyServerConnection(instance); + RPCServer rpcServer = new RPCServer(port, connection); + CompletableFuture.runAsync(rpcServer::bind); + // Given the delay in starting the server, wait here + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls); + Future acquire = pool.acquire(); + Assert.assertNotNull(acquire); + rpcServer.close(); + } + + @Test + public void close() throws IOException { + // The mode connection was denied when the server was started on the specified port + Instance instance = new DefaultInstance(); + ServerConnection connection = new NettyServerConnection(instance); + RPCServer rpcServer = new RPCServer(port, connection); + CompletableFuture.runAsync(rpcServer::bind); + // Given the delay in starting the server, wait here + try { + TimeUnit.SECONDS.sleep(3); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + NettyConnectPool pool = new NettyConnectPool(host, port, maxCount, timeout, group, cls); + Channel acquire = pool.acquire(timeout); + Assert.assertNotNull(acquire); + pool.release(acquire); + pool.close(); + rpcServer.close(); + } +} \ No newline at end of file diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyProxyCenterTest.java b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyProxyCenterTest.java new file mode 100644 index 00000000..9ae4ac50 --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/NettyProxyCenterTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +import cn.hippo4j.common.web.exception.IllegalException; +import org.junit.Assert; +import org.junit.Test; + +public class NettyProxyCenterTest { + + @Test + public void getProxy() { + ProxyInterface localhost = NettyProxyCenter.getProxy(ProxyInterface.class, "localhost", 8888); + Assert.assertNotNull(localhost); + } + + @Test(expected = IllegalException.class) + public void getProxyTest() { + ProxyClass localhost = NettyProxyCenter.getProxy(ProxyClass.class, "localhost", 8888); + Assert.assertNotNull(localhost); + } + interface ProxyInterface { + + } + + static class ProxyClass { + + } +} \ No newline at end of file diff --git a/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/ResultHolderTest.java b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/ResultHolderTest.java new file mode 100644 index 00000000..3defaf3b --- /dev/null +++ b/hippo4j-server/hippo4j-config/src/test/java/cn/hippo4j/config/rpc/support/ResultHolderTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.config.rpc.support; + +import cn.hippo4j.common.toolkit.IdUtil; +import org.junit.Assert; +import org.junit.Test; + +public class ResultHolderTest { + + @Test + public void test() { + String s1 = IdUtil.simpleUUID(); + String o1 = s1 + "1"; + String s2 = IdUtil.simpleUUID(); + String o2 = s2 + "2"; + + ResultHolder.put(s1, o1); + ResultHolder.put(s2, o2); + + Object r1 = ResultHolder.get(s1); + Object r2 = ResultHolder.get(s2); + + Assert.assertEquals(r1, o1); + Assert.assertEquals(r2, o2); + } +} \ No newline at end of file