diff --git a/.codecov.yml b/.codecov.yml index 74099668..8cb2470c 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -20,9 +20,11 @@ coverage: project: default: target: auto - # adjust accordingly based on how flaky your tests are - # this allows a 10% drop from the previous base commit coverage - threshold: 10% + threshold: 0% + informational: true + patch: + default: + informational: true ignore: - "hippo4j-example/.*" - "docs/.*" diff --git a/.github/workflows/reademe-contributors.yml b/.github/workflows/reademe-contributors.yml index 34583d53..34476f4b 100644 --- a/.github/workflows/reademe-contributors.yml +++ b/.github/workflows/reademe-contributors.yml @@ -21,7 +21,7 @@ on: push: branches: - - main + - develop jobs: contrib-readme-job: diff --git a/README.md b/README.md index 01ba603e..e75f43eb 100644 --- a/README.md +++ b/README.md @@ -72,10 +72,10 @@ Hippo-4J 通过对 JDK 线程池增强,以及扩展三方框架底层线程池
- | 姓名 | -GitHub ID | -博客地址 | -联系方式 | -
马称 | -magegoofy | -小马哥的技术专栏 | -machen@apache.org | -|
陆宽 | -shining-stars-lk | -宽仔的代码之路 | -1031900093@qq.com | -|
王杰 | -iwangjie | -- | -wangchenmo1025@gmail.com | -|
魏虎 | -weihubeats | -weihubeats | -weihubeats@163.com | -|
李剑鑫 | -BigXin0109 | -Only丶Big | -1064730540@qq.com | -|
刘文浩 | -pizihao | -pizihao | -hao3073liu@163.com | -|
叶炜 | -shanjianq | -- | -17855368071@163.com | -|
黄成兴 | -Createsequence | -Createsequence's Blog | -841396397@qq.com | -
- | 姓名 | -GitHub ID | -博客地址 | -联系方式 | -
马称 | -magegoofy | -小马哥的技术专栏 | -machen@apache.org | -|
陆宽 | -shining-stars-lk | -宽仔的代码之路 | -1031900093@qq.com | -|
王杰 | -iwangjie | -- | -wangchenmo1025@gmail.com | -|
魏虎 | -weihubeats | -weihubeats | -weihubeats@163.com | -|
李剑鑫 | -BigXin0109 | -Only丶Big | -1064730540@qq.com | -|
刘文浩 | -pizihao | -pizihao | -hao3073liu@163.com | -|
叶炜 | -shanjianq | -- | -17855368071@163.com | -|
黄成兴 | -Createsequence | -Createsequence's Blog | -841396397@qq.com | -
- | 姓名 | -GitHub ID | -博客地址 | -联系方式 | -
马称 | -magegoofy | -小马哥的技术专栏 | -machen@apache.org | -|
陆宽 | -shining-stars-lk | -宽仔的代码之路 | -1031900093@qq.com | -|
王杰 | -iwangjie | -- | -wangchenmo1025@gmail.com | -|
魏虎 | -weihubeats | -weihubeats | -weihubeats@163.com | -|
李剑鑫 | -BigXin0109 | -Only丶Big | -1064730540@qq.com | -|
刘文浩 | -pizihao | -pizihao | -hao3073liu@163.com | -|
叶炜 | -shanjianq | -- | -17855368071@163.com | -|
黄成兴 | -Createsequence | -Createsequence's Blog | -841396397@qq.com | -
+ | 姓名 | +GitHub ID | +博客地址 | +联系方式 | +
马称 | +mageeric | +小马哥的技术专栏 | +machen@apache.org | +|
陆宽 | +shining-stars-lk | +宽仔的代码之路 | +1031900093@qq.com | +|
王杰 | +iwangjie | +- | +wangchenmo1025@gmail.com | +|
魏虎 | +weihubeats | +weihubeats | +weihubeats@163.com | +|
李剑鑫 | +BigXin0109 | +Only丶Big | +1064730540@qq.com | +|
刘文浩 | +pizihao | +pizihao | +hao3073liu@163.com | +|
叶炜 | +shanjianq | +- | +17855368071@163.com | +|
黄成兴 | +Createsequence | +Createsequence's Blog | +841396397@qq.com | +
+ | 姓名 | +GitHub ID | +博客地址 | +联系方式 | +
马称 | +mageeric | +小马哥的技术专栏 | +machen@apache.org | +|
陆宽 | +shining-stars-lk | +宽仔的代码之路 | +1031900093@qq.com | +|
王杰 | +iwangjie | +- | +wangchenmo1025@gmail.com | +|
魏虎 | +weihubeats | +weihubeats | +weihubeats@163.com | +|
李剑鑫 | +BigXin0109 | +Only丶Big | +1064730540@qq.com | +|
刘文浩 | +pizihao | +pizihao | +hao3073liu@163.com | +|
叶炜 | +shanjianq | +- | +17855368071@163.com | +|
黄成兴 | +Createsequence | +Createsequence's Blog | +841396397@qq.com | +
- | 姓名 | -GitHub ID | -博客地址 | -联系方式 | -
马称 | -magegoofy | -小马哥的技术专栏 | -machen@apache.org | -|
陆宽 | -shining-stars-lk | -宽仔的代码之路 | -1031900093@qq.com | -|
王杰 | -iwangjie | -- | -wangchenmo1025@gmail.com | -|
魏虎 | -weihubeats | -weihubeats | -weihubeats@163.com | -|
李剑鑫 | -BigXin0109 | -Only丶Big | -1064730540@qq.com | -|
刘文浩 | -pizihao | -pizihao | -hao3073liu@163.com | -|
叶炜 | -shanjianq | -- | -17855368071@163.com | -|
黄成兴 | -Createsequence | -Createsequence's Blog | -841396397@qq.com | -
Dynamic thread pool check and send logic wait for refactoring,
+ * Try not to rely on this component for custom extensions, because it is undefined.
+ */
+public interface ThreadPoolCheckAlarm extends CommandLineRunner {
+
+ /**
+ * Check pool capacity alarm.
+ *
+ * @param threadPoolId thread-pool id
+ * @param threadPoolExecutor thread-pool executor
+ */
+ void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor);
+
+ /**
+ * Check pool activity alarm.
+ *
+ * @param threadPoolId thread-pool id
+ * @param threadPoolExecutor thread-pool executor
+ */
+ void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor);
+
+ /**
+ * Async send rejected alarm.
+ *
+ * @param threadPoolId thread-pool id
+ */
+ void asyncSendRejectedAlarm(String threadPoolId);
+
+ /**
+ * Async send execute time-out alarm.
+ *
+ * @param threadPoolId thread-pool id
+ * @param executeTime execute time
+ * @param executeTimeOut execute time-out
+ * @param threadPoolExecutor thread-pool executor
+ */
+ void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor);
+}
diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/api/ThreadPoolConfigChange.java b/hippo4j-common/src/main/java/cn/hippo4j/common/api/ThreadPoolConfigChange.java
new file mode 100644
index 00000000..8086240e
--- /dev/null
+++ b/hippo4j-common/src/main/java/cn/hippo4j/common/api/ThreadPoolConfigChange.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.common.api;
+
+/**
+ * Thread-pool config change.
+ */
+public interface ThreadPoolConfigChange If you use Server mode, you can view the thread pool operation in the built-in console.
+ * If you use Config mode, you can observe with Prometheus and Grafana.
+ *
+ * The annotation is normally marked on the
+ * spring bean defined by {@link java.util.concurrent.ThreadPoolExecutor}.
+ *
+ * Can also be marked on the following types:
+ *
+ * @see java.util.concurrent.Executor
+ * @see java.util.concurrent.ExecutorService
+ * @see org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
+ * @see com.alibaba.ttl.threadpool.ExecutorTtlWrapper
+ * @see com.alibaba.ttl.threadpool.ExecutorServiceTtlWrapper
+ * @since 1.0
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java
index 71cc5bed..6790a374 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java
@@ -45,7 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean {
/**
- * wait for tasks to complete on shutdown
+ * Wait for tasks to complete on shutdown
*/
@Getter
@Setter
@@ -54,23 +54,23 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
/**
* Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters.
*
- * @param threadPoolId thread-pool id
- * @param executeTimeOut execute time out
+ * @param threadPoolId thread-pool id
+ * @param executeTimeOut execute time out
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
- * @param awaitTerminationMillis await termination millis
- * @param corePoolSize the number of threads to keep in the pool, even
- * if they are idle, unless {@code allowCoreThreadTimeOut} is set
- * @param maximumPoolSize the maximum number of threads to allow in the
- * pool
- * @param keepAliveTime when the number of threads is greater than
- * the core, this is the maximum time that excess idle threads
- * will wait for new tasks before terminating.
- * @param unit the time unit for the {@code keepAliveTime} argument
- * @param blockingQueue the queue to use for holding tasks before they are
- * executed. This queue will hold only the {@code Runnable}
- * tasks submitted by the {@code execute} method.
- * @param threadFactory the factory to use when the executor creates a new thread
- * @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
+ * @param awaitTerminationMillis await termination millis
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle, unless {@code allowCoreThreadTimeOut} is set
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ * @param blockingQueue the queue to use for holding tasks before they are
+ * executed. This queue will hold only the {@code Runnable}
+ * tasks submitted by the {@code execute} method.
+ * @param threadFactory the factory to use when the executor creates a new thread
+ * @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds: Before calling the parent class method, {@link ExecuteAwarePlugin#beforeExecute} will be called first.
*
- * @param thread the thread that will run task {@code r}
+ * @param thread the thread that will run task {@code r}
* @param runnable the task that will be executed
*/
@Override
@@ -145,9 +142,9 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
*
* After calling the superclass method, {@link ExecuteAwarePlugin#afterExecute} will be called last.
*
- * @param runnable the runnable that has completed
+ * @param runnable the runnable that has completed
* @param throwable the exception that caused termination, or null if
- * execution completed normally
+ * execution completed normally
*/
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
@@ -191,7 +188,7 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
}
/**
- * {@inheritDoc}.
+ * {@inheritDoc}
*
* Before calling the superclass method, {@link ShutdownAwarePlugin#afterTerminated} will be called first.
*/
@@ -291,22 +288,21 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
private static class RejectedAwareHandlerWrapper implements RejectedExecutionHandler {
/**
- * thread-pool action aware registry
+ * Thread-pool action aware registry
*/
private final ThreadPoolPluginManager registry;
/**
- * original target
+ * Original target
*/
- @NonNull
@Setter
@Getter
private RejectedExecutionHandler handler;
/**
- * Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task
+ * Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task.
*
- * @param r the runnable task requested to be executed
+ * @param r the runnable task requested to be executed
* @param executor the executor attempting to execute this task
*/
@Override
@@ -315,6 +311,5 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
rejectedAwarePluginList.forEach(aware -> aware.beforeRejectedExecution(r, executor));
handler.rejectedExecution(r, executor);
}
-
}
}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/FastThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/FastThreadPoolExecutor.java
index 31835191..c4e4f569 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/FastThreadPoolExecutor.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/FastThreadPoolExecutor.java
@@ -41,8 +41,16 @@ public class FastThreadPoolExecutor extends ThreadPoolExecutorTemplate {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
+ /**
+ * Statistics on the number of tasks submitted by the fast consumption thread pool
+ */
private final AtomicInteger submittedTaskCount = new AtomicInteger(0);
+ /**
+ * Get submitted task count.
+ *
+ * @return submitted task count
+ */
public int getSubmittedTaskCount() {
return submittedTaskCount.get();
}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/TaskQueue.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/TaskQueue.java
index 3d16c439..9eeedcf9 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/TaskQueue.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/TaskQueue.java
@@ -17,6 +17,8 @@
package cn.hippo4j.core.executor.support;
+import lombok.Setter;
+
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
@@ -28,16 +30,13 @@ public class TaskQueue
* com.google.common.base.Joiner
*/
-public class Joiner {
+public final class Joiner {
private final String separator;
@@ -45,11 +45,11 @@ public class Joiner {
* Returns a string containing the string representation of each of {@code parts}, using the
* previously configured separator between each.
*/
- public final String join(Object[] parts) {
+ public String join(Object[] parts) {
return join(Arrays.asList(parts));
}
- public final String join(Iterable> parts) {
+ public String join(Iterable> parts) {
return join(parts.iterator());
}
@@ -57,11 +57,11 @@ public class Joiner {
* Returns a string containing the string representation of each of {@code parts}, using the
* previously configured separator between each.
*/
- public final String join(Iterator> parts) {
+ public String join(Iterator> parts) {
return appendTo(new StringBuilder(), parts).toString();
}
- public final StringBuilder appendTo(StringBuilder builder, Iterator> parts) {
+ public StringBuilder appendTo(StringBuilder builder, Iterator> parts) {
try {
appendTo((Appendable) builder, parts);
} catch (IOException impossible) {
diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/MemoryUtil.java b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/MemoryUtil.java
index 3b62bab9..ad2b0678 100644
--- a/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/MemoryUtil.java
+++ b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/MemoryUtil.java
@@ -26,7 +26,7 @@ import java.lang.management.MemoryUsage;
/**
* memory util
- * the obtained information is not invalid, after a long wait, obtain it again
+ * the obtained information is not real time effective, after a long wait, please get it again
*
* @author liuwenhao
*/
diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ReflectUtil.java b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ReflectUtil.java
index 9a4bfa86..d278ec55 100644
--- a/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ReflectUtil.java
+++ b/hippo4j-common/src/main/java/cn/hippo4j/common/toolkit/ReflectUtil.java
@@ -250,4 +250,18 @@ public class ReflectUtil {
throw new IllegalException(e);
}
}
+
+ /**
+ * get instance
+ *
+ * @param cls the class
+ * @return new Instance
+ */
+ public static Object createInstance(Class> cls) {
+ try {
+ return cls.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new IllegalException(e);
+ }
+ }
}
diff --git a/hippo4j-core/pom.xml b/hippo4j-core/pom.xml
index e9e05f10..ae43fb91 100644
--- a/hippo4j-core/pom.xml
+++ b/hippo4j-core/pom.xml
@@ -10,34 +10,15 @@
+ *
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
@@ -93,14 +93,13 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
blockingQueue, threadFactory, rejectedExecutionHandler);
log.info("Initializing ExecutorService {}", threadPoolId);
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
- // init default plugins
+ // Init default plugins.
new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis)
.doRegister(this);
}
/**
* Invoked by the containing {@code BeanFactory} on destruction of a bean.
- *
*/
@Override
public void destroy() {
@@ -128,7 +127,7 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
/**
* Set support param.
*
- * @param awaitTerminationMillis await termination millis
+ * @param awaitTerminationMillis await termination millis
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin}
*/
@@ -238,5 +237,4 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
public void setRedundancyHandler(RejectedExecutionHandler handler) {
setRejectedExecutionHandler(handler);
}
-
}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java
index 4bd96b16..87386a28 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java
@@ -43,43 +43,42 @@ import java.util.concurrent.*;
public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements ThreadPoolPluginSupport {
/**
- * thread pool id
+ * Thread pool id
*/
@Getter
- @NonNull
private final String threadPoolId;
/**
- * action aware registry
+ * Action aware registry
*/
@Getter
private final ThreadPoolPluginManager threadPoolPluginManager;
/**
- * handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper
+ * Handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper
*/
private final RejectedAwareHandlerWrapper handlerWrapper;
/**
* Creates a new {@code ExtensibleThreadPoolExecutor} with the given initial parameters.
*
- * @param threadPoolId thread-pool id
- * @param threadPoolPluginManager action aware registry
- * @param corePoolSize the number of threads to keep in the pool, even
- * if they are idle, unless {@code allowCoreThreadTimeOut} is set
- * @param maximumPoolSize the maximum number of threads to allow in the
- * pool
- * @param keepAliveTime when the number of threads is greater than
- * the core, this is the maximum time that excess idle threads
- * will wait for new tasks before terminating.
- * @param unit the time unit for the {@code keepAliveTime} argument
- * @param workQueue the queue to use for holding tasks before they are
- * executed. This queue will hold only the {@code Runnable}
- * tasks submitted by the {@code execute} method.
- * @param threadFactory the factory to use when the executor
- * creates a new thread
- * @param handler the handler to use when execution is blocked
- * because the thread bounds and queue capacities are reached
+ * @param threadPoolId thread-pool id
+ * @param threadPoolPluginManager action aware registry
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle, unless {@code allowCoreThreadTimeOut} is set
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ * @param workQueue the queue to use for holding tasks before they are
+ * executed. This queue will hold only the {@code Runnable}
+ * tasks submitted by the {@code execute} method.
+ * @param threadFactory the factory to use when the executor
+ * creates a new thread
+ * @param handler the handler to use when execution is blocked
+ * because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
@@ -97,12 +96,10 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
-
- // pool extended info
+ // Pool extended info.
this.threadPoolId = threadPoolId;
this.threadPoolPluginManager = threadPoolPluginManager;
-
- // proxy handler to support Aware callback
+ // Proxy handler to support Aware callback.
while (handler instanceof RejectedAwareHandlerWrapper) {
handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
}
@@ -115,7 +112,7 @@ public class ExtensibleThreadPoolExecutor extends ThreadPoolExecutor implements
*
*
+ * Client: Active connection indicates that a connection is being maintained with the server.
+ * Inactive connection indicates that no connection is being established with the server
+ *
+ * Server: The active connection indicates that the server has been started, is receiving ports,
+ * and can obtain requests at any time. The inactive connection indicates that the server has been
+ * shut down and the ports have been released
+ *
+ * @return Whether active
+ */
+ boolean isActive();
+
+}
diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java
new file mode 100644
index 00000000..7d0f3d29
--- /dev/null
+++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/handler/HandlerManager.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.rpc.handler;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+/**
+ * Manage the Handler used in the processing.
+ * The Handler must be able to exist multiple times and be invoked once in a single execution
+ */
+public interface HandlerManager
+ * Server: Receives parameters and performs pre-call operations
+ *
+ * @param request request
+ * @return Whether to continue the execution. If it is a client, the returned value does not affect subsequent execution
+ */
+ default boolean preHandler(Request request) {
+ return true;
+ }
+
+ /**
+ * Client: Action after receiving a response
+ * Server: performs the operation after the call
+ *
+ * @param request request
+ * @param response response
+ */
+ default void postHandler(Request request, Response response) {
+ // NO SOMETHING
+ }
+
+ /**
+ * Called when an exception or resource is cleaned
+ *
+ * @param request request
+ * @param response response
+ * @param e Exception
+ */
+ default void afterCompletion(Request request, Response response, Exception e) {
+ // NO SOMETHING
+ }
+}
diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActiveProcessChain.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActiveProcessChain.java
new file mode 100644
index 00000000..693f83ce
--- /dev/null
+++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/process/ActiveProcessChain.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.rpc.process;
+
+import cn.hippo4j.rpc.request.Request;
+import cn.hippo4j.rpc.response.Response;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Processor chain for easier processing of processors in different scenarios
+ * reference resources: spring HandlerExecutionChain
+ *
+ * @see ActivePostProcess
+ */
+@Slf4j
+public final class ActiveProcessChain {
+
+ /**
+ * A collection of processors that will be applied to their assigned programs.
+ * Processors will perform different actions on different occasions for both the server and the client,
+ * but the execution period of that action must be the same
+ */
+ List
+ * that identifies where the {@link ActivePostProcess#preHandler(Request)} processing is performed
+ * This allows for the fact that some processors will add shutable operations to the class
+ * eg: {@link java.io.Closeable}, The {@link ActivePostProcess#afterCompletion(Request, Response, Exception)}
+ * operation is not performed after an exception if the preprocessor is not executed
+ */
+ int index = -1;
+
+ public ActiveProcessChain(List
+ * Use the fully qualified name key of the interface and override equals and hashCode
+ */
+public final class DefaultRequest implements Request {
+
+ String key;
+ String className;
+ String methodName;
+ Class>[] parameterTypes;
+ transient Object[] parameters;
+
+ public DefaultRequest(String key, String className, String methodName, Class>[] parameterTypes, Object[] parameters) {
+ this.key = key;
+ this.className = className;
+ this.methodName = methodName;
+ this.parameterTypes = parameterTypes;
+ this.parameters = parameters;
+ }
+
+ @Override
+ public String getKey() {
+ return key;
+ }
+
+ @Override
+ public String getClassName() {
+ return className;
+ }
+
+ @Override
+ public String getMethodName() {
+ return methodName;
+ }
+
+ @Override
+ public Class>[] getParameterTypes() {
+ return parameterTypes;
+ }
+
+ @Override
+ public Object[] getParameters() {
+ return parameters;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ DefaultRequest that = (DefaultRequest) o;
+ return Objects.equals(key, that.key)
+ && Objects.equals(className, that.className)
+ && Objects.equals(methodName, that.methodName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, className, methodName);
+ }
+
+ /**
+ * Redefine the behavior of serialization, that is, re-acquire the initially serialized
+ * data from the stream and re-serialize it. Simple serialization will result in the
+ * loss of the field identified by transient.
+ */
+ private void writeObject(ObjectOutputStream s) throws IOException {
+ s.defaultWriteObject();
+ if (parameters == null) {
+ return;
+ }
+ // Serialization parameters
+ for (Object parameter : parameters) {
+ s.writeObject(parameter);
+ }
+ }
+
+ /**
+ * Redefine the deserialization behavior, and sequentially deserialize the data specified during
+ * serialization, because there is data that is not deserialized during initial deserialization,
+ * such as fields defined by transient
+ */
+ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
+ s.defaultReadObject();
+ if (parameterTypes == null) {
+ return;
+ }
+ // Deserialization parameters
+ int length = parameterTypes.length;
+ Object[] a = new Object[length];
+ for (int i = 0; i < length; i++) {
+ a[i] = s.readObject();
+ }
+ this.parameters = a;
+ }
+}
diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/request/Request.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/request/Request.java
new file mode 100644
index 00000000..db68fe3d
--- /dev/null
+++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/request/Request.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.rpc.request;
+
+import java.io.Serializable;
+
+/**
+ * request
+ */
+public interface Request extends Serializable {
+
+ /**
+ * The unique identity of the current request
+ */
+ String getKey();
+
+ /**
+ * The Class name of the current request
+ */
+ String getClassName();
+
+ /**
+ * The Method name of the current request
+ */
+ String getMethodName();
+
+ /**
+ * The parameter type of the current request
+ */
+ Class>[] getParameterTypes();
+
+ /**
+ * The parameters of the current request
+ */
+ Object[] getParameters();
+
+}
diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/response/DefaultResponse.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/response/DefaultResponse.java
new file mode 100644
index 00000000..e2d38c4d
--- /dev/null
+++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/response/DefaultResponse.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.rpc.response;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Objects;
+
+/**
+ * default request
+ * Use the fully qualified name key of the interface and override equals and hashCode
+ */
+public class DefaultResponse implements Response {
+
+ String key;
+ Class> cls;
+ transient Object obj;
+ Throwable throwable;
+ String errMsg;
+
+ public DefaultResponse(String key, Class> cls, Object obj, Throwable throwable, String errMsg) {
+ this.key = key;
+ this.cls = cls;
+ this.obj = obj;
+ this.throwable = throwable;
+ this.errMsg = errMsg;
+ }
+
+ public DefaultResponse(String key, Throwable throwable, String errMsg) {
+ this(key, null, null, throwable, errMsg);
+ }
+
+ public DefaultResponse(String key, Class> cls, Object obj) {
+ this(key, cls, obj, null, null);
+ }
+
+ @Override
+ public String getKey() {
+ return key;
+ }
+
+ @Override
+ public Class> getCls() {
+ return cls;
+ }
+
+ @Override
+ public Object getObj() {
+ return obj;
+ }
+
+ @Override
+ public Throwable getThrowable() {
+ return throwable;
+ }
+
+ @Override
+ public String getErrMsg() {
+ return errMsg;
+ }
+
+ @Override
+ public boolean isErr() {
+ return throwable != null || errMsg != null;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+ if (o == null || getClass() != o.getClass())
+ return false;
+ DefaultResponse that = (DefaultResponse) o;
+ return Objects.equals(key, that.key) && Objects.equals(cls, that.cls);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(key, cls);
+ }
+
+ /**
+ * Redefine the behavior of serialization, that is, re-acquire the initially serialized
+ * data from the stream and re-serialize it. Simple serialization will result in the
+ * loss of the field identified by transient.
+ */
+ private void writeObject(ObjectOutputStream s) throws IOException {
+ s.defaultWriteObject();
+ if (obj == null) {
+ return;
+ }
+ // Serialization obj
+ s.writeObject(this.obj);
+ }
+
+ /**
+ * Redefine the deserialization behavior, and sequentially deserialize the data specified during
+ * serialization, because there is data that is not deserialized during initial deserialization,
+ * such as fields defined by transient
+ */
+ private void readObject(ObjectInputStream s) throws IOException, ClassNotFoundException {
+ s.defaultReadObject();
+ // Deserialization obj
+ this.obj = s.readObject();
+ }
+}
diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/response/Response.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/response/Response.java
new file mode 100644
index 00000000..cdb26e5b
--- /dev/null
+++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/response/Response.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.rpc.response;
+
+import java.io.Serializable;
+
+/**
+ * Response
+ */
+public interface Response extends Serializable {
+
+ /**
+ * The unique identity of the current Response
+ */
+ String getKey();
+
+ /**
+ * The class of the current Response, The target of deserialization
+ */
+ Class> getCls();
+
+ /**
+ * The results of this request can be obtained, The source of deserialization
+ */
+ Object getObj();
+
+ /**
+ * The Throwable of the current Response
+ */
+ Throwable getThrowable();
+
+ /**
+ * the error message
+ */
+ String getErrMsg();
+
+ /**
+ * Whether the current request has an error
+ */
+ boolean isErr();
+
+}
diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java
new file mode 100644
index 00000000..671e0748
--- /dev/null
+++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/NettyServerConnection.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.rpc.server;
+
+import cn.hippo4j.common.toolkit.Assert;
+import cn.hippo4j.rpc.coder.NettyDecoder;
+import cn.hippo4j.rpc.coder.NettyEncoder;
+import cn.hippo4j.rpc.handler.NettyHandlerManager;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.serialization.ClassResolvers;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * adapter to the netty server
+ */
+@Slf4j
+public class NettyServerConnection extends NettyHandlerManager implements ServerConnection {
+
+ Integer port;
+ EventLoopGroup leader;
+ EventLoopGroup worker;
+ Class extends ServerChannel> socketChannelCls = NioServerSocketChannel.class;
+ ChannelFuture future;
+ Channel channel;
+
+ public NettyServerConnection(EventLoopGroup leader, EventLoopGroup worker, List
+ * If the port being processed is already bound, an exception is thrown
+ */
+ void bind();
+
+ /**
+ * Check whether the server is active
+ *
+ * @return Whether active
+ */
+ boolean isActive();
+
+}
diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/ServerConnection.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/ServerConnection.java
new file mode 100644
index 00000000..fcb5a9e1
--- /dev/null
+++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/server/ServerConnection.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.rpc.server;
+
+import cn.hippo4j.rpc.handler.Connection;
+
+/**
+ * This applies to server-side connections
+ */
+public interface ServerConnection extends Connection {
+
+ /**
+ * Bind ports and process them
+ */
+ void bind(int port);
+
+}
diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClassRegistry.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClassRegistry.java
new file mode 100644
index 00000000..ebcc86f6
--- /dev/null
+++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/ClassRegistry.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.rpc.support;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * the registration center for Client and Server
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ClassRegistry {
+
+ private static final Map
+ *
+ * @param s key
+ * @return t element
+ */
+ public static Class> get(String s) {
+ return serverRegister.get(s);
+ }
+
+ /**
+ * add the element to Registry Table
+ * if the key already exists, failure, and return before the value of the key.
+ * if success return the element
+ *
+ * @param s key
+ * @param cls element
+ * @return final mapped value
+ */
+ public static Class> set(String s, Class> cls) {
+ return serverRegister.putIfAbsent(s, cls);
+ }
+
+ /**
+ * add the element to Registry Table
+ * if the key already exists, failure, replace it
+ *
+ * @param s key
+ * @param cls element
+ */
+ public static Class> put(String s, Class> cls) {
+ return serverRegister.put(s, cls);
+ }
+
+ /**
+ * clear
+ */
+ public static void clear() {
+ serverRegister.clear();
+ }
+}
diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/DefaultInstance.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/DefaultInstance.java
new file mode 100644
index 00000000..c6cf9a6c
--- /dev/null
+++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/DefaultInstance.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.rpc.support;
+
+import cn.hippo4j.common.toolkit.ReflectUtil;
+import cn.hippo4j.common.web.exception.IllegalException;
+
+/**
+ * Simply creating an instance of a class by its name and its specific type,
+ * and then throwing an exception if it is an interface, is not elegant
+ */
+public class DefaultInstance implements Instance {
+
+ @Override
+ public Object getInstance(Class> cls) {
+ return ReflectUtil.createInstance(cls);
+ }
+
+ @Override
+ public Object getInstance(String name) {
+ try {
+ Class> cls = Class.forName(name);
+ return getInstance(cls);
+ } catch (ClassNotFoundException e) {
+ throw new IllegalException(e);
+ }
+ }
+}
diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/Instance.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/Instance.java
new file mode 100644
index 00000000..840dff3a
--- /dev/null
+++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/Instance.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.rpc.support;
+
+/**
+ * Instance interface to get an instance
+ */
+public interface Instance {
+
+ /**
+ * get a instance
+ *
+ * @param cls Class object
+ * @return Information about instances created or found
+ */
+ Object getInstance(Class> cls);
+
+ /**
+ * Gets an instance of a class with a recognizable identity,
+ * which can be the fully qualified name of class. It can also be a unique name in a container
+ *
+ * @param name Identifying name
+ * @return Information about instances created or found
+ */
+ Object getInstance(String name);
+
+}
diff --git a/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java
new file mode 100644
index 00000000..a34159e9
--- /dev/null
+++ b/hippo4j-rpc/src/main/java/cn/hippo4j/rpc/support/NettyConnectPool.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.rpc.support;
+
+import cn.hippo4j.rpc.exception.ConnectionException;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.pool.ChannelHealthChecker;
+import io.netty.channel.pool.ChannelPool;
+import io.netty.channel.pool.ChannelPoolHandler;
+import io.netty.channel.pool.FixedChannelPool;
+import io.netty.util.concurrent.Future;
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This parameter applies only to the connection pool of netty
+ */
+@Slf4j
+public class NettyConnectPool {
+
+ ChannelHealthChecker healthCheck = ChannelHealthChecker.ACTIVE;
+ FixedChannelPool.AcquireTimeoutAction acquireTimeoutAction = FixedChannelPool.AcquireTimeoutAction.NEW;
+ int maxPendingAcquires = Integer.MAX_VALUE;
+ ChannelPoolHandler handler;
+ ChannelPool pool;
+ String host;
+ int port;
+
+ public NettyConnectPool(String host, int port, int maxConnect,
+ long timeout, EventLoopGroup worker,
+ Class extends Channel> socketChannelCls,
+ ChannelPoolHandler handler) {
+ InetSocketAddress socketAddress = InetSocketAddress.createUnresolved(host, port);
+ Bootstrap bootstrap = new Bootstrap()
+ .group(worker)
+ .channel(socketChannelCls)
+ .remoteAddress(socketAddress);
+ this.host = host;
+ this.port = port;
+ this.handler = handler;
+ this.pool = new FixedChannelPool(bootstrap, handler, healthCheck, acquireTimeoutAction,
+ timeout, maxConnect, maxPendingAcquires, true, true);
+ log.info("The connection pool is established with the connection target {}:{}", host, port);
+ NettyConnectPoolHolder.createPool(host, port, this);
+ }
+
+ public Channel acquire(long timeoutMillis) {
+ try {
+ Future
+ * The unique remote call can be determined by the key of request and
+ * response, and the result of the call is stored in the secondary cache,
+ * which is convenient for the client to use at any time.
+ */
+@Slf4j
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ResultHolder {
+
+ private static final Map
+ * After the result is obtained, the corresponding key is cleared from the cache
+ * So it's only true when you first get the result
+ *
+ * @param key Request and response keys
+ * @return Response body
+ */
+ @SuppressWarnings("unchecked")
+ public static
The extension implementation of {@link GlobalThreadPoolPluginManager} and {@link BeanPostProcessor}, + * used to register {@link ThreadPoolPlugin} for the bean initialization stage of the {@link ThreadPoolPluginSupport}. + * + *
NOTE: + * If the {@link ThreadPoolPlugin}, {@link ThreadPoolPluginRegistrar}, and {@link ThreadPoolPluginSupport} is set to lazy load, + * The processor will not perceive the bean unless the user actively triggers the initialization of the bean. + * + * @see ThreadPoolPluginSupport + * @see ThreadPoolPluginRegistrar + * @see ThreadPoolPlugin + * @see GlobalThreadPoolPluginManager + * @see DefaultGlobalThreadPoolPluginManager + */ +@Slf4j +public class ThreadPoolPluginRegisterPostProcessor extends DefaultGlobalThreadPoolPluginManager implements BeanPostProcessor, ApplicationContextAware { + + /** + * application context + */ + private ConfigurableListableBeanFactory beanFactory; + + /** + *
Post process bean, if bean is instance of {@link ThreadPoolPlugin}, + * {@link ThreadPoolPluginRegistrar} or {@link ThreadPoolPluginSupport}, + * then take beans as an available component and register to {@link GlobalThreadPoolPluginManager}. + * + * @param bean the new bean instance + * @param beanName the name of the bean + * @return the bean instance to use, either the original or a wrapped one; + * if {@code null}, no subsequent BeanPostProcessors will be invoked + * @throws BeansException in case of errors + * @see GlobalThreadPoolPluginManager#enableThreadPoolPlugin + * @see GlobalThreadPoolPluginManager#enableThreadPoolPluginRegistrar + * @see GlobalThreadPoolPluginManager#registerThreadPoolPluginSupport + */ + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + Class> beanType = null; + try { + beanType = AutoProxyUtils.determineTargetClass(beanFactory, beanName); + } catch (NoSuchBeanDefinitionException ex) { + if (log.isDebugEnabled()) { + log.debug("Could not resolve target class for bean with name '" + beanName + "'", ex); + } + } + if (Objects.isNull(beanType)) { + log.warn("cannot resolve type for bean [{}]", beanName); + return bean; + } + + // register bean if necessary + registerThreadPoolPluginRegistrarIfNecessary(bean, beanType); + registerThreadPoolPluginIfNecessary(bean, beanType); + registerThreadPoolPluginSupportIfNecessary(bean, beanType); + return bean; + } + + private void registerThreadPoolPluginSupportIfNecessary(Object bean, Class> beanType) { + if (ThreadPoolPluginSupport.class.isAssignableFrom(beanType)) { + ThreadPoolPluginSupport support = (ThreadPoolPluginSupport) bean; + if (registerThreadPoolPluginSupport(support) && log.isDebugEnabled()) { + log.info("register ThreadPoolPluginSupport [{}]", support.getThreadPoolId()); + } + } + } + + private void registerThreadPoolPluginIfNecessary(Object bean, Class> beanType) { + if (ThreadPoolPlugin.class.isAssignableFrom(beanType)) { + ThreadPoolPlugin plugin = (ThreadPoolPlugin) bean; + if (enableThreadPoolPlugin(plugin) && log.isDebugEnabled()) { + log.info("register ThreadPoolPlugin [{}]", plugin.getId()); + } + } + } + + private void registerThreadPoolPluginRegistrarIfNecessary(Object bean, Class> beanType) { + if (ThreadPoolPluginRegistrar.class.isAssignableFrom(beanType)) { + ThreadPoolPluginRegistrar registrar = (ThreadPoolPluginRegistrar) bean; + if (enableThreadPoolPluginRegistrar(registrar) && log.isDebugEnabled()) { + log.info("register ThreadPoolPluginRegistrar [{}]", registrar.getId()); + } + } + } + + /** + * Set the ApplicationContext that this object runs in. + * Normally this call will be used to initialize the object. + *
Invoked after population of normal bean properties but before an init callback such
+ * as {@link InitializingBean#afterPropertiesSet()}
+ * or a custom init-method. Invoked after {@link ResourceLoaderAware#setResourceLoader},
+ * {@link ApplicationEventPublisherAware#setApplicationEventPublisher} and
+ * {@link MessageSourceAware}, if applicable.
+ *
+ * @param applicationContext the ApplicationContext object to be used by this object
+ * @throws ApplicationContextException in case of context initialization errors
+ * @throws BeansException if thrown by application context methods
+ * @see BeanInitializationException
+ */
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ AutowireCapableBeanFactory factory = applicationContext.getAutowireCapableBeanFactory();
+ Assert.isTrue(
+ factory instanceof ConfigurableListableBeanFactory,
+ "factory cannot cast to ConfigurableListableBeanFactory");
+ this.beanFactory = (ConfigurableListableBeanFactory) factory;
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 47f2fc35..7b1bca48 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,12 +22,7 @@