Modify some comments and code structure (#986)

* fix : Modify some comments and code structure

* fix : Modify some comments and code structure
pull/1372/head
pizihao 2 years ago committed by GitHub
parent d0ab6c932d
commit 4265c9d834
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -23,12 +23,37 @@ import cn.hippo4j.rpc.model.Response;
import java.io.Closeable;
/**
* the client for RPC, Explain the role of the client in the request
* <h3>CLIENT</h3>
* The highest level interface for the client, it does not care how to communicate with the server,
* nor can it know the specific connection information. The client plays the role of message producer
* in the whole connection.By sending a Request to the server ({@link Request}), the client will be
* able to communicate with the server. Wait for the server's Response ({@link Response})
* <h3>METHOD</h3>
* <ul>
* <li>{@link #connection(Request)}</li>
* <li>{@link #isActive()}</li>
* <li>{@link #close()}</li>
* </ul>
* You can usually use the client in this way:
* <pre>
* Request request = new Request();
* try(Client client = new Client()){
* Response response = client.connection(request);
* }
* </pre>
*
* <b>The client implements Closeable and supports automatic shutdown, However, you can manually
* disable it when you want to use it</b>
*
* @since 1.5.1
*/
public interface Client extends Closeable {
/**
* Start the client and try to send and receive data
*
* @param request Request information, Requested methods and parameters
* @return response Response from server side
*/
Response connection(Request request);

@ -22,7 +22,9 @@ import cn.hippo4j.rpc.model.Request;
import cn.hippo4j.rpc.model.Response;
/**
* Applicable to client connections
* Applicable to client connections<br>
*
* @since 1.5.1
*/
public interface ClientConnection extends Connection {

@ -37,10 +37,13 @@ import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.LockSupport;
/**
* Client implemented using netty
*
* @since 1.5.1
*/
@Slf4j
public class NettyClientConnection implements ClientConnection {
@ -114,12 +117,12 @@ public class NettyClientConnection implements ClientConnection {
@Override
public void close() {
if (this.channel == null) {
return;
}
worker.shutdownGracefully();
this.future.channel().close();
this.channel.close();
Optional.ofNullable(this.channel)
.ifPresent(c -> {
worker.shutdownGracefully();
this.future.channel().close();
this.channel.close();
});
}
@Override

@ -23,7 +23,10 @@ import cn.hippo4j.rpc.model.Response;
import java.io.IOException;
/**
* The client, which provides a closing mechanism, maintains a persistent connection if not closed
* The client, which provides a closing mechanism, maintains a persistent connection if not closed<br>
* Delegate the method to the {@link ClientConnection} for implementation
*
* @since 1.5.1
*/
public class RPCClient implements Client {

@ -24,6 +24,8 @@ import java.io.OutputStream;
/**
* object OutputStream
*
* @since 1.5.1
*/
public class CompactObjectOutputStream extends ObjectOutputStream {

@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.rpc.coder;
import cn.hippo4j.rpc.exception.CoderException;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.serialization.ClassResolver;
import io.netty.handler.codec.serialization.ObjectDecoder;
/**
* According to the decoder for java objects implemented by ObjectDecoder,
* it is necessary to ensure that the transmitted objects can be serialized
*/
public class NettyDecoder extends ObjectDecoder {
public NettyDecoder(ClassResolver classResolver) {
super(classResolver);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) {
ByteBuf byteBuf = in.retainedDuplicate();
try {
Object o = super.decode(ctx, in);
if (o == null) {
return byteBuf;
} else {
return o;
}
} catch (Exception e) {
throw new CoderException("The encoding is abnormal, which may be caused by the failure of the transfer object to be deserialized");
}
}
}

@ -29,6 +29,8 @@ import java.io.Serializable;
/**
* this is a encoder, For custom gluing and unpacking<br>
* {@link io.netty.handler.codec.serialization.ObjectEncoder}
*
* @since 1.5.1
*/
public class NettyEncoder extends MessageToByteEncoder<Serializable> {

@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* the registration center for Client and Server
*
* @since 1.5.1
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class ClassRegistry {

@ -27,6 +27,8 @@ import java.util.ServiceLoader;
* You simply create an instance of a class based on its name and specific type.
* Load through the ServiceLoader first. If the load fails, load directly through the instantiation.
* If it is an interface, throw an exception. This is not elegant implementation
*
* @since 1.5.1
*/
public class DefaultInstance implements Instance {

@ -22,6 +22,8 @@ import java.net.InetSocketAddress;
/**
* The adaptation layer of different service centers is used to know
* the host of different services through the registration center
*
* @since 1.5.1
*/
public interface DiscoveryAdapter {

@ -19,6 +19,8 @@ package cn.hippo4j.rpc.discovery;
/**
* Instance interface to get an instance
*
* @since 1.5.1
*/
public interface Instance {

@ -19,6 +19,8 @@ package cn.hippo4j.rpc.discovery;
/**
* Gets the top-level interface of the instance port
*
* @since 1.5.1
*/
@FunctionalInterface
public interface ServerPort {

@ -21,6 +21,8 @@ import cn.hippo4j.common.config.ApplicationContextHolder;
/**
* Adapter Spring, The requested object is managed by spring
*
* @since 1.5.1
*/
public class SpringContextInstance implements Instance {

@ -20,6 +20,8 @@ package cn.hippo4j.rpc.exception;
/**
* During decoding and encoding, if an exception occurs, an exception of type {@link CoderException} is thrown,
* which is not different from a {@link RuntimeException}, but is more explicit about the type of exception
*
* @since 1.5.1
*/
public class CoderException extends RuntimeException {

@ -21,6 +21,8 @@ package cn.hippo4j.rpc.exception;
* If an exception occurs during the connection between the server and the client, an exception of type
* {@link ConnectionException} is thrown, which is not different from {@link RuntimeException}, but is more explicit
* about the type of exception
*
* @since 1.5.1
*/
public class ConnectionException extends RuntimeException {

@ -20,6 +20,8 @@ package cn.hippo4j.rpc.exception;
/**
* If there is a timeout between the server and the client, you will get a {@link TimeOutException},
* which is not different from {@link RuntimeException}, but it will be more explicit about the type of exception, right
*
* @since 1.5.1
*/
public class TimeOutException extends RuntimeException {

@ -26,6 +26,8 @@ import java.util.stream.Collectors;
/**
* Processor manager for ChannelHandler in netty
*
* @since 1.5.1
*/
public abstract class AbstractNettyHandlerManager implements HandlerManager<ChannelHandler> {

@ -24,8 +24,12 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Optional;
/**
* the abstract base of {@link ConnectHandler} and {@link ChannelInboundHandlerAdapter}
*
* @since 1.5.1
*/
public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdapter implements ConnectHandler {
@ -42,9 +46,10 @@ public abstract class AbstractNettyTakeHandler extends ChannelInboundHandlerAdap
if (channel.isActive()) {
ctx.close();
}
if (cause != null) {
throw new ConnectionException(cause);
}
Optional.ofNullable(cause)
.ifPresent(t -> {
throw new ConnectionException(cause);
});
}
/**

@ -25,6 +25,7 @@ import cn.hippo4j.rpc.model.Response;
* must be specified, such as serialization and parsing, requesting and receiving
* requests, and so on<br>
*
* @since 1.5.1
*/
public interface ConnectHandler {

@ -20,7 +20,14 @@ package cn.hippo4j.rpc.handler;
import java.io.Closeable;
/**
* Represents a network request connection and provides IO layer support
* Represents a network request connection and provides IO layer support<br>
* <p>
* This is not a strict and stateless Connection interface, it contains the necessary
* operations that should be done in the connection. It is more like integrating the
* connection and the connection channel together, so creating {@link Connection} is
* very resource intensive, for which caching is recommended
*
* @since 1.5.1
*/
public interface Connection extends Closeable {

@ -17,12 +17,16 @@
package cn.hippo4j.rpc.handler;
import cn.hippo4j.common.web.exception.IllegalException;
import io.netty.channel.ChannelHandler;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* Manage the Handler used in the processing.<br>
* The Handler must be able to exist multiple times and be invoked once in a single execution
*
* @since 1.5.1
*/
public interface HandlerManager<T> {
@ -72,6 +76,14 @@ public interface HandlerManager<T> {
* @return HandlerEntity
*/
default HandlerEntity<T> getHandlerEntity(long order, T handler, String name) {
Class<?> cls = handler.getClass();
boolean b = cls.isAnnotationPresent(ChannelHandler.Sharable.class)
|| HandlerManager.class.isAssignableFrom(cls);
if (!b) {
throw new IllegalException("Join the execution of the handler must add io.netty.channel.ChannelHandler." +
"Sharable annotations, Please for the handler class " + cls.getName() + " add io.netty.channel." +
"ChannelHandler.Sharable annotation");
}
return new HandlerEntity<>(order, handler, name);
}

@ -32,6 +32,8 @@ import java.util.List;
/**
* Processing by the client connection pool handler to clean the buffer and define new connection properties
*
* @since 1.5.1
*/
@Slf4j
public class NettyClientPoolHandler extends AbstractNettyHandlerManager implements ChannelPoolHandler {

@ -24,6 +24,8 @@ import io.netty.channel.ChannelHandlerContext;
/**
* Interconnect with the netty mediation layer
*
* @since 1.5.1
*/
@ChannelHandler.Sharable
public class NettyClientTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler {

@ -35,6 +35,8 @@ import java.util.List;
/**
* netty adaptation layer
*
* @since 1.5.1
*/
@ChannelHandler.Sharable
public class NettyServerTakeHandler extends AbstractNettyTakeHandler implements ConnectHandler {

@ -25,6 +25,8 @@ import java.util.Objects;
/**
* default request<br>
* Use the fully qualified name key of the interface and override equals and hashCode
*
* @since 1.5.1
*/
public final class DefaultRequest implements Request {

@ -25,6 +25,8 @@ import java.util.Objects;
/**
* default request<br>
* Use the fully qualified name key of the interface and override equals and hashCode
*
* @since 1.5.1
*/
public class DefaultResponse implements Response {

@ -21,6 +21,8 @@ import java.io.Serializable;
/**
* request
*
* @since 1.5.1
*/
public interface Request extends Serializable {

@ -21,6 +21,8 @@ import java.io.Serializable;
/**
* Response
*
* @since 1.5.1
*/
public interface Response extends Serializable {

@ -22,6 +22,8 @@ import cn.hippo4j.rpc.model.Response;
/**
* Callback while the connection is in progress
*
* @since 1.5.1
*/
public interface ActivePostProcess {

@ -30,6 +30,7 @@ import java.util.List;
* reference resources: spring HandlerExecutionChain
*
* @see ActivePostProcess
* @since 1.5.1
*/
@Slf4j
public final class ActiveProcessChain {

@ -37,6 +37,8 @@ import java.util.List;
/**
* adapter to the netty server
*
* @since 1.5.1
*/
@Slf4j
public class NettyServerConnection extends AbstractNettyHandlerManager implements ServerConnection {

@ -25,6 +25,8 @@ import java.util.concurrent.CompletableFuture;
/**
* Server Implementation
*
* @since 1.5.1
*/
public class RPCServer implements Server {

@ -21,6 +21,8 @@ import java.io.Closeable;
/**
* the service for RPC, Explain the role of the service in the request
*
* @since 1.5.1
*/
public interface Server extends Closeable {

@ -22,6 +22,8 @@ import cn.hippo4j.rpc.handler.Connection;
/**
* This applies to server-side connections
*
* @since 1.5.1
*/
public interface ServerConnection extends Connection {

@ -17,6 +17,7 @@
package cn.hippo4j.rpc.support;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.rpc.client.Client;
import cn.hippo4j.rpc.discovery.DiscoveryAdapter;
import cn.hippo4j.rpc.exception.ConnectionException;
@ -30,11 +31,30 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.net.InetSocketAddress;
import java.util.Optional;
/**
* A FactoryBean that builds interfaces to invoke proxy objects
* is responsible for managing the entire life cycle of the proxy objects<br>
* <h3>APPLICATION START</h3>
* When the application is started, the request initiator needs to complete the proxy of the calling interface,
* which ensures that the method can be requested to the server side when the method is called, rather than simply
* request an interface that cannot be instantiated. The classes involved in adding proxy to the interface are:
* <ul>
* <li>{@link NettyClientSupport}</li>
* <li>{@link NettyProxyCenter}</li>
* <li>{@link NettyClientPoolHandler}</li>
* </ul>
* <h3>AND SPRING</h3>
* In order to fully integrate {@link ClientFactoryBean} into the life cycle of spring beans,
* {@link ClientFactoryBean} also needs to implement the following interfaces:
* <ul>
* <li>{@link InitializingBean}</li>
* <li>{@link ApplicationContextAware}</li>
* <li>{@link DisposableBean}</li>
* </ul>
*
* @since 1.5.1
* @deprecated With {@link cn.hippo4j.config.service.ThreadPoolAdapterService} structure, FactoryBean is not the best choice
*/
@Deprecated
@ -52,10 +72,16 @@ public class ClientFactoryBean implements FactoryBean<Object>, InitializingBean,
*/
private String discoveryAdapterName;
/**
* The adaptation interface for obtaining ip information in the registry is used together with
* {@link #discoveryAdapterName}, so that the adapter implementation can be obtained in the container
* during the initialization phase
*/
private DiscoveryAdapter discoveryAdapter;
/**
* the channel handler
* the channel handler, To ensure the security and reliability of netty calls,
* {@link ChannelHandler} must be identified by {@link ChannelHandler.Sharable}
*/
private ChannelHandler[] handlers;
@ -70,11 +96,13 @@ public class ClientFactoryBean implements FactoryBean<Object>, InitializingBean,
private ApplicationContext applicationContext;
/**
* InetSocketAddress
* InetSocketAddress, It is usually converted from {@link #applicationName} and {@link #discoveryAdapter}
*/
InetSocketAddress address;
public ClientFactoryBean(String applicationName, String discoveryAdapterName, Class<?> cls) {
Assert.notNull(applicationName);
Assert.notNull(cls);
this.applicationName = applicationName;
this.discoveryAdapterName = discoveryAdapterName;
this.cls = cls;
@ -82,16 +110,17 @@ public class ClientFactoryBean implements FactoryBean<Object>, InitializingBean,
@Override
public Object getObject() throws Exception {
this.address = discoveryAdapter.getSocketAddress(applicationName);
if (this.address == null) {
String[] addressStr = applicationName.split(":");
if (addressStr.length < 2) {
throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure");
}
this.address = InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1]));
}
NettyClientPoolHandler handler = new NettyClientPoolHandler(handlers);
Client client = NettyClientSupport.getClient(this.address, handler);
this.address = Optional.ofNullable(applicationName)
.map(a -> discoveryAdapter.getSocketAddress(a))
.map(a -> {
String[] addressStr = applicationName.split(":");
if (addressStr.length < 2) {
throw new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure");
}
return InetSocketAddress.createUnresolved(addressStr[0], Integer.parseInt(addressStr[1]));
})
.orElseThrow(() -> new ConnectionException("Failed to connect to the server because the IP address is invalid. Procedure"));
Client client = NettyClientSupport.getClient(this.address, new NettyClientPoolHandler(handlers));
return NettyProxyCenter.createProxy(client, cls, this.address);
}
@ -102,15 +131,15 @@ public class ClientFactoryBean implements FactoryBean<Object>, InitializingBean,
@Override
public void afterPropertiesSet() throws Exception {
this.discoveryAdapter = (DiscoveryAdapter) applicationContext.getBean(discoveryAdapterName);
this.discoveryAdapter = Optional.ofNullable(discoveryAdapterName)
.map(s -> (DiscoveryAdapter) applicationContext.getBean(discoveryAdapterName))
.orElse(null);
}
@Override
public void destroy() throws Exception {
if (this.address == null) {
return;
}
NettyClientSupport.closeClient(this.address);
Optional.ofNullable(this.address)
.ifPresent(a -> NettyClientSupport.closeClient(this.address));
}
@Override

@ -33,6 +33,7 @@ import java.io.IOException;
import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
@ -51,6 +52,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @see cn.hippo4j.rpc.client.NettyClientConnection
* @see NettyServerSupport
* @see ClientFactoryBean
* @since 1.5.1
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class NettyClientSupport {
@ -97,12 +99,13 @@ public final class NettyClientSupport {
*/
public static void closeClient(InetSocketAddress address) {
Client client = clientMap.remove(address);
try {
if (client != null) {
client.close();
}
} catch (IOException e) {
throw new IllegalException(e);
}
Optional.ofNullable(client)
.ifPresent(c -> {
try {
c.close();
} catch (IOException e) {
throw new IllegalException(e);
}
});
}
}

@ -30,10 +30,13 @@ import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* This parameter applies only to the connection pool of netty
*
* @since 1.5.1
*/
@Slf4j
public class NettyConnectPool {
@ -84,14 +87,15 @@ public class NettyConnectPool {
}
public void release(Channel channel) {
try {
if (channel != null) {
pool.release(channel);
}
} catch (Exception e) {
NettyClientSupport.closeClient(address);
throw new ConnectionException("Failed to release the connection", e);
}
Optional.ofNullable(channel)
.ifPresent(c -> {
try {
pool.release(channel);
} catch (Exception e) {
NettyClientSupport.closeClient(address);
throw new ConnectionException("Failed to release the connection", e);
}
});
}
public void close() {

@ -31,6 +31,8 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* To avoid creating multiple connection pools for the same host:port, save all connection pools of the client
*
* @since 1.5.1
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NettyConnectPoolHolder {

@ -36,6 +36,8 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* Add a proxy for the request, {@link Proxy} and {@link InvocationHandler}
*
* @since 1.5.1
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class NettyProxyCenter {

@ -40,6 +40,7 @@ import java.util.List;
* @see RPCServer
* @see NettyServerConnection
* @see NettyClientSupport
* @since 1.5.1
*/
public class NettyServerSupport implements Server {

@ -31,6 +31,8 @@ import java.util.concurrent.locks.LockSupport;
* The unique remote call can be determined by the key of request and
* response, and the result of the call is stored in the secondary cache,
* which is convenient for the client to use at any time.
*
* @since 1.5.1
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)

@ -19,7 +19,7 @@ package cn.hippo4j.rpc.handler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@ChannelHandler.Sharable
public class TestHandler implements ChannelHandler {
@Override

Loading…
Cancel
Save