diff --git a/.codecov.yml b/.codecov.yml index 74099668..273a5e5f 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -20,9 +20,10 @@ coverage: project: default: target: auto - # adjust accordingly based on how flaky your tests are - # this allows a 10% drop from the previous base commit coverage threshold: 10% + patch: + target: auto + threshold: 10% ignore: - "hippo4j-example/.*" - "docs/.*" diff --git a/docs/docs/community/developer.md b/docs/docs/community/developer.md index 3979b12f..89b57b88 100644 --- a/docs/docs/community/developer.md +++ b/docs/docs/community/developer.md @@ -61,11 +61,18 @@ sidebar_position: 2
If you use Server mode, you can view the thread pool operation in the built-in console. + *
If you use Config mode, you can observe with Prometheus and Grafana. + * + *
The annotation is normally marked on the + * spring bean defined by {@link java.util.concurrent.ThreadPoolExecutor}. + * + *
Can also be marked on the following types:
+ *
+ * @see java.util.concurrent.Executor
+ * @see java.util.concurrent.ExecutorService
+ * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
+ * @see com.alibaba.ttl.threadpool.ExecutorTtlWrapper
+ * @see com.alibaba.ttl.threadpool.ExecutorServiceTtlWrapper
+ * @since 1.0
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java
index 71cc5bed..e4feda74 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java
@@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean {
/**
- * wait for tasks to complete on shutdown
+ * Wait for tasks to complete on shutdown
*/
@Getter
@Setter
@@ -54,23 +54,23 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
/**
* Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters.
*
- * @param threadPoolId thread-pool id
- * @param executeTimeOut execute time out
+ * @param threadPoolId thread-pool id
+ * @param executeTimeOut execute time out
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
- * @param awaitTerminationMillis await termination millis
- * @param corePoolSize the number of threads to keep in the pool, even
- * if they are idle, unless {@code allowCoreThreadTimeOut} is set
- * @param maximumPoolSize the maximum number of threads to allow in the
- * pool
- * @param keepAliveTime when the number of threads is greater than
- * the core, this is the maximum time that excess idle threads
- * will wait for new tasks before terminating.
- * @param unit the time unit for the {@code keepAliveTime} argument
- * @param blockingQueue the queue to use for holding tasks before they are
- * executed. This queue will hold only the {@code Runnable}
- * tasks submitted by the {@code execute} method.
- * @param threadFactory the factory to use when the executor creates a new thread
- * @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
+ * @param awaitTerminationMillis await termination millis
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle, unless {@code allowCoreThreadTimeOut} is set
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ * @param blockingQueue the queue to use for holding tasks before they are
+ * executed. This queue will hold only the {@code Runnable}
+ * tasks submitted by the {@code execute} method.
+ * @param threadFactory the factory to use when the executor creates a new thread
+ * @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds: Before calling the parent class method, {@link ExecuteAwarePlugin#beforeExecute} will be called first.
*
- * @param thread the thread that will run task {@code r}
+ * @param thread the thread that will run task {@code r}
* @param runnable the task that will be executed
*/
@Override
@@ -145,9 +142,9 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
*
* After calling the superclass method, {@link ExecuteAwarePlugin#afterExecute} will be called last.
*
- * @param runnable the runnable that has completed
+ * @param runnable the runnable that has completed
* @param throwable the exception that caused termination, or null if
- * execution completed normally
+ * execution completed normally
*/
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
@@ -191,7 +188,7 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
}
/**
- * {@inheritDoc}.
+ * {@inheritDoc}
*
* Before calling the superclass method, {@link ShutdownAwarePlugin#afterTerminated} will be called first.
*/
@@ -291,22 +288,21 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
private static class RejectedAwareHandlerWrapper implements RejectedExecutionHandler {
/**
- * thread-pool action aware registry
+ * Thread-pool action aware registry
*/
private final ThreadPoolPluginManager registry;
/**
- * original target
+ * Original target
*/
- @NonNull
@Setter
@Getter
private RejectedExecutionHandler handler;
/**
- * Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task
+ * Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task.
*
- * @param r the runnable task requested to be executed
+ * @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
*/
@Override
@@ -315,6 +311,5 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
rejectedAwarePluginList.forEach(aware -> aware.beforeRejectedExecution(r, executor));
handler.rejectedExecution(r, executor);
}
-
}
}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java
index 7b832473..62c8bf53 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java
@@ -29,7 +29,6 @@ import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
-import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
@@ -37,7 +36,14 @@ import org.springframework.boot.CommandLineRunner;
import java.util.List;
import java.util.Objects;
-import java.util.concurrent.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* Thread-pool notify alarm handler.
@@ -46,7 +52,6 @@ import java.util.concurrent.*;
@RequiredArgsConstructor
public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner {
- @NonNull
private final Hippo4jSendMessageService hippo4jSendMessageService;
@Value("${spring.profiles.active:UNKNOWN}")
@@ -96,8 +101,8 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Check thread pool capacity alarm.
*
- * @param threadPoolId
- * @param threadPoolExecutor
+ * @param threadPoolId thread-pool id
+ * @param threadPoolExecutor thread-pool executor
*/
public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId);
@@ -119,12 +124,12 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Check thread pool activity alarm.
*
- * @param threadPoolId
- * @param threadPoolExecutor
+ * @param threadPoolId thread-pool id
+ * @param threadPoolExecutor thread-pool executor
*/
public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId);
- if (Objects.isNull(alarmConfig) || !alarmConfig.getAlarm() || alarmConfig.getCapacityAlarm() <= 0) {
+ if (Objects.isNull(alarmConfig) || !alarmConfig.getAlarm() || alarmConfig.getActiveAlarm() <= 0) {
return;
}
int activeCount = threadPoolExecutor.getActiveCount();
@@ -141,7 +146,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Async send rejected alarm.
*
- * @param threadPoolId
+ * @param threadPoolId thread-pool id
*/
public void asyncSendRejectedAlarm(String threadPoolId) {
Runnable checkPoolRejectedAlarmTask = () -> {
@@ -162,10 +167,10 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Async send execute time out alarm.
*
- * @param threadPoolId
- * @param executeTime
- * @param executeTimeOut
- * @param threadPoolExecutor
+ * @param threadPoolId thread-pool id
+ * @param executeTime execute time
+ * @param executeTimeOut execute time-out
+ * @param threadPoolExecutor thread-pool executor
*/
public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) {
ThreadPoolNotifyAlarm alarmConfig = GlobalNotifyAlarmManage.get(threadPoolId);
@@ -193,7 +198,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Send pool config change.
*
- * @param request
+ * @param request change parameter notify request
*/
public void sendPoolConfigChange(ChangeParameterNotifyRequest request) {
request.setActive(active.toUpperCase());
@@ -206,7 +211,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
/**
* Build alarm notify request.
*
- * @param threadPoolExecutor
+ * @param threadPoolExecutor thread-pool executor
* @return
*/
public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) {
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java
index 6287b24a..8d9ca68f 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/AbstractThreadPoolRuntime.java
@@ -35,18 +35,18 @@ import java.util.concurrent.ThreadPoolExecutor;
public abstract class AbstractThreadPoolRuntime {
/**
- * Supplement.
+ * Supplemental thread pool runtime information.
*
- * @param threadPoolRunStateInfo
- * @return
+ * @param threadPoolRunStateInfo thread-pool run state info
+ * @return thread-pool run state info
*/
public abstract ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo threadPoolRunStateInfo);
/**
* Get pool run state.
*
- * @param threadPoolId
- * @return
+ * @param threadPoolId thread-pool id
+ * @return thread-pool run state info
*/
public ThreadPoolRunStateInfo getPoolRunState(String threadPoolId) {
DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId);
@@ -57,56 +57,34 @@ public abstract class AbstractThreadPoolRuntime {
/**
* Get pool run state.
*
- * @param threadPoolId
- * @param executor
- * @return
+ * @param threadPoolId thread-pool id
+ * @param executor executor
+ * @return thread-pool run state info
*/
public ThreadPoolRunStateInfo getPoolRunState(String threadPoolId, Executor executor) {
- ThreadPoolRunStateInfo stateInfo = new ThreadPoolRunStateInfo();
- ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
- // 核心线程数
- int corePoolSize = pool.getCorePoolSize();
- // 最大线程数
- int maximumPoolSize = pool.getMaximumPoolSize();
- // 线程池当前线程数 (有锁)
- int poolSize = pool.getPoolSize();
- // 活跃线程数 (有锁)
- int activeCount = pool.getActiveCount();
- // 同时进入池中的最大线程数 (有锁)
- int largestPoolSize = pool.getLargestPoolSize();
- // 线程池中执行任务总数量 (有锁)
- long completedTaskCount = pool.getCompletedTaskCount();
- // 当前负载
- String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "";
- // 峰值负载
- String peakLoad = CalculateUtil.divide(largestPoolSize, maximumPoolSize) + "";
- BlockingQueue
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
@@ -80,27 +80,26 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
* or {@code threadFactory} or {@code handler} is null
*/
public DynamicThreadPoolExecutor(
- int corePoolSize, int maximumPoolSize,
- long keepAliveTime, TimeUnit unit,
- long executeTimeOut, boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis,
- @NonNull BlockingQueue
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
@@ -89,20 +88,18 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
* or {@code threadFactory} or {@code handler} is null
*/
public ExtensibleThreadPoolExecutor(
- @NonNull String threadPoolId,
- @NonNull ThreadPoolPluginManager threadPoolPluginManager,
- int corePoolSize, int maximumPoolSize,
- long keepAliveTime, TimeUnit unit,
- @NonNull BlockingQueue
+ * {@link io.netty.handler.codec.serialization.ObjectEncoder}
+ */
+public class NettyEncoder extends MessageToByteEncoder
+ * Server: Receives parameters and performs pre-call operations
+ *
+ * @param request request
+ * @return Whether to continue the execution. If it is a client, the returned value does not affect subsequent execution
+ */
+ default boolean preHandler(Request request) {
+ return true;
+ }
+
+ /**
+ * Client: Action after receiving a response
+ * Server: performs the operation after the call
+ *
+ * @param request request
+ * @param response response
+ */
+ default void postHandler(Request request, Response response) {
+ // NO SOMETHING
+ }
+
+ /**
+ * Called when an exception or resource is cleaned
+ *
+ * @param request request
+ * @param response response
+ * @param e Exception
+ */
+ default void afterCompletion(Request request, Response response, Exception e) {
+ // NO SOMETHING
+ }
+}
diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/DefaultRequest.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/DefaultRequest.java
new file mode 100644
index 00000000..6b82a102
--- /dev/null
+++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/DefaultRequest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.config.rpc.request;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Objects;
+
+/**
+ * default request
+ * Use the fully qualified name key of the interface and override equals and hashCode
+ */
+public final class DefaultRequest implements Request {
+
+ String key;
+ String className;
+ String methodName;
+ Class>[] parameterTypes;
+ transient Object[] parameters;
+
+ public DefaultRequest(String key, String className, String methodName, Class>[] parameterTypes, Object[] parameters) {
+ this.key = key;
+ this.className = className;
+ this.methodName = methodName;
+ this.parameterTypes = parameterTypes;
+ this.parameters = parameters;
+ }
+
+ @Override
+ public String getKey() {
+ return key;
+ }
+
+ @Override
+ public String getClassName() {
+ return className;
+ }
+
+ @Override
+ public String getMethodName() {
+ return methodName;
+ }
+
+ @Override
+ public Class>[] getParameterTypes() {
+ return parameterTypes;
+ }
+
+ @Override
+ public Object[] getParameters() {
+ return parameters;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ DefaultRequest that = (DefaultRequest) o;
+ return Objects.equals(key, that.key)
+ && Objects.equals(className, that.className)
+ && Objects.equals(methodName, that.methodName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, className, methodName);
+ }
+
+ /**
+ * Redefine the behavior of serialization, that is, re-acquire the initially serialized
+ * data from the stream and re-serialize it. Simple serialization will result in the
+ * loss of the field identified by transient.
+ */
+ private void writeObject(ObjectOutputStream s) throws IOException {
+ s.defaultWriteObject();
+ if (parameters == null) {
+ return;
+ }
+ // 序列化属性 parameters
+ for (Object parameter : parameters) {
+ s.writeObject(parameter);
+ }
+ }
+
+ /**
+ * Redefine the deserialization behavior, and sequentially deserialize the data specified during
+ * serialization, because there is data that is not deserialized during initial deserialization,
+ * such as fields defined by transient
+ */
+ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
+ s.defaultReadObject();
+ if (parameterTypes == null) {
+ return;
+ }
+ // 反序列化属性 parameters
+ int length = parameterTypes.length;
+ Object[] a = new Object[length];
+ for (int i = 0; i < length; i++) {
+ a[i] = s.readObject();
+ }
+ this.parameters = a;
+ }
+}
diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/Request.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/Request.java
new file mode 100644
index 00000000..a045fbbf
--- /dev/null
+++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/request/Request.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.config.rpc.request;
+
+import java.io.Serializable;
+
+/**
+ * request
+ */
+public interface Request extends Serializable {
+
+ /**
+ * The unique identity of the current request
+ */
+ String getKey();
+
+ /**
+ * The Class name of the current request
+ */
+ String getClassName();
+
+ /**
+ * The Method name of the current request
+ */
+ String getMethodName();
+
+ /**
+ * The parameter type of the current request
+ */
+ Class>[] getParameterTypes();
+
+ /**
+ * The parameters of the current request
+ */
+ Object[] getParameters();
+
+}
diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/DefaultResponse.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/DefaultResponse.java
new file mode 100644
index 00000000..408d299f
--- /dev/null
+++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/DefaultResponse.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.config.rpc.response;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Objects;
+
+/**
+ * default request
+ * Use the fully qualified name key of the interface and override equals and hashCode
+ */
+public class DefaultResponse implements Response {
+
+ String key;
+ Class> cls;
+ transient Object obj;
+ Throwable throwable;
+ String errMsg;
+
+ public DefaultResponse(String key, Class> cls, Object obj, Throwable throwable, String errMsg) {
+ this.key = key;
+ this.cls = cls;
+ this.obj = obj;
+ this.throwable = throwable;
+ this.errMsg = errMsg;
+ }
+
+ public DefaultResponse(String key, Throwable throwable, String errMsg) {
+ this(key, null, null, throwable, errMsg);
+ }
+
+ public DefaultResponse(String key, Class> cls, Object obj) {
+ this(key, cls, obj, null, null);
+ }
+
+ @Override
+ public String getKey() {
+ return key;
+ }
+
+ @Override
+ public Class> getCls() {
+ return cls;
+ }
+
+ @Override
+ public Object getObj() {
+ return obj;
+ }
+
+ @Override
+ public Throwable getThrowable() {
+ return throwable;
+ }
+
+ @Override
+ public String getErrMsg() {
+ return errMsg;
+ }
+
+ @Override
+ public boolean isErr() {
+ return throwable != null || errMsg != null;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ DefaultResponse that = (DefaultResponse) o;
+ return Objects.equals(key, that.key) && Objects.equals(cls, that.cls);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, cls);
+ }
+
+ /**
+ * Redefine the behavior of serialization, that is, re-acquire the initially serialized
+ * data from the stream and re-serialize it. Simple serialization will result in the
+ * loss of the field identified by transient.
+ */
+ private void writeObject(ObjectOutputStream s) throws IOException {
+ s.defaultWriteObject();
+ if (obj == null) {
+ return;
+ }
+ // 序列化属性 obj
+ s.writeObject(this.obj);
+ }
+
+ /**
+ * Redefine the deserialization behavior, and sequentially deserialize the data specified during
+ * serialization, because there is data that is not deserialized during initial deserialization,
+ * such as fields defined by transient
+ */
+ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
+ s.defaultReadObject();
+ // 反序列化属性 obj
+ this.obj = s.readObject();
+ }
+}
diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/Response.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/Response.java
new file mode 100644
index 00000000..3c06fbaa
--- /dev/null
+++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/response/Response.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.config.rpc.response;
+
+import java.io.Serializable;
+
+/**
+ * Response
+ */
+public interface Response extends Serializable {
+
+ /**
+ * The unique identity of the current Response
+ */
+ String getKey();
+
+ /**
+ * The class of the current Response, The target of deserialization
+ */
+ Class> getCls();
+
+ /**
+ * The results of this request can be obtained, The source of deserialization
+ */
+ Object getObj();
+
+ /**
+ * The Throwable of the current Response
+ */
+ Throwable getThrowable();
+
+ /**
+ * the error message
+ */
+ String getErrMsg();
+
+ /**
+ * Whether the current request has an error
+ */
+ boolean isErr();
+
+}
diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/NettyServerConnection.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/NettyServerConnection.java
new file mode 100644
index 00000000..cb240442
--- /dev/null
+++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/NettyServerConnection.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.config.rpc.server;
+
+import cn.hippo4j.config.rpc.coder.NettyDecoder;
+import cn.hippo4j.config.rpc.coder.NettyEncoder;
+import cn.hippo4j.config.rpc.handler.NettyServerTakeHandler;
+import cn.hippo4j.config.rpc.process.ActivePostProcess;
+import cn.hippo4j.config.rpc.support.Instance;
+import cn.hippo4j.common.toolkit.Assert;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.serialization.ClassResolvers;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * adapter to the netty server
+ */
+@Slf4j
+public class NettyServerConnection implements ServerConnection {
+
+ Integer port;
+ EventLoopGroup leader;
+ EventLoopGroup worker;
+ Class extends ServerChannel> socketChannelCls = NioServerSocketChannel.class;
+ List
+ * If the port being processed is already bound, an exception is thrown
+ */
+ void bind();
+
+}
diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/ServerConnection.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/ServerConnection.java
new file mode 100644
index 00000000..1e1d8a4b
--- /dev/null
+++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/server/ServerConnection.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.config.rpc.server;
+
+import cn.hippo4j.config.rpc.handler.Connection;
+
+/**
+ * This applies to server-side connections
+ */
+public interface ServerConnection extends Connection {
+
+ /**
+ * Bind ports and process them
+ */
+ void bind(int port);
+
+}
diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ClassRegistry.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ClassRegistry.java
new file mode 100644
index 00000000..4b938550
--- /dev/null
+++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/ClassRegistry.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.config.rpc.support;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * the registration center for Client and Server
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ClassRegistry {
+
+ private static final Map
+ *
+ * @param s key
+ * @return t element
+ */
+ public static Class> get(String s) {
+ return serverRegister.get(s);
+ }
+
+ /**
+ * add the element to Registry Table
+ * if the key already exists, failure, and return before the value of the key.
+ * if success return the element
+ *
+ * @param s key
+ * @param cls element
+ * @return final mapped value
+ */
+ public static Class> set(String s, Class> cls) {
+ return serverRegister.putIfAbsent(s, cls);
+ }
+
+ /**
+ * add the element to Registry Table
+ * if the key already exists, failure, replace it
+ *
+ * @param s key
+ * @param cls element
+ */
+ public static Class> put(String s, Class> cls) {
+ return serverRegister.put(s, cls);
+ }
+
+ /**
+ * clear
+ */
+ public static void clear() {
+ serverRegister.clear();
+ }
+}
diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/DefaultInstance.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/DefaultInstance.java
new file mode 100644
index 00000000..1c5ec559
--- /dev/null
+++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/DefaultInstance.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.config.rpc.support;
+
+import cn.hippo4j.common.toolkit.ReflectUtil;
+import cn.hippo4j.common.web.exception.IllegalException;
+
+/**
+ * Simply creating an instance of a class by its name and its specific type,
+ * and then throwing an exception if it is an interface, is not elegant
+ */
+public class DefaultInstance implements Instance {
+
+ @Override
+ public Object getInstance(Class> cls) {
+ return ReflectUtil.createInstance(cls);
+ }
+
+ @Override
+ public Object getInstance(String name) {
+ try {
+ Class> cls = Class.forName(name);
+ return getInstance(cls);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalException(e);
+ }
+ }
+}
diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/Instance.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/Instance.java
new file mode 100644
index 00000000..e1b7f33a
--- /dev/null
+++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/Instance.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.config.rpc.support;
+
+/**
+ * Instance interface to get an instance
+ */
+public interface Instance {
+
+ /**
+ * get a instance
+ *
+ * @param cls Class object
+ * @return Information about instances created or found
+ */
+ Object getInstance(Class> cls);
+
+ /**
+ * Gets an instance of a class with a recognizable identity,
+ * which can be the fully qualified name of class. It can also be a unique name in a container
+ *
+ * @param name Identifying name
+ * @return Information about instances created or found
+ */
+ Object getInstance(String name);
+
+}
diff --git a/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPool.java b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPool.java
new file mode 100644
index 00000000..4268ba4c
--- /dev/null
+++ b/hippo4j-server/hippo4j-config/src/main/java/cn/hippo4j/config/rpc/support/NettyConnectPool.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.config.rpc.support;
+
+import cn.hippo4j.config.rpc.exception.ConnectionException;
+import cn.hippo4j.config.rpc.handler.NettyClientPoolHandler;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.pool.ChannelHealthChecker;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.channel.pool.ChannelPoolHandler;
+import io.netty.channel.pool.FixedChannelPool;
+import io.netty.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This parameter applies only to the connection pool of netty
+ */
+@Slf4j
+public class NettyConnectPool {
+
+ ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE;
+ FixedChannelPool.AcquireTimeoutAction acquireTimeoutAction = FixedChannelPool.AcquireTimeoutAction.NEW;
+ int maxPendingAcquires = Integer.MAX_VALUE;
+ ChannelPoolHandler handler = new NettyClientPoolHandler();
+ ChannelPool pool;
+ String host;
+ int port;
+
+ public NettyConnectPool(String host, int port, int maxConnect,
+ long timeout, EventLoopGroup worker,
+ Class extends Channel> socketChannelCls) {
+ InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port);
+ Bootstrap bootstrap = new Bootstrap()
+ .group(worker)
+ .channel(socketChannelCls)
+ .remoteAddress(socketAddress);
+ this.host = host;
+ this.port = port;
+ this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction,
+ timeout, maxConnect, maxPendingAcquires, true, true);
+ log.info("The connection pool is established with the connection target {}:{}", host, port);
+ NettyConnectPoolHolder.createPool(host, port, this);
+ }
+
+ public Channel acquire(long timeoutMillis) {
+ try {
+ Future
+ * The unique remote call can be determined by the key of request and
+ * response, and the result of the call is stored in the secondary cache,
+ * which is convenient for the client to use at any time.
+ */
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ResultHolder {
+
+ private static final Map
+ * After the result is obtained, the corresponding key is cleared from the cache
+ * So it's only true when you first get the result
+ *
+ * @param key Request and response keys
+ * @return Response body
+ */
+ @SuppressWarnings("unchecked")
+ public static