diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java index 2249fabc..fe10a09b 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolExecutor.java @@ -17,107 +17,231 @@ package cn.hippo4j.core.executor; -import cn.hippo4j.common.config.ApplicationContextHolder; -import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; -import cn.hippo4j.core.proxy.RejectedProxyUtil; -import cn.hippo4j.core.toolkit.SystemClock; -import lombok.Getter; +import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin; +import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin; +import cn.hippo4j.core.plugin.impl.TaskTimeoutNotifyAlarmPlugin; +import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin; +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginRegistrar; import lombok.NonNull; -import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.DisposableBean; import org.springframework.core.task.TaskDecorator; -import java.util.concurrent.*; +import java.util.Objects; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** * Enhanced dynamic and monitored thread pool. */ -public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport { - - @Getter - @Setter - private Long executeTimeOut; - - @Getter - @Setter - private TaskDecorator taskDecorator; - - @Getter - @Setter - private RejectedExecutionHandler redundancyHandler; - - @Getter - private final String threadPoolId; - - @Getter - private final AtomicLong rejectCount = new AtomicLong(); - - private final ThreadLocal startTimeThreadLocal = new ThreadLocal<>(); - - public DynamicThreadPoolExecutor(int corePoolSize, - int maximumPoolSize, - long keepAliveTime, - TimeUnit unit, - long executeTimeOut, - boolean waitForTasksToCompleteOnShutdown, - long awaitTerminationMillis, +@Slf4j +public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean { + + /** + * Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters. + * + * @param threadPoolId thread-pool id + * @param executeTimeOut execute time out + * @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown + * @param awaitTerminationMillis await termination millis + * @param corePoolSize the number of threads to keep in the pool, even + * if they are idle, unless {@code allowCoreThreadTimeOut} is set + * @param maximumPoolSize the maximum number of threads to allow in the + * pool + * @param keepAliveTime when the number of threads is greater than + * the core, this is the maximum time that excess idle threads + * will wait for new tasks before terminating. + * @param unit the time unit for the {@code keepAliveTime} argument + * @param blockingQueue the queue to use for holding tasks before they are + * executed. This queue will hold only the {@code Runnable} + * tasks submitted by the {@code execute} method. + * @param threadFactory the factory to use when the executor creates a new thread + * @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached + * @throws IllegalArgumentException if one of the following holds:
+ * {@code corePoolSize < 0}
+ * {@code keepAliveTime < 0}
+ * {@code maximumPoolSize <= 0}
+ * {@code maximumPoolSize < corePoolSize} + * @throws NullPointerException if {@code workQueue} + * or {@code threadFactory} or {@code handler} is null + */ + public DynamicThreadPoolExecutor( + int corePoolSize, int maximumPoolSize, + long keepAliveTime, TimeUnit unit, + long executeTimeOut, boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis, @NonNull BlockingQueue blockingQueue, @NonNull String threadPoolId, @NonNull ThreadFactory threadFactory, @NonNull RejectedExecutionHandler rejectedExecutionHandler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, blockingQueue, threadPoolId, threadFactory, rejectedExecutionHandler); - this.threadPoolId = threadPoolId; - this.executeTimeOut = executeTimeOut; - // Number of dynamic proxy denial policies. - RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount); - setRejectedExecutionHandler(rejectedProxy); - // Redundant fields to avoid reflecting the acquired fields when sending change information. - redundancyHandler = rejectedExecutionHandler; + super( + threadPoolId, new DefaultThreadPoolPluginManager(), + corePoolSize, maximumPoolSize, keepAliveTime, unit, + blockingQueue, threadFactory, rejectedExecutionHandler); + log.info("Initializing ExecutorService" + threadPoolId); + + // init default plugins + new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis, waitForTasksToCompleteOnShutdown) + .doRegister(this); } + /** + * Invoked by the containing {@code BeanFactory} on destruction of a bean. + * + */ @Override - public void execute(@NonNull Runnable command) { - if (taskDecorator != null) { - command = taskDecorator.decorate(command); + public void destroy() { + if (isWaitForTasksToCompleteOnShutdown()) { + super.shutdown(); + } else { + super.shutdownNow(); } - super.execute(command); + getThreadPoolPluginManager().clear(); } - @Override - protected void beforeExecute(Thread t, Runnable r) { - if (executeTimeOut == null || executeTimeOut <= 0) { - return; - } - startTimeThreadLocal.set(SystemClock.now()); + /** + * Get await termination millis. + * + * @return await termination millis. + * @deprecated use {@link ThreadPoolExecutorShutdownPlugin} + */ + @Deprecated + public long getAwaitTerminationMillis() { + return getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class) + .map(ThreadPoolExecutorShutdownPlugin::getAwaitTerminationMillis) + .orElse(-1L); } - @Override - protected void afterExecute(Runnable r, Throwable t) { - Long startTime; - if ((startTime = startTimeThreadLocal.get()) == null) { - return; - } - try { - long endTime = SystemClock.now(); - long executeTime; - boolean executeTimeAlarm = (executeTime = (endTime - startTime)) > executeTimeOut; - if (executeTimeAlarm && ApplicationContextHolder.getInstance() != null) { - ThreadPoolNotifyAlarmHandler notifyAlarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class); - if (notifyAlarmHandler != null) { - notifyAlarmHandler.asyncSendExecuteTimeOutAlarm(threadPoolId, executeTime, executeTimeOut, this); - } - } - } finally { - startTimeThreadLocal.remove(); - } + /** + * Is wait for tasks to complete on shutdown. + * + * @return true if instance wait for tasks to complete on shutdown, false other otherwise. + * @deprecated use {@link ThreadPoolExecutorShutdownPlugin} + */ + @Deprecated + public boolean isWaitForTasksToCompleteOnShutdown() { + return getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class) + .map(ThreadPoolExecutorShutdownPlugin::isWaitForTasksToCompleteOnShutdown) + .orElse(false); } - @Override - protected ExecutorService initializeExecutor() { - return this; + /** + * Set support param. + * + * @param awaitTerminationMillis await termination millis + * @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown + * @deprecated use {@link ThreadPoolExecutorShutdownPlugin} + */ + @Deprecated + public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) { + getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class) + .ifPresent(processor -> processor + .setAwaitTerminationMillis(awaitTerminationMillis) + .setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown)); } + /** + * Get reject count num. + * + * @return reject count num + * @deprecated use {@link TaskRejectCountRecordPlugin} + */ + @Deprecated public Long getRejectCountNum() { - return rejectCount.get(); + return getPluginOfType(TaskRejectCountRecordPlugin.PLUGIN_NAME, TaskRejectCountRecordPlugin.class) + .map(TaskRejectCountRecordPlugin::getRejectCountNum) + .orElse(-1L); + } + + /** + * Get reject count. + * + * @return reject count num + * @deprecated use {@link TaskRejectCountRecordPlugin} + */ + @Deprecated + public AtomicLong getRejectCount() { + return getPluginOfType(TaskRejectCountRecordPlugin.PLUGIN_NAME, TaskRejectCountRecordPlugin.class) + .map(TaskRejectCountRecordPlugin::getRejectCount) + .orElse(new AtomicLong(0)); + } + + /** + * Get execute time out. + * + * @deprecated use {@link TaskTimeoutNotifyAlarmPlugin} + */ + @Deprecated + public Long getExecuteTimeOut() { + return getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class) + .map(TaskTimeoutNotifyAlarmPlugin::getExecuteTimeOut) + .orElse(-1L); + } + + /** + * Set execute time out. + * + * @param executeTimeOut execute time out + * @deprecated use {@link TaskTimeoutNotifyAlarmPlugin} + */ + @Deprecated + public void setExecuteTimeOut(Long executeTimeOut) { + getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class) + .ifPresent(processor -> processor.setExecuteTimeOut(executeTimeOut)); + } + + /** + * Get {@link TaskDecorator}. + * + * @deprecated use {@link TaskDecoratorPlugin} + */ + @Deprecated + public TaskDecorator getTaskDecorator() { + return getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class) + .map(processor -> CollectionUtil.getFirst(processor.getDecorators())) + .orElse(null); } + + /** + * Set {@link TaskDecorator}. + * + * @param taskDecorator task decorator + * @deprecated use {@link TaskDecoratorPlugin} + */ + @Deprecated + public void setTaskDecorator(TaskDecorator taskDecorator) { + getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class) + .ifPresent(processor -> { + if (Objects.nonNull(taskDecorator)) { + processor.clearDecorators(); + processor.addDecorator(taskDecorator); + } + }); + } + + /** + * Get rejected execution handler. + * + * @deprecated use {@link DynamicThreadPoolExecutor#getRejectedExecutionHandler} + */ + @Deprecated + public RejectedExecutionHandler getRedundancyHandler() { + return getRejectedExecutionHandler(); + } + + /** + * Set rejected execution handler. + * + * @param handler handler + * @deprecated use {@link DynamicThreadPoolExecutor#setRejectedExecutionHandler} + */ + @Deprecated + public void setRedundancyHandler(RejectedExecutionHandler handler) { + setRejectedExecutionHandler(handler); + } + } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java index a8827549..2e2abc4e 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/DynamicThreadPoolWrapper.java @@ -17,7 +17,6 @@ package cn.hippo4j.core.executor; -import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.executor.support.CommonDynamicThreadPool; import lombok.AllArgsConstructor; import lombok.Builder; @@ -67,8 +66,8 @@ public class DynamicThreadPoolWrapper implements DisposableBean { @Override public void destroy() throws Exception { - if (executor != null && executor instanceof AbstractDynamicExecutorSupport) { - ((AbstractDynamicExecutorSupport) executor).destroy(); + if (executor != null && executor instanceof DynamicThreadPoolExecutor) { + ((DynamicThreadPoolExecutor) executor).destroy(); } } } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java index 77237dd4..7b832473 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/ThreadPoolNotifyAlarmHandler.java @@ -22,13 +22,13 @@ import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.support.ThreadPoolBuilder; -import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.core.toolkit.ExecutorTraceContextUtil; -import cn.hippo4j.message.service.Hippo4jSendMessageService; +import cn.hippo4j.core.toolkit.IdentifyUtil; import cn.hippo4j.message.enums.NotifyTypeEnum; -import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.message.request.AlarmNotifyRequest; import cn.hippo4j.message.request.ChangeParameterNotifyRequest; +import cn.hippo4j.message.service.Hippo4jSendMessageService; +import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -211,9 +211,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner */ public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) { BlockingQueue blockingQueue = threadPoolExecutor.getQueue(); - RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor - ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler() - : threadPoolExecutor.getRejectedExecutionHandler(); + RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler(); long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum() : -1L; diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java index ca9d7eaf..a0405a6e 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/state/ThreadPoolRunStateHandler.java @@ -23,10 +23,8 @@ import cn.hippo4j.common.toolkit.BeanUtil; import cn.hippo4j.common.toolkit.ByteConvertUtil; import cn.hippo4j.common.toolkit.MemoryUtil; import cn.hippo4j.common.toolkit.StringUtil; -import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; -import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.toolkit.inet.InetUtils; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -66,11 +64,7 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime { DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId); ThreadPoolExecutor pool = executorService.getExecutor(); String rejectedName; - if (pool instanceof AbstractDynamicExecutorSupport) { - rejectedName = ((DynamicThreadPoolExecutor) pool).getRedundancyHandler().getClass().getSimpleName(); - } else { - rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName(); - } + rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName(); poolRunStateInfo.setRejectedName(rejectedName); ManyThreadPoolRunStateInfo manyThreadPoolRunStateInfo = BeanUtil.convert(poolRunStateInfo, ManyThreadPoolRunStateInfo.class); manyThreadPoolRunStateInfo.setIdentify(CLIENT_IDENTIFICATION_VALUE); diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java index cff8087c..8dcf65fa 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractDynamicExecutorSupport.java @@ -17,6 +17,7 @@ package cn.hippo4j.core.executor.support; +import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; @@ -25,7 +26,10 @@ import java.util.concurrent.*; /** * Dynamic executor configuration support. + * + * @deprecated use {@link ThreadPoolExecutorShutdownPlugin} to get thread-pool shutdown support */ +@Deprecated @Slf4j public abstract class AbstractDynamicExecutorSupport extends ThreadPoolExecutor implements InitializingBean, DisposableBean { diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java index e570caa4..f2569c7e 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/config/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java @@ -29,8 +29,6 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; -import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; -import cn.hippo4j.core.proxy.RejectedProxyUtil; import cn.hippo4j.message.dto.NotifyConfigDTO; import cn.hippo4j.message.request.ChangeParameterNotifyRequest; import cn.hippo4j.message.service.Hippo4jBaseSendMessageService; @@ -43,7 +41,6 @@ import java.util.*; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT; @@ -182,6 +179,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener