blockingQueue,
- @NonNull String threadPoolId,
- @NonNull ThreadFactory threadFactory,
- @NonNull RejectedExecutionHandler rejectedExecutionHandler) {
- super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, blockingQueue, threadPoolId, threadFactory, rejectedExecutionHandler);
- this.threadPoolId = threadPoolId;
- this.executeTimeOut = executeTimeOut;
- // Number of dynamic proxy denial policies.
- RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount);
- setRejectedExecutionHandler(rejectedProxy);
- // Redundant fields to avoid reflecting the acquired fields when sending change information.
- redundancyHandler = rejectedExecutionHandler;
+@Slf4j
+public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean {
+
+ /**
+ * Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters.
+ *
+ * @param threadPoolId thread-pool id
+ * @param executeTimeOut execute time out
+ * @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
+ * @param awaitTerminationMillis await termination millis
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle, unless {@code allowCoreThreadTimeOut} is set
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ * @param blockingQueue the queue to use for holding tasks before they are
+ * executed. This queue will hold only the {@code Runnable}
+ * tasks submitted by the {@code execute} method.
+ * @param threadFactory the factory to use when the executor creates a new thread
+ * @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
+ * @throws IllegalArgumentException if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize}
+ * @throws NullPointerException if {@code workQueue}
+ * or {@code threadFactory} or {@code handler} is null
+ */
+ public DynamicThreadPoolExecutor(
+ int corePoolSize, int maximumPoolSize,
+ long keepAliveTime, TimeUnit unit,
+ long executeTimeOut, boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis,
+ @NonNull BlockingQueue blockingQueue,
+ @NonNull String threadPoolId,
+ @NonNull ThreadFactory threadFactory,
+ @NonNull RejectedExecutionHandler rejectedExecutionHandler) {
+ super(
+ threadPoolId, new DefaultThreadPoolPluginRegistry(),
+ corePoolSize, maximumPoolSize, keepAliveTime, unit,
+ blockingQueue, threadFactory, rejectedExecutionHandler
+ );
+ log.info("Initializing ExecutorService" + threadPoolId);
+
+ // init default aware processor
+ new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis, waitForTasksToCompleteOnShutdown)
+ .doRegister(this, this);
}
+ /**
+ * Invoked by the containing {@code BeanFactory} on destruction of a bean.
+ *
+ * @throws Exception in case of shutdown errors. Exceptions will get logged
+ * but not rethrown to allow other beans to release their resources as well.
+ */
@Override
- public void execute(@NonNull Runnable command) {
- if (taskDecorator != null) {
- command = taskDecorator.decorate(command);
- }
- super.execute(command);
+ public void destroy() throws Exception {
+ getAndThen(
+ ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME,
+ ThreadPoolExecutorShutdownPlugin.class,
+ processor -> {
+ if (processor.isWaitForTasksToCompleteOnShutdown()) {
+ super.shutdown();
+ } else {
+ super.shutdownNow();
+ }
+ });
}
- @Override
- protected void beforeExecute(Thread t, Runnable r) {
- if (executeTimeOut == null || executeTimeOut <= 0) {
- return;
- }
- startTimeThreadLocal.set(SystemClock.now());
+ public long getAwaitTerminationMillis() {
+ return getAndThen(
+ ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME,
+ ThreadPoolExecutorShutdownPlugin.class,
+ ThreadPoolExecutorShutdownPlugin::getAwaitTerminationMillis, -1L
+ );
}
- @Override
- protected void afterExecute(Runnable r, Throwable t) {
- Long startTime;
- if ((startTime = startTimeThreadLocal.get()) == null) {
- return;
- }
- try {
- long endTime = SystemClock.now();
- long executeTime;
- boolean executeTimeAlarm = (executeTime = (endTime - startTime)) > executeTimeOut;
- if (executeTimeAlarm && ApplicationContextHolder.getInstance() != null) {
- ThreadPoolNotifyAlarmHandler notifyAlarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class);
- if (notifyAlarmHandler != null) {
- notifyAlarmHandler.asyncSendExecuteTimeOutAlarm(threadPoolId, executeTime, executeTimeOut, this);
+ public boolean isWaitForTasksToCompleteOnShutdown() {
+ return getAndThen(
+ ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME,
+ ThreadPoolExecutorShutdownPlugin.class,
+ ThreadPoolExecutorShutdownPlugin::isWaitForTasksToCompleteOnShutdown, false
+ );
+ }
+
+ /**
+ * Set support param.
+ *
+ * @param awaitTerminationMillis await termination millis
+ * @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
+ */
+ public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) {
+ getAndThen(
+ ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME,
+ ThreadPoolExecutorShutdownPlugin.class,
+ processor -> processor.setAwaitTerminationMillis(awaitTerminationMillis)
+ .setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown)
+ );
+ }
+
+ public Long getRejectCountNum() {
+ return getAndThen(
+ TaskRejectCountRecordPlugin.PLUGIN_NAME,
+ TaskRejectCountRecordPlugin.class,
+ TaskRejectCountRecordPlugin::getRejectCountNum, -1L
+ );
+ }
+
+ public Long getExecuteTimeOut() {
+ return getAndThen(
+ TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME,
+ TaskTimeoutNotifyAlarmPlugin.class,
+ TaskTimeoutNotifyAlarmPlugin::getExecuteTimeOut, -1L
+ );
+ }
+
+ public void setExecuteTimeOut(Long executeTimeOut) {
+ getAndThen(
+ TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME,
+ TaskTimeoutNotifyAlarmPlugin.class,
+ processor -> processor.setExecuteTimeOut(executeTimeOut)
+ );
+ }
+
+ public TaskDecorator getTaskDecorator() {
+ return getAndThen(
+ TaskDecoratorPlugin.PLUGIN_NAME,
+ TaskDecoratorPlugin.class,
+ processor -> CollectionUtil.getFirst(processor.getDecorators()), null
+ );
+ }
+
+ public void setTaskDecorator(TaskDecorator taskDecorator) {
+ if (Objects.nonNull(taskDecorator)) {
+ getAndThen(
+ TaskDecoratorPlugin.PLUGIN_NAME,
+ TaskDecoratorPlugin.class,
+ processor -> {
+ processor.clearDecorators();
+ processor.addDecorator(taskDecorator);
}
- }
- } finally {
- startTimeThreadLocal.remove();
+ );
}
}
- @Override
- protected ExecutorService initializeExecutor() {
- return this;
+ public RejectedExecutionHandler getRedundancyHandler() {
+ return getRejectedExecutionHandler();
}
- public Long getRejectCountNum() {
- return rejectCount.get();
+ public void getRedundancyHandler(RejectedExecutionHandler handler) {
+ setRejectedExecutionHandler(handler);
}
+
}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java
index a8827549..2e2abc4e 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java
@@ -17,7 +17,6 @@
package cn.hippo4j.core.executor;
-import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -67,8 +66,8 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
@Override
public void destroy() throws Exception {
- if (executor != null && executor instanceof AbstractDynamicExecutorSupport) {
- ((AbstractDynamicExecutorSupport) executor).destroy();
+ if (executor != null && executor instanceof DynamicThreadPoolExecutor) {
+ ((DynamicThreadPoolExecutor) executor).destroy();
}
}
}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java
new file mode 100644
index 00000000..ac683582
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ExtensibleThreadPoolExecutor.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.executor;
+
+import cn.hippo4j.core.plugin.*;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Setter;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * Extensible thread-pool executor.
+ * Support the callback extension points provided on the basis of {@link ThreadPoolExecutor}.
+ * Each extension point corresponds to a different {@link ThreadPoolPlugin} interface,
+ * users can customize plug-ins and implement one or more {@link ThreadPoolPlugin} interface
+ * to enable plugins to sense thread pool behavior and provide extended functions.
+ *
+ * @author huangchengxing
+ */
+public class ExtensibleThreadPoolExecutor
+ extends ThreadPoolExecutor implements ThreadPoolPluginRegistryDelegate {
+
+ /**
+ * thread pool id
+ */
+ @Getter
+ @NonNull
+ private final String threadPoolId;
+
+ /**
+ * action aware registry
+ */
+ @Getter
+ private final ThreadPoolPluginRegistry threadPoolPluginRegistry;
+
+ /**
+ * handler wrapper, any changes to the current instance {@link RejectedExecutionHandler} should be made through this wrapper
+ */
+ private final RejectedAwareHandlerWrapper handlerWrapper;
+
+ /**
+ * Creates a new {@code ExtensibleThreadPoolExecutor} with the given initial parameters.
+ *
+ * @param threadPoolId thread-pool id
+ * @param threadPoolPluginRegistry action aware registry
+ * @param corePoolSize the number of threads to keep in the pool, even
+ * if they are idle, unless {@code allowCoreThreadTimeOut} is set
+ * @param maximumPoolSize the maximum number of threads to allow in the
+ * pool
+ * @param keepAliveTime when the number of threads is greater than
+ * the core, this is the maximum time that excess idle threads
+ * will wait for new tasks before terminating.
+ * @param unit the time unit for the {@code keepAliveTime} argument
+ * @param workQueue the queue to use for holding tasks before they are
+ * executed. This queue will hold only the {@code Runnable}
+ * tasks submitted by the {@code execute} method.
+ * @param threadFactory the factory to use when the executor
+ * creates a new thread
+ * @param handler the handler to use when execution is blocked
+ * because the thread bounds and queue capacities are reached
+ * @throws IllegalArgumentException if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize}
+ * @throws NullPointerException if {@code workQueue}
+ * or {@code threadFactory} or {@code handler} is null
+ */
+ public ExtensibleThreadPoolExecutor(
+ @NonNull String threadPoolId,
+ @NonNull ThreadPoolPluginRegistry threadPoolPluginRegistry,
+ int corePoolSize, int maximumPoolSize,
+ long keepAliveTime, TimeUnit unit,
+ @NonNull BlockingQueue workQueue,
+ @NonNull ThreadFactory threadFactory,
+ @NonNull RejectedExecutionHandler handler) {
+ super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
+
+ // pool extended info
+ this.threadPoolId = threadPoolId;
+ this.threadPoolPluginRegistry = threadPoolPluginRegistry;
+
+ // proxy handler to support Aware callback
+ while (handler instanceof RejectedAwareHandlerWrapper) {
+ handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
+ }
+ this.handlerWrapper = new RejectedAwareHandlerWrapper(threadPoolPluginRegistry, handler);
+ super.setRejectedExecutionHandler(handlerWrapper);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Before calling the parent class method, {@link ExecuteAwarePlugin#beforeExecute} will be called first.
+ *
+ * @param thread the thread that will run task {@code r}
+ * @param runnable the task that will be executed
+ */
+ @Override
+ protected void beforeExecute(Thread thread, Runnable runnable) {
+ Collection executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList();
+ executeAwarePluginList.forEach(aware -> aware.beforeExecute(thread, runnable));
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Before calling the parent class method, {@link ExecuteAwarePlugin#execute} will be called first.
+ *
+ * @param runnable the task to execute
+ */
+ @Override
+ public void execute(@NonNull Runnable runnable) {
+ Collection executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList();
+ for (ExecuteAwarePlugin executeAwarePlugin : executeAwarePluginList) {
+ runnable = executeAwarePlugin.execute(runnable);
+ }
+ super.execute(runnable);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * After calling the parent class method, {@link ExecuteAwarePlugin#afterExecute} will be called last.
+ *
+ * @param runnable the runnable that has completed
+ * @param throwable the exception that caused termination, or null if
+ * execution completed normally
+ */
+ @Override
+ protected void afterExecute(Runnable runnable, Throwable throwable) {
+ Collection executeAwarePluginList = threadPoolPluginRegistry.getExecuteAwareList();
+ executeAwarePluginList.forEach(aware -> aware.afterExecute(runnable, throwable));
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Before calling the parent class method,
+ * {@link ShutdownAwarePlugin#beforeShutdown} will be called first.
+ * and then will be call {@link ShutdownAwarePlugin#afterShutdown}
+ *
+ * @throws SecurityException {@inheritDoc}
+ */
+ @Override
+ public void shutdown() {
+ Collection shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList();
+ shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this));
+ super.shutdown();
+ shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, Collections.emptyList()));
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Before calling the parent class method,
+ * {@link ShutdownAwarePlugin#beforeShutdown} will be called first.
+ * and then will be call {@link ShutdownAwarePlugin#afterShutdown}
+ *
+ * @throws SecurityException
+ */
+ @Override
+ public List shutdownNow() {
+ Collection shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList();
+ shutdownAwarePluginList.forEach(aware -> aware.beforeShutdown(this));
+ List tasks = super.shutdownNow();
+ shutdownAwarePluginList.forEach(aware -> aware.afterShutdown(this, tasks));
+ return tasks;
+ }
+
+ /**
+ * {@inheritDoc}.
+ *
+ * Before calling the parent class method, {@link ShutdownAwarePlugin#afterTerminated} will be called first.
+ */
+ @Override
+ protected void terminated() {
+ super.terminated();
+ Collection shutdownAwarePluginList = threadPoolPluginRegistry.getShutdownAwareList();
+ shutdownAwarePluginList.forEach(aware -> aware.afterTerminated(this));
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Before calling the parent class method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first.
+ *
+ * @param runnable the runnable task being wrapped
+ * @param value the default value for the returned future
+ * @return a {@code RunnableFuture} which, when run, will run the
+ * underlying runnable and which, as a {@code Future}, will yield
+ * the given value as its result and provide for cancellation of
+ * the underlying task
+ * @since 1.6
+ */
+ @Override
+ protected RunnableFuture newTaskFor(Runnable runnable, T value) {
+ Collection taskAwarePluginList = threadPoolPluginRegistry.getTaskAwareList();
+ for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) {
+ runnable = taskAwarePlugin.beforeTaskCreate(this, runnable, value);
+ }
+ return super.newTaskFor(runnable, value);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Before calling the parent class method, {@link TaskAwarePlugin#beforeTaskCreate} will be called first.
+ *
+ * @param callable the callable task being wrapped
+ * @return a {@code RunnableFuture} which, when run, will call the
+ * underlying callable and which, as a {@code Future}, will yield
+ * the callable's result as its result and provide for
+ * cancellation of the underlying task
+ * @since 1.6
+ */
+ @Override
+ protected RunnableFuture newTaskFor(Callable callable) {
+ Collection taskAwarePluginList = threadPoolPluginRegistry.getTaskAwareList();
+ for (TaskAwarePlugin taskAwarePlugin : taskAwarePluginList) {
+ callable = taskAwarePlugin.beforeTaskCreate(this, callable);
+ }
+ return super.newTaskFor(callable);
+ }
+
+ /**
+ * Sets a new handler for unexecutable tasks.
+ *
+ * @param handler the new handler
+ * @throws NullPointerException if handler is null
+ * @see #getRejectedExecutionHandler
+ */
+ @Override
+ public void setRejectedExecutionHandler(@NonNull RejectedExecutionHandler handler) {
+ while (handler instanceof RejectedAwareHandlerWrapper) {
+ handler = ((RejectedAwareHandlerWrapper) handler).getHandler();
+ }
+ handlerWrapper.setHandler(handler);
+ }
+
+ /**
+ * Returns the current handler for unexecutable tasks.
+ *
+ * @return the current handler
+ * @see #setRejectedExecutionHandler(RejectedExecutionHandler)
+ */
+ @Override
+ public RejectedExecutionHandler getRejectedExecutionHandler() {
+ return handlerWrapper.getHandler();
+ }
+
+ /**
+ * Wrapper of original {@link RejectedExecutionHandler} of {@link ThreadPoolExecutor},
+ * It's used to support the {@link RejectedAwarePlugin} on the basis of the {@link RejectedExecutionHandler}.
+ *
+ * @author huangchengxing
+ * @see RejectedAwarePlugin
+ */
+ @AllArgsConstructor
+ public static class RejectedAwareHandlerWrapper implements RejectedExecutionHandler {
+
+ /**
+ * thread-pool action aware registry
+ */
+ private final ThreadPoolPluginRegistry registry;
+
+ /**
+ * original target
+ */
+ @NonNull
+ @Setter
+ @Getter
+ private RejectedExecutionHandler handler;
+
+ /**
+ * Call {@link RejectedAwarePlugin#beforeRejectedExecution}, then reject the task
+ *
+ * @param r the runnable task requested to be executed
+ * @param executor the executor attempting to execute this task
+ */
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ Collection rejectedAwarePluginList = registry.getRejectedAwareList();
+ rejectedAwarePluginList.forEach(aware -> aware.beforeRejectedExecution(r, executor));
+ handler.rejectedExecution(r, executor);
+ }
+
+ }
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java
index 77237dd4..7b832473 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java
@@ -22,13 +22,13 @@ import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
-import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.ExecutorTraceContextUtil;
-import cn.hippo4j.message.service.Hippo4jSendMessageService;
+import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.message.enums.NotifyTypeEnum;
-import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
+import cn.hippo4j.message.service.Hippo4jSendMessageService;
+import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -211,9 +211,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
*/
public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) {
BlockingQueue blockingQueue = threadPoolExecutor.getQueue();
- RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor
- ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler()
- : threadPoolExecutor.getRejectedExecutionHandler();
+ RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum()
: -1L;
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java
index 62b9b1ab..81f17859 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java
@@ -21,10 +21,8 @@ import cn.hippo4j.common.model.ManyThreadPoolRunStateInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.ByteConvertUtil;
-import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
-import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -70,11 +68,7 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime {
DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor pool = executorService.getExecutor();
String rejectedName;
- if (pool instanceof AbstractDynamicExecutorSupport) {
- rejectedName = ((DynamicThreadPoolExecutor) pool).getRedundancyHandler().getClass().getSimpleName();
- } else {
- rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName();
- }
+ rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName();
poolRunStateInfo.setRejectedName(rejectedName);
ManyThreadPoolRunStateInfo manyThreadPoolRunStateInfo = BeanUtil.convert(poolRunStateInfo, ManyThreadPoolRunStateInfo.class);
manyThreadPoolRunStateInfo.setIdentify(CLIENT_IDENTIFICATION_VALUE);
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java
index cff8087c..d9aae7d5 100644
--- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java
@@ -17,6 +17,7 @@
package cn.hippo4j.core.executor.support;
+import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
@@ -25,7 +26,10 @@ import java.util.concurrent.*;
/**
* Dynamic executor configuration support.
+ *
+ * @deprecated used {@link ThreadPoolExecutorShutdownPlugin} to get thread pool shutdown support
*/
+@Deprecated
@Slf4j
public abstract class AbstractDynamicExecutorSupport extends ThreadPoolExecutor implements InitializingBean, DisposableBean {
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistrar.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistrar.java
new file mode 100644
index 00000000..adf9b1c4
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistrar.java
@@ -0,0 +1,105 @@
+package cn.hippo4j.core.plugin;
+
+import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
+import cn.hippo4j.core.plugin.impl.*;
+import lombok.RequiredArgsConstructor;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.BeanFactoryUtils;
+import org.springframework.beans.factory.BeanNameAware;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.core.AliasRegistry;
+
+import java.util.Objects;
+
+/**
+ * Register default {@link ThreadPoolPlugin}.
+ *
+ * @author huangchengxing
+ * @see TaskDecoratorPlugin
+ * @see TaskTimeoutNotifyAlarmPlugin
+ * @see TaskRejectCountRecordPlugin
+ * @see TaskRejectNotifyAlarmPlugin
+ * @see ThreadPoolExecutorShutdownPlugin
+ */
+@RequiredArgsConstructor
+public class DefaultThreadPoolPluginRegistrar
+ implements ThreadPoolPluginRegistrar, ApplicationContextAware, BeanNameAware {
+
+ public static final String REGISTRAR_NAME = "DefaultThreadPoolPluginRegistrar";
+
+ /**
+ * aliasRegistry
+ */
+ private AliasRegistry aliasRegistry;
+
+ /**
+ * execute time out
+ */
+ private final long executeTimeOut;
+
+ /**
+ * await termination millis
+ */
+ private final long awaitTerminationMillis;
+
+ /**
+ * wait for tasks to complete on shutdown
+ */
+ private final boolean waitForTasksToCompleteOnShutdown;
+
+ /**
+ * Get id.
+ *
+ * @return id
+ */
+ @Override
+ public String getId() {
+ return REGISTRAR_NAME;
+ }
+
+ /**
+ * Create and register plugin for the specified thread-pool instance
+ *
+ * @param registry thread pool plugin registry
+ * @param executor executor
+ */
+ @Override
+ public void doRegister(ThreadPoolPluginRegistry registry, ExtensibleThreadPoolExecutor executor) {
+ // callback when task execute
+ registry.register(new TaskDecoratorPlugin());
+ registry.register(new TaskTimeoutNotifyAlarmPlugin(executeTimeOut, executor));
+ // callback when task rejected
+ registry.register(new TaskRejectCountRecordPlugin());
+ registry.register(new TaskRejectNotifyAlarmPlugin());
+ // callback when pool shutdown
+ registry.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis, waitForTasksToCompleteOnShutdown));
+ }
+
+ /**
+ * Set the name of the bean in the bean factory that created this bean.
+ * Invoked after population of normal bean properties but before an
+ * init callback such as {@link InitializingBean#afterPropertiesSet()}
+ * or a custom init-method.
+ *
+ * @param name the name of the bean in the factory.
+ * Note that this name is the actual bean name used in the factory, which may
+ * differ from the originally specified name: in particular for inner bean
+ * names, the actual bean name might have been made unique through appending
+ * "#..." suffixes. Use the {@link BeanFactoryUtils#originalBeanName(String)}
+ * method to extract the original bean name (without suffix), if desired.
+ */
+ @Override
+ public void setBeanName(String name) {
+ if (Objects.nonNull(aliasRegistry)) {
+ aliasRegistry.registerAlias(name, getId());
+ }
+ }
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.aliasRegistry = applicationContext.getBean(AliasRegistry.class);
+ }
+
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistry.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistry.java
new file mode 100644
index 00000000..86c238d0
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistry.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin;
+
+import cn.hippo4j.common.toolkit.Assert;
+import lombok.NonNull;
+
+import java.util.*;
+
+/**
+ * The default implementation of {@link ThreadPoolPluginRegistry}.
+ *
+ * @author huangchengxing
+ */
+public class DefaultThreadPoolPluginRegistry implements ThreadPoolPluginRegistry {
+
+ /**
+ * Registered {@link ThreadPoolPlugin}.
+ */
+ private final Map registeredPlugins = new HashMap<>(16);
+
+ /**
+ * Registered {@link TaskAwarePlugin}.
+ */
+ private final List taskAwarePluginList = new ArrayList<>();
+
+ /**
+ * Registered {@link ExecuteAwarePlugin}.
+ */
+ private final List executeAwarePluginList = new ArrayList<>();
+
+ /**
+ * Registered {@link RejectedAwarePlugin}.
+ */
+ private final List rejectedAwarePluginList = new ArrayList<>();
+
+ /**
+ * Registered {@link ShutdownAwarePlugin}.
+ */
+ private final List shutdownAwarePluginList = new ArrayList<>();
+
+ /**
+ * Clear all.
+ */
+ @Override
+ public synchronized void clear() {
+ registeredPlugins.clear();
+ taskAwarePluginList.clear();
+ executeAwarePluginList.clear();
+ rejectedAwarePluginList.clear();
+ shutdownAwarePluginList.clear();
+ }
+
+ /**
+ * Register a {@link ThreadPoolPlugin}
+ *
+ * @param aware aware
+ * @throws IllegalArgumentException thrown when a plugin with the same {@link ThreadPoolPlugin#getId()} already exists in the registry
+ * @see ThreadPoolPlugin#getId()
+ */
+ @Override
+ public synchronized void register(@NonNull ThreadPoolPlugin aware) {
+ String id = aware.getId();
+ Assert.isTrue(!isRegistered(id), "The plug-in with id [" + id + "] has been registered");
+
+ // register aware
+ registeredPlugins.put(id, aware);
+ // quick index
+ if (aware instanceof TaskAwarePlugin) {
+ taskAwarePluginList.add((TaskAwarePlugin) aware);
+ }
+ if (aware instanceof ExecuteAwarePlugin) {
+ executeAwarePluginList.add((ExecuteAwarePlugin) aware);
+ }
+ if (aware instanceof RejectedAwarePlugin) {
+ rejectedAwarePluginList.add((RejectedAwarePlugin) aware);
+ }
+ if (aware instanceof ShutdownAwarePlugin) {
+ shutdownAwarePluginList.add((ShutdownAwarePlugin) aware);
+ }
+ }
+
+ /**
+ * Unregister {@link ThreadPoolPlugin}
+ *
+ * @param id name
+ */
+ @Override
+ public synchronized void unregister(String id) {
+ Optional.ofNullable(id)
+ .map(registeredPlugins::remove)
+ .ifPresent(old -> {
+ if (old instanceof TaskAwarePlugin) {
+ taskAwarePluginList.remove(old);
+ }
+ if (old instanceof ExecuteAwarePlugin) {
+ executeAwarePluginList.remove(old);
+ }
+ if (old instanceof RejectedAwarePlugin) {
+ rejectedAwarePluginList.remove(old);
+ }
+ if (old instanceof ShutdownAwarePlugin) {
+ shutdownAwarePluginList.remove(old);
+ }
+ });
+ }
+
+ /**
+ * Whether the {@link ThreadPoolPlugin} has been registered.
+ *
+ * @param id name
+ * @return ture if target has been registered, false otherwise
+ */
+ @Override
+ public boolean isRegistered(String id) {
+ return registeredPlugins.containsKey(id);
+ }
+
+ /**
+ * Get {@link ThreadPoolPlugin}
+ *
+ * @param id target name
+ * @param target aware type
+ * @return {@link ThreadPoolPlugin}, null if unregister
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public A getAware(String id) {
+ return (A) registeredPlugins.get(id);
+ }
+
+ /**
+ * Get execute aware list.
+ *
+ * @return {@link ExecuteAwarePlugin}
+ */
+ @Override
+ public Collection getExecuteAwareList() {
+ return executeAwarePluginList;
+ }
+
+ /**
+ * Get rejected aware list.
+ *
+ * @return {@link RejectedAwarePlugin}
+ */
+ @Override
+ public Collection getRejectedAwareList() {
+ return rejectedAwarePluginList;
+ }
+
+ /**
+ * Get shutdown aware list.
+ *
+ * @return {@link ShutdownAwarePlugin}
+ */
+ @Override
+ public Collection getShutdownAwareList() {
+ return shutdownAwarePluginList;
+ }
+
+ /**
+ * Get shutdown aware list.
+ *
+ * @return {@link ShutdownAwarePlugin}
+ */
+ @Override
+ public Collection getTaskAwareList() {
+ return taskAwarePluginList;
+ }
+
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ExecuteAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ExecuteAwarePlugin.java
new file mode 100644
index 00000000..413df1f5
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ExecuteAwarePlugin.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin;
+
+import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
+
+/**
+ * Callback during task execution.
+ *
+ * @author huangchengxing
+ */
+public interface ExecuteAwarePlugin extends ThreadPoolPlugin {
+
+ /**
+ * Callback before task execution.
+ *
+ * @param thread thread of executing task
+ * @param runnable task
+ * @see ExtensibleThreadPoolExecutor#beforeExecute
+ */
+ default void beforeExecute(Thread thread, Runnable runnable) {
+ }
+
+ /**
+ * Callback when task is executed.
+ *
+ * @param runnable runnable
+ * @return tasks to be execute
+ * @see ExtensibleThreadPoolExecutor#execute
+ */
+ default Runnable execute(Runnable runnable) {
+ return runnable;
+ }
+
+ /**
+ * Callback after task execution.
+ *
+ * @param runnable runnable
+ * @param throwable exception thrown during execution
+ * @see ExtensibleThreadPoolExecutor#afterExecute
+ */
+ default void afterExecute(Runnable runnable, Throwable throwable) {
+ // do nothing
+ }
+
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/RejectedAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/RejectedAwarePlugin.java
new file mode 100644
index 00000000..75bbaa60
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/RejectedAwarePlugin.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Callback when task is rejected.
+ *
+ * @author huangchengxing
+ */
+public interface RejectedAwarePlugin extends ThreadPoolPlugin {
+
+ /**
+ * Callback before task is rejected.
+ *
+ * @param runnable task
+ * @param executor executor
+ */
+ default void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
+ // do nothing
+ }
+
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ShutdownAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ShutdownAwarePlugin.java
new file mode 100644
index 00000000..75f99be2
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ShutdownAwarePlugin.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin;
+
+import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
+
+import java.util.List;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Callback before thread-pool shutdown.
+ *
+ * @author huangchengxing
+ */
+public interface ShutdownAwarePlugin extends ThreadPoolPlugin {
+
+ /**
+ * Callback before pool shutdown.
+ *
+ * @param executor executor
+ * @see ThreadPoolExecutor#shutdown()
+ * @see ThreadPoolExecutor#shutdownNow()
+ */
+ default void beforeShutdown(ThreadPoolExecutor executor) {
+ // do nothing
+ }
+
+ /**
+ * Callback after pool shutdown.
+ *
+ * @param executor executor
+ * @param remainingTasks remainingTasks, or empty if no tasks left or {@link ThreadPoolExecutor#shutdown()} called
+ * @see ThreadPoolExecutor#shutdown()
+ * @see ThreadPoolExecutor#shutdownNow()
+ */
+ default void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) {
+ // do nothing
+ }
+
+ /**
+ * Callback after pool terminated.
+ *
+ * @param executor executor
+ * @see ThreadPoolExecutor#terminated()
+ */
+ default void afterTerminated(ExtensibleThreadPoolExecutor executor) {
+ // do nothing
+ }
+
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java
new file mode 100644
index 00000000..3d4969e1
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/TaskAwarePlugin.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Callback during task submit in thread-pool.
+ *
+ * @author huangchengxing
+ */
+public interface TaskAwarePlugin extends ThreadPoolPlugin {
+
+ /**
+ * Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool.
+ *
+ * @param executor executor
+ * @param runnable original task
+ * @return Tasks that really need to be performed
+ * @see ThreadPoolExecutor#newTaskFor(Runnable, Object)
+ */
+ default Runnable beforeTaskCreate(ThreadPoolExecutor executor, Runnable runnable, V value) {
+ return runnable;
+ }
+
+ /**
+ * Callback during the {@link java.util.concurrent.RunnableFuture} task create in thread-pool.
+ *
+ * @param executor executor
+ * @param future original task
+ * @return Tasks that really need to be performed
+ * @see ThreadPoolExecutor#newTaskFor(Callable)
+ */
+ default Callable beforeTaskCreate(ThreadPoolExecutor executor, Callable future) {
+ return future;
+ }
+
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java
new file mode 100644
index 00000000..499a7600
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPlugin.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin;
+
+import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
+
+/**
+ * Thread pool action aware.
+ *
+ * @author huangchengxing
+ * @see ExtensibleThreadPoolExecutor
+ * @see ThreadPoolPluginRegistry
+ * @see TaskAwarePlugin
+ * @see ExecuteAwarePlugin
+ * @see ShutdownAwarePlugin
+ * @see RejectedAwarePlugin
+ */
+public interface ThreadPoolPlugin {
+
+ /**
+ * Get id.
+ *
+ * @return id
+ */
+ String getId();
+
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistrar.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistrar.java
new file mode 100644
index 00000000..a45f28c4
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistrar.java
@@ -0,0 +1,27 @@
+package cn.hippo4j.core.plugin;
+
+import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
+
+/**
+ * Factory of {@link ThreadPoolPlugin}.
+ *
+ * @author huangchengxing
+ */
+public interface ThreadPoolPluginRegistrar {
+
+ /**
+ * Get id.
+ *
+ * @return id
+ */
+ String getId();
+
+ /**
+ * Create and register plugin for the specified thread-pool instance
+ *
+ * @param registry thread pool plugin registry
+ * @param executor executor
+ */
+ void doRegister(ThreadPoolPluginRegistry registry, ExtensibleThreadPoolExecutor executor);
+
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistry.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistry.java
new file mode 100644
index 00000000..dd89353d
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistry.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Registry of {@link ThreadPoolPlugin}
+ *
+ * @author huangchengxing
+ */
+public interface ThreadPoolPluginRegistry {
+
+ /**
+ * Clear all.
+ */
+ void clear();
+
+ /**
+ * Register a {@link ThreadPoolPlugin}
+ *
+ * @param aware aware
+ */
+ void register(ThreadPoolPlugin aware);
+
+ /**
+ * Whether the {@link ThreadPoolPlugin} has been registered.
+ *
+ * @param name name
+ * @return ture if target has been registered, false otherwise
+ */
+ boolean isRegistered(String name);
+
+ /**
+ * Unregister {@link ThreadPoolPlugin}
+ *
+ * @param name name
+ */
+ void unregister(String name);
+
+ /**
+ * Get {@link ThreadPoolPlugin}
+ *
+ * @param name target name
+ * @param target aware type
+ * @return {@link ThreadPoolPlugin}, null if unregister
+ * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
+ */
+ A getAware(String name);
+
+ /**
+ * Get execute aware list.
+ *
+ * @return {@link ExecuteAwarePlugin}
+ */
+ Collection getExecuteAwareList();
+
+ /**
+ * Get rejected aware list.
+ *
+ * @return {@link RejectedAwarePlugin}
+ */
+ Collection getRejectedAwareList();
+
+ /**
+ * Get shutdown aware list.
+ *
+ * @return {@link ShutdownAwarePlugin}
+ */
+ Collection getShutdownAwareList();
+
+ /**
+ * Get shutdown aware list.
+ *
+ * @return {@link ShutdownAwarePlugin}
+ */
+ Collection getTaskAwareList();
+
+ /**
+ * Try to get target Aware and apply operation, do nothing if is not present.
+ *
+ * @param name aware name
+ * @param targetType target type
+ * @param consumer operation for target aware
+ * @param aware type
+ * @return this instance
+ * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
+ */
+ default ThreadPoolPluginRegistry getAndThen(
+ String name, Class targetType, Consumer consumer) {
+ Optional.ofNullable(getAware(name))
+ .map(targetType::cast)
+ .ifPresent(consumer);
+ return this;
+ }
+
+ /**
+ * Try to get target Aware and return value of apply function, return default value if is not present.
+ *
+ * @param name aware name
+ * @param targetType target type
+ * @param function operation for target aware
+ * @param defaultValue default value
+ * @param aware type
+ * @return value of apply function, default value if aware is not present
+ * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
+ */
+ default R getAndThen(String name, Class targetType, Function function, R defaultValue) {
+ return Optional.ofNullable(getAware(name))
+ .map(targetType::cast)
+ .map(function)
+ .orElse(defaultValue);
+ }
+
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistryDelegate.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistryDelegate.java
new file mode 100644
index 00000000..3652d9fd
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/ThreadPoolPluginRegistryDelegate.java
@@ -0,0 +1,112 @@
+package cn.hippo4j.core.plugin;
+
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+import java.util.Collection;
+
+/**
+ * Thread pool action aware registry delegate.
+ *
+ * @author huangchengxing
+ */
+public interface ThreadPoolPluginRegistryDelegate extends ThreadPoolPluginRegistry {
+
+ /**
+ * Get thread pool action aware registry.
+ *
+ * @return {@link ThreadPoolPluginRegistry}
+ */
+ @NonNull
+ ThreadPoolPluginRegistry getThreadPoolPluginRegistry();
+
+ /**
+ * Clear all.
+ */
+ @Override
+ default void clear() {
+ getThreadPoolPluginRegistry().clear();
+ }
+
+ /**
+ * Register a {@link ThreadPoolPlugin}
+ *
+ * @param aware aware
+ */
+ @Override
+ default void register(ThreadPoolPlugin aware) {
+ getThreadPoolPluginRegistry().register(aware);
+ }
+
+ /**
+ * Whether the {@link ThreadPoolPlugin} has been registered.
+ *
+ * @param name name
+ * @return ture if target has been registered, false otherwise
+ */
+ @Override
+ default boolean isRegistered(String name) {
+ return getThreadPoolPluginRegistry().isRegistered(name);
+ }
+
+ /**
+ * Unregister {@link ThreadPoolPlugin}
+ *
+ * @param name name
+ */
+ @Override
+ default void unregister(String name) {
+ getThreadPoolPluginRegistry().unregister(name);
+ }
+
+ /**
+ * Get {@link ThreadPoolPlugin}
+ *
+ * @param name target name
+ * @return {@link ThreadPoolPlugin}, null if unregister
+ * @throws ClassCastException thrown when the object obtained by name cannot be converted to target type
+ */
+ @Override
+ default A getAware(String name) {
+ return getThreadPoolPluginRegistry().getAware(name);
+ }
+
+ /**
+ * Get execute aware list.
+ *
+ * @return {@link ExecuteAwarePlugin}
+ */
+ @Override
+ default Collection getExecuteAwareList() {
+ return getThreadPoolPluginRegistry().getExecuteAwareList();
+ }
+
+ /**
+ * Get rejected aware list.
+ *
+ * @return {@link RejectedAwarePlugin}
+ */
+ @Override
+ default Collection getRejectedAwareList() {
+ return getThreadPoolPluginRegistry().getRejectedAwareList();
+ }
+
+ /**
+ * Get shutdown aware list.
+ *
+ * @return {@link ShutdownAwarePlugin}
+ */
+ @Override
+ default Collection getShutdownAwareList() {
+ return getThreadPoolPluginRegistry().getShutdownAwareList();
+ }
+
+ /**
+ * Get shutdown aware list.
+ *
+ * @return {@link ShutdownAwarePlugin}
+ */
+ @Override
+ default Collection getTaskAwareList() {
+ return getThreadPoolPluginRegistry().getTaskAwareList();
+ }
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java
new file mode 100644
index 00000000..6187a027
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPlugin.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin.impl;
+
+import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
+import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
+import lombok.Getter;
+import lombok.NonNull;
+import org.springframework.core.task.TaskDecorator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Decorate tasks when they are submitted to thread-pool.
+ *
+ * @author huangchengxing
+ */
+public class TaskDecoratorPlugin implements ExecuteAwarePlugin {
+
+ public static final String PLUGIN_NAME = "task-decorator-plugin";
+
+ /**
+ * Get id.
+ *
+ * @return id
+ */
+ @Override
+ public String getId() {
+ return PLUGIN_NAME;
+ }
+
+ /**
+ * decorators
+ */
+ @Getter
+ private final List decorators = new ArrayList<>();
+
+ /**
+ * Callback when task is executed.
+ *
+ * @param runnable runnable
+ * @return tasks to be execute
+ * @see ExtensibleThreadPoolExecutor#execute
+ */
+ @Override
+ public Runnable execute(Runnable runnable) {
+ for (TaskDecorator decorator : decorators) {
+ runnable = decorator.decorate(runnable);
+ }
+ return runnable;
+ }
+
+ /**
+ * Add a decorator
+ *
+ * @param decorator decorator
+ */
+ public void addDecorator(@NonNull TaskDecorator decorator) {
+ decorators.remove(decorator);
+ decorators.add(decorator);
+ }
+
+ /**
+ * Clear all decorators
+ *
+ */
+ public void clearDecorators() {
+ decorators.clear();
+ }
+
+ /**
+ * Remove decorators
+ *
+ */
+ public void removeDecorator(TaskDecorator decorator) {
+ decorators.remove(decorator);
+ }
+
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPlugin.java
new file mode 100644
index 00000000..6d021b92
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPlugin.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin.impl;
+
+import cn.hippo4j.core.plugin.RejectedAwarePlugin;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Record the number of tasks rejected by the thread pool.
+ *
+ * @author huangchengxing
+ */
+public class TaskRejectCountRecordPlugin implements RejectedAwarePlugin {
+
+ public static final String PLUGIN_NAME = "task-reject-count-record-plugin";
+
+ /**
+ * Get id.
+ *
+ * @return id
+ */
+ @Override
+ public String getId() {
+ return PLUGIN_NAME;
+ }
+
+ /**
+ * rejection count
+ */
+ @Setter
+ @Getter
+ private AtomicLong rejectCount = new AtomicLong(0);
+
+ /**
+ * Record rejection count.
+ *
+ * @param r task
+ * @param executor executor
+ */
+ @Override
+ public void beforeRejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ rejectCount.incrementAndGet();
+ }
+
+ /**
+ * Get reject count num
+ *
+ * @return reject count num
+ */
+ public Long getRejectCountNum() {
+ return rejectCount.get();
+ }
+
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java
new file mode 100644
index 00000000..d4f9eebb
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin.impl;
+
+import cn.hippo4j.common.config.ApplicationContextHolder;
+import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
+import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
+import cn.hippo4j.core.plugin.RejectedAwarePlugin;
+
+import java.util.Optional;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * Send alert notification when a task is rejected.
+ *
+ * @author huangchengxing
+ */
+public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin {
+
+ public static final String PLUGIN_NAME = "task-reject-notify-alarm-plugin";
+
+ /**
+ * Get id.
+ *
+ * @return id
+ */
+ @Override
+ public String getId() {
+ return PLUGIN_NAME;
+ }
+
+ /**
+ * Callback before task is rejected.
+ *
+ * @param runnable task
+ * @param executor executor
+ */
+ @Override
+ public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
+ if (!(executor instanceof ExtensibleThreadPoolExecutor)) {
+ return;
+ }
+ String threadPoolId = ((ExtensibleThreadPoolExecutor) executor).getThreadPoolId();
+ Optional.ofNullable(ApplicationContextHolder.getInstance())
+ .map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class))
+ .ifPresent(handler -> handler.asyncSendRejectedAlarm(threadPoolId));
+ }
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordAwareProcessorPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordAwareProcessorPlugin.java
new file mode 100644
index 00000000..17b89e75
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordAwareProcessorPlugin.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin.impl;
+
+import cn.hippo4j.common.toolkit.SyncTimeRecorder;
+import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
+import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
+import cn.hippo4j.core.toolkit.SystemClock;
+import lombok.RequiredArgsConstructor;
+
+import java.util.Objects;
+
+/**
+ * Record task execution time indicator.
+ *
+ * @author huangchengxing
+ * @see TaskTimeoutNotifyAlarmPlugin
+ */
+@RequiredArgsConstructor
+public class TaskTimeRecordAwareProcessorPlugin extends SyncTimeRecorder implements ExecuteAwarePlugin {
+
+ public static final String PLUGIN_NAME = "task-time-record-aware-processor";
+
+ /**
+ * Get id.
+ *
+ * @return id
+ */
+ @Override
+ public String getId() {
+ return PLUGIN_NAME;
+ }
+
+ /**
+ * start times of executed tasks
+ */
+ private final ThreadLocal startTimes = new ThreadLocal<>();
+
+ /**
+ * Record the time when the worker thread starts executing the task.
+ *
+ * @param thread thread of executing task
+ * @param runnable task
+ * @see ExtensibleThreadPoolExecutor#beforeExecute
+ */
+ @Override
+ public void beforeExecute(Thread thread, Runnable runnable) {
+ startTimes.set(SystemClock.now());
+ }
+
+ /**
+ * Record the total time for the worker thread to complete the task, and update the time record.
+ *
+ * @param runnable runnable
+ * @param throwable exception thrown during execution
+ */
+ @Override
+ public void afterExecute(Runnable runnable, Throwable throwable) {
+ try {
+ Long startTime = startTimes.get();
+ if (Objects.isNull(startTime)) {
+ return;
+ }
+ long executeTime = SystemClock.now() - startTime;
+ refreshTime(executeTime);
+ afterRefreshTime(executeTime);
+ } finally {
+ startTimes.remove();
+ }
+ }
+
+ /**
+ * The callback function provided to the subclass, which is called after {@link #refreshTime}
+ *
+ * @param executeTime executeTime
+ */
+ protected void afterRefreshTime(long executeTime) {
+ // do nothing
+ }
+
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java
new file mode 100644
index 00000000..3ee6ed36
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin.impl;
+
+import cn.hippo4j.common.config.ApplicationContextHolder;
+import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
+import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Optional;
+
+/**
+ * Record task execution time indicator,
+ * and send alarm notification when the execution time exceeds the threshold.
+ *
+ * @author huangchengxing
+ */
+@AllArgsConstructor
+public class TaskTimeoutNotifyAlarmPlugin extends TaskTimeRecordAwareProcessorPlugin {
+
+ public static final String PLUGIN_NAME = "task-timeout-notify-alarm-plugin";
+
+ /**
+ * Get id.
+ *
+ * @return id
+ */
+ @Override
+ public String getId() {
+ return PLUGIN_NAME;
+ }
+
+ @Getter
+ @Setter
+ private Long executeTimeOut;
+
+ /**
+ * thread-pool
+ */
+ private final ExtensibleThreadPoolExecutor threadPoolExecutor;
+
+ /**
+ * Check whether the task execution time exceeds {@link #executeTimeOut},
+ * if it exceeds this time, send an alarm notification.
+ *
+ * @param executeTime executeTime in nanosecond
+ */
+ @Override
+ public void afterRefreshTime(long executeTime) {
+ if (executeTime <= executeTimeOut) {
+ return;
+ }
+ Optional.ofNullable(ApplicationContextHolder.getInstance())
+ .map(context -> context.getBean(ThreadPoolNotifyAlarmHandler.class))
+ .ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm(
+ threadPoolExecutor.getThreadPoolId(), executeTime, executeTimeOut, threadPoolExecutor));
+ }
+}
diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPlugin.java
new file mode 100644
index 00000000..10c7169f
--- /dev/null
+++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPlugin.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package cn.hippo4j.core.plugin.impl;
+
+import cn.hippo4j.common.toolkit.CollectionUtil;
+import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
+import cn.hippo4j.core.plugin.ShutdownAwarePlugin;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.experimental.Accessors;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * After the thread pool calls {@link ThreadPoolExecutor#shutdown()} or {@link ThreadPoolExecutor#shutdownNow()},
+ * if necessary, cancel the remaining tasks in the pool,
+ * and wait for the thread pool to terminate until
+ * the blocked main thread has timed out or the thread pool has completely terminated.
+ *
+ * @author huangchengxing
+ */
+@Accessors(chain = true)
+@Getter
+@Slf4j
+@AllArgsConstructor
+public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
+
+ public static final String PLUGIN_NAME = "thread-pool-executor-shutdown-plugin";
+
+ /**
+ * Get id.
+ *
+ * @return id
+ */
+ @Override
+ public String getId() {
+ return PLUGIN_NAME;
+ }
+
+ /**
+ * await termination millis
+ */
+ @Setter
+ public long awaitTerminationMillis;
+
+ /**
+ * wait for tasks to complete on shutdown
+ */
+ @Setter
+ public boolean waitForTasksToCompleteOnShutdown;
+
+ /**
+ * Callback before pool shutdown.
+ *
+ * @param executor executor
+ */
+ @Override
+ public void beforeShutdown(ThreadPoolExecutor executor) {
+ if (executor instanceof ExtensibleThreadPoolExecutor) {
+ ExtensibleThreadPoolExecutor dynamicThreadPoolExecutor = (ExtensibleThreadPoolExecutor) executor;
+ String threadPoolId = dynamicThreadPoolExecutor.getThreadPoolId();
+ if (log.isInfoEnabled()) {
+ log.info("Before shutting down ExecutorService" + (threadPoolId != null ? " '" + threadPoolId + "'" : ""));
+ }
+ }
+ }
+
+ /**
+ * Callback after pool shutdown.
+ * if {@link #waitForTasksToCompleteOnShutdown} return {@code true},
+ * cancel the remaining tasks,
+ * then wait for pool to terminate according {@link #awaitTerminationMillis} if necessary.
+ *
+ * @param executor executor
+ * @param remainingTasks remainingTasks
+ */
+ @Override
+ public void afterShutdown(ThreadPoolExecutor executor, List remainingTasks) {
+ if (executor instanceof ExtensibleThreadPoolExecutor && CollectionUtil.isNotEmpty(remainingTasks)) {
+ ExtensibleThreadPoolExecutor pool = (ExtensibleThreadPoolExecutor) executor;
+ if (!waitForTasksToCompleteOnShutdown && CollectionUtil.isNotEmpty(remainingTasks)) {
+ remainingTasks.forEach(this::cancelRemainingTask);
+ }
+ awaitTerminationIfNecessary(pool);
+ }
+ }
+
+ /**
+ * Cancel the given remaining task which never commended execution,
+ * as returned from {@link ExecutorService#shutdownNow()}.
+ *
+ * @param task the task to cancel (typically a {@link RunnableFuture})
+ * @see RunnableFuture#cancel(boolean)
+ * @since 5.0.5
+ */
+ protected void cancelRemainingTask(Runnable task) {
+ if (task instanceof Future) {
+ ((Future>) task).cancel(true);
+ }
+ }
+
+ /**
+ * Wait for the executor to terminate, according to the value of {@link #awaitTerminationMillis}.
+ */
+ private void awaitTerminationIfNecessary(ExtensibleThreadPoolExecutor executor) {
+ String threadPoolId = executor.getThreadPoolId();
+ if (this.awaitTerminationMillis <= 0) {
+ return;
+ }
+ try {
+ boolean isTerminated = executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS);
+ if (!isTerminated && log.isWarnEnabled()) {
+ log.warn("Timed out while waiting for executor" + " '" + threadPoolId + "'" + " to terminate.");
+ }
+ } catch (InterruptedException ex) {
+ if (log.isWarnEnabled()) {
+ log.warn("Interrupted while waiting for executor" + " '" + threadPoolId + "'" + " to terminate.");
+ }
+ Thread.currentThread().interrupt();
+ }
+ }
+
+}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ActionAwareThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ActionAwareThreadPoolExecutorTest.java
new file mode 100644
index 00000000..b12e8922
--- /dev/null
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/ActionAwareThreadPoolExecutorTest.java
@@ -0,0 +1,9 @@
+package cn.hippo4j.core.executor;
+
+/**
+ * test for {@link ExtensibleThreadPoolExecutor}
+ *
+ * @author huangchengxing
+ */
+public class ActionAwareThreadPoolExecutorTest {
+}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java
new file mode 100644
index 00000000..fde32550
--- /dev/null
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutorTest.java
@@ -0,0 +1,9 @@
+package cn.hippo4j.core.executor;
+
+/**
+ * test for {@link DynamicThreadPoolExecutor}
+ *
+ * @author huangchengxing
+ */
+public class DynamicThreadPoolExecutorTest {
+}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistryTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistryTest.java
new file mode 100644
index 00000000..36439215
--- /dev/null
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/DefaultThreadPoolPluginRegistryTest.java
@@ -0,0 +1,93 @@
+package cn.hippo4j.core.plugin;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * test for {@link DefaultThreadPoolPluginRegistry}
+ *
+ * @author huangchengxing
+ */
+public class DefaultThreadPoolPluginRegistryTest {
+
+ private DefaultThreadPoolPluginRegistry registry;
+
+ @Before
+ public void initRegistry() {
+ registry = new DefaultThreadPoolPluginRegistry();
+ }
+
+ @Test
+ public void testRegister() {
+ TaskAwarePlugin taskAwarePlugin = new TestTaskAwarePlugin();
+ registry.register(taskAwarePlugin);
+ Assert.assertTrue(registry.isRegistered(taskAwarePlugin));
+ Assert.assertEquals(1, registry.getTaskAwareList().size());
+
+ ExecuteAwarePlugin executeAwarePlugin = new TestExecuteAwarePlugin();
+ registry.register(executeAwarePlugin);
+ Assert.assertTrue(registry.isRegistered(executeAwarePlugin));
+ Assert.assertEquals(1, registry.getExecuteAwareList().size());
+
+ RejectedAwarePlugin rejectedAwarePlugin = new TestRejectedAwarePlugin();
+ registry.register(rejectedAwarePlugin);
+ Assert.assertTrue(registry.isRegistered(rejectedAwarePlugin));
+ Assert.assertEquals(1, registry.getRejectedAwareList().size());
+
+ ShutdownAwarePlugin shutdownAwarePlugin = new TestShutdownAwarePlugin();
+ registry.register(shutdownAwarePlugin);
+ Assert.assertTrue(registry.isRegistered(shutdownAwarePlugin));
+ Assert.assertEquals(1, registry.getShutdownAwareList().size());
+ }
+
+ private final static class TestTaskAwarePlugin implements TaskAwarePlugin {
+ /**
+ * Get id.
+ *
+ * @return id
+ */
+ @Override
+ public String getId() {
+ return "TestTaskAwarePlugin";
+ }
+ }
+
+ private final static class TestExecuteAwarePlugin implements ExecuteAwarePlugin {
+ /**
+ * Get id.
+ *
+ * @return id
+ */
+ @Override
+ public String getId() {
+ return "TestExecuteAwarePlugin";
+ }
+ }
+
+ private final static class TestRejectedAwarePlugin implements RejectedAwarePlugin {
+ /**
+ * Get id.
+ *
+ * @return id
+ */
+ @Override
+ public String getId() {
+ return "TestRejectedAwarePlugin";
+ }
+ }
+
+ private final static class TestShutdownAwarePlugin implements ShutdownAwarePlugin {
+ /**
+ * Get id.
+ *
+ * @return id
+ */
+ @Override
+ public String getId() {
+ return "TestShutdownAwarePlugin";
+ }
+ }
+
+
+}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPluginTest.java
new file mode 100644
index 00000000..6ab7abd7
--- /dev/null
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskDecoratorPluginTest.java
@@ -0,0 +1,9 @@
+package cn.hippo4j.core.plugin.impl;
+
+/**
+ * test for {@link TaskDecoratorPlugin}
+ *
+ * @author huangchengxing
+ */
+public class TaskDecoratorPluginTest {
+}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPluginTest.java
new file mode 100644
index 00000000..05b18e82
--- /dev/null
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectCountRecordPluginTest.java
@@ -0,0 +1,9 @@
+package cn.hippo4j.core.plugin.impl;
+
+/**
+ * test for {@link TaskRejectCountRecordPlugin}
+ *
+ * @author huangchengxing
+ */
+public class TaskRejectCountRecordPluginTest {
+}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java
new file mode 100644
index 00000000..9f997cb4
--- /dev/null
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java
@@ -0,0 +1,9 @@
+package cn.hippo4j.core.plugin.impl;
+
+/**
+ * test for {@link TaskRejectNotifyAlarmPlugin}
+ *
+ * @author huangchengxing
+ */
+public class TaskRejectNotifyAlarmPluginTest {
+}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java
new file mode 100644
index 00000000..2cca2b43
--- /dev/null
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeRecordPluginTest.java
@@ -0,0 +1,9 @@
+package cn.hippo4j.core.plugin.impl;
+
+/**
+ * test for {@link TaskTimeRecordAwareProcessorPlugin}
+ *
+ * @author huangchengxing
+ */
+public class TaskTimeRecordPluginTest {
+}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPluginTest.java
new file mode 100644
index 00000000..7a240f8b
--- /dev/null
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/ThreadPoolExecutorShutdownPluginTest.java
@@ -0,0 +1,9 @@
+package cn.hippo4j.core.plugin.impl;
+
+/**
+ * test for {@link ThreadPoolExecutorShutdownPlugin}
+ *
+ * @author huangchengxing
+ */
+public class ThreadPoolExecutorShutdownPluginTest {
+}
diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TimeoutNotifyAlarmTaskTimeRecordPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TimeoutNotifyAlarmTaskTimeRecordPluginTest.java
new file mode 100644
index 00000000..e37c4680
--- /dev/null
+++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TimeoutNotifyAlarmTaskTimeRecordPluginTest.java
@@ -0,0 +1,9 @@
+package cn.hippo4j.core.plugin.impl;
+
+/**
+ * test for {@link TaskTimeoutNotifyAlarmPlugin}
+ *
+ * @author huangchengxing
+ */
+public class TimeoutNotifyAlarmTaskTimeRecordPluginTest {
+}
diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java
index e570caa4..f2569c7e 100644
--- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java
+++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java
@@ -29,8 +29,6 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
-import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
-import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
@@ -43,7 +41,6 @@ import java.util.*;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
@@ -182,6 +179,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener