From 7f4763ff512cb5adec534c6197a0a88f5463f0fa Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Sun, 28 Nov 2021 18:11:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A8=E6=80=81=E7=BA=BF=E7=A8=8B=E6=B1=A0?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=E5=A2=9E=E5=BC=BA.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DynamicExecutorConfigurationSupport.java | 147 ++++++++++++++++++ .../core/DynamicThreadPoolExecutor.java | 24 ++- .../core/DynamicThreadPoolPostProcessor.java | 16 +- .../AbstractBuildThreadPoolTemplate.java | 20 +++ .../toolkit/thread/ThreadPoolBuilder.java | 42 ++++- .../wrapper/DynamicThreadPoolWrapper.java | 11 +- 6 files changed, 251 insertions(+), 9 deletions(-) create mode 100644 hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicExecutorConfigurationSupport.java diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicExecutorConfigurationSupport.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicExecutorConfigurationSupport.java new file mode 100644 index 00000000..8e24f74e --- /dev/null +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicExecutorConfigurationSupport.java @@ -0,0 +1,147 @@ +package cn.hippo4j.starter.core; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.BeanNameAware; +import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; + +import java.util.concurrent.*; + +/** + * Dynamic executor configuration support. + * + * @author chen.ma + * @date 2021/11/28 12:17 + */ +@Slf4j +public abstract class DynamicExecutorConfigurationSupport extends ThreadPoolExecutor + implements BeanNameAware, InitializingBean, DisposableBean { + + private String beanName; + + private ExecutorService executor; + + private long awaitTerminationMillis = 0; + + private boolean waitForTasksToCompleteOnShutdown = false; + + public DynamicExecutorConfigurationSupport(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + boolean waitForTasksToCompleteOnShutdown, + long awaitTerminationMillis, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown; + this.awaitTerminationMillis = awaitTerminationMillis; + } + + /** + * Create the target {@link java.util.concurrent.ExecutorService} instance. + * Called by {@code afterPropertiesSet}. + * + * @return a new ExecutorService instance + * @see #afterPropertiesSet() + */ + protected abstract ExecutorService initializeExecutor(); + + @Override + public void setBeanName(String name) { + this.beanName = name; + } + + /** + * Calls {@code initialize()} after the container applied all property values. + * + * @see #initialize() + */ + @Override + public void afterPropertiesSet() { + initialize(); + } + + /** + * Calls {@code shutdown} when the BeanFactory destroys. + * the task executor instance. + * + * @see #shutdown() + */ + @Override + public void destroy() { + shutdownSupport(); + } + + /** + * Set up the ExecutorService. + */ + public void initialize() { + if (log.isInfoEnabled()) { + log.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); + } + + this.executor = initializeExecutor(); + } + + /** + * Perform a shutdown on the underlying ExecutorService. + * + * @see java.util.concurrent.ExecutorService#shutdown() + * @see java.util.concurrent.ExecutorService#shutdownNow() + */ + public void shutdownSupport() { + if (log.isInfoEnabled()) { + log.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); + } + if (this.executor != null) { + if (this.waitForTasksToCompleteOnShutdown) { + this.executor.shutdown(); + } else { + for (Runnable remainingTask : this.executor.shutdownNow()) { + cancelRemainingTask(remainingTask); + } + } + awaitTerminationIfNecessary(this.executor); + } + } + + /** + * Cancel the given remaining task which never commended execution, + * as returned from {@link ExecutorService#shutdownNow()}. + * + * @param task the task to cancel (typically a {@link RunnableFuture}) + * @see #shutdown() + * @see RunnableFuture#cancel(boolean) + * @since 5.0.5 + */ + protected void cancelRemainingTask(Runnable task) { + if (task instanceof Future) { + ((Future) task).cancel(true); + } + } + + /** + * Wait for the executor to terminate, according to the value of the. + */ + private void awaitTerminationIfNecessary(ExecutorService executor) { + if (this.awaitTerminationMillis > 0) { + try { + if (!executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS)) { + if (log.isWarnEnabled()) { + log.warn("Timed out while waiting for executor" + + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate"); + } + } + } catch (InterruptedException ex) { + if (log.isWarnEnabled()) { + log.warn("Interrupted while waiting for executor" + + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate"); + } + Thread.currentThread().interrupt(); + } + } + } + +} diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolExecutor.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolExecutor.java index 41fd8d46..75bd1064 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolExecutor.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolExecutor.java @@ -5,6 +5,7 @@ import cn.hippo4j.starter.alarm.ThreadPoolAlarmManage; import cn.hippo4j.starter.event.EventExecutor; import lombok.NoArgsConstructor; import lombok.NonNull; +import org.springframework.core.task.TaskDecorator; import java.security.AccessControlContext; import java.security.AccessController; @@ -24,7 +25,7 @@ import static cn.hippo4j.common.constant.Constants.MAP_INITIAL_CAPACITY; * @author chen.ma * @date 2021/7/8 21:47 */ -public class DynamicThreadPoolExecutor extends ThreadPoolExecutor { +public class DynamicThreadPoolExecutor extends DynamicExecutorConfigurationSupport { private final AtomicInteger rejectCount = new AtomicInteger(); private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); @@ -38,6 +39,7 @@ public class DynamicThreadPoolExecutor extends ThreadPoolExecutor { private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; + private TaskDecorator taskDecorator; private final BlockingQueue workQueue; private final ReentrantLock mainLock = new ReentrantLock(); private final HashSet workers = new HashSet(); @@ -63,12 +65,14 @@ public class DynamicThreadPoolExecutor extends ThreadPoolExecutor { int maximumPoolSize, long keepAliveTime, TimeUnit unit, + boolean waitForTasksToCompleteOnShutdown, + long awaitTerminationMillis, @NonNull BlockingQueue workQueue, @NonNull String threadPoolId, @NonNull ThreadFactory threadFactory, @NonNull ThreadPoolAlarm threadPoolAlarm, @NonNull RejectedExecutionHandler handler) { - super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, workQueue, threadFactory, handler); if (corePoolSize < 0 || maximumPoolSize <= 0 || @@ -533,6 +537,9 @@ public class DynamicThreadPoolExecutor extends ThreadPoolExecutor { @Override public void execute(@NonNull Runnable command) { + if (taskDecorator != null) { + command = taskDecorator.decorate(command); + } int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) { @@ -555,6 +562,11 @@ public class DynamicThreadPoolExecutor extends ThreadPoolExecutor { } } + @Override + protected ExecutorService initializeExecutor() { + return this; + } + @Override public void shutdown() { final ReentrantLock mainLock = this.mainLock; @@ -688,6 +700,14 @@ public class DynamicThreadPoolExecutor extends ThreadPoolExecutor { addWorker(null, true); } + public TaskDecorator getTaskDecorator() { + return taskDecorator; + } + + public void setTaskDecorator(TaskDecorator taskDecorator) { + this.taskDecorator = taskDecorator; + } + void ensurePrestart() { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) { diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolPostProcessor.java index b4605888..9e686f44 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolPostProcessor.java @@ -1,5 +1,9 @@ package cn.hippo4j.starter.core; +import cn.hippo4j.common.config.ApplicationContextHolder; +import cn.hippo4j.common.constant.Constants; +import cn.hippo4j.common.model.PoolParameterInfo; +import cn.hippo4j.common.web.base.Result; import cn.hippo4j.starter.common.CommonDynamicThreadPool; import cn.hippo4j.starter.config.BootstrapProperties; import cn.hippo4j.starter.remote.HttpAgent; @@ -8,15 +12,12 @@ import cn.hippo4j.starter.toolkit.thread.RejectedTypeEnum; import cn.hippo4j.starter.toolkit.thread.ThreadPoolBuilder; import cn.hippo4j.starter.wrapper.DynamicThreadPoolWrapper; import com.alibaba.fastjson.JSON; -import cn.hippo4j.common.config.ApplicationContextHolder; -import cn.hippo4j.common.constant.Constants; -import cn.hippo4j.common.model.PoolParameterInfo; -import cn.hippo4j.common.web.base.Result; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import lombok.var; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.task.TaskDecorator; import java.util.HashMap; import java.util.Map; @@ -106,7 +107,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) { // 使用相关参数创建线程池 BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); - poolExecutor = (DynamicThreadPoolExecutor) ThreadPoolBuilder.builder() + poolExecutor = ThreadPoolBuilder.builder() .dynamicPool() .workQueue(workQueue) .threadFactory(tpId) @@ -116,6 +117,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { .alarmConfig(ppi.getIsAlarm(), ppi.getCapacityAlarm(), ppi.getLivenessAlarm()) .build(); + if (poolExecutor instanceof DynamicExecutorConfigurationSupport) { + TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator(); + ((DynamicThreadPoolExecutor) poolExecutor).setTaskDecorator(taskDecorator); + } + dynamicThreadPoolWrap.setExecutor(poolExecutor); isSubscribe = true; } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java index 991da953..7260baf2 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java @@ -6,6 +6,7 @@ import cn.hippo4j.starter.core.DynamicThreadPoolExecutor; import lombok.Data; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import org.springframework.core.task.TaskDecorator; import java.util.concurrent.*; import java.util.concurrent.locks.AbstractQueuedSynchronizer; @@ -101,11 +102,15 @@ public class AbstractBuildThreadPoolTemplate { initParam.getMaxPoolNum(), initParam.getKeepAliveTime(), initParam.getTimeUnit(), + initParam.getWaitForTasksToCompleteOnShutdown(), + initParam.getAwaitTerminationMillis(), initParam.getWorkQueue(), initParam.getThreadPoolId(), initParam.getThreadFactory(), initParam.getThreadPoolAlarm(), initParam.getRejectedExecutionHandler()); + + executorService.setTaskDecorator(initParam.getTaskDecorator()); return executorService; } @@ -163,6 +168,21 @@ public class AbstractBuildThreadPoolTemplate { */ private ThreadPoolAlarm threadPoolAlarm; + /** + * 线程任务装饰器 + */ + private TaskDecorator taskDecorator; + + /** + * 等待终止毫秒 + */ + private Long awaitTerminationMillis; + + /** + * 等待任务在关机时完成 + */ + private Boolean waitForTasksToCompleteOnShutdown; + public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) { this.threadPoolId = threadNamePrefix; this.threadFactory = ThreadFactoryBuilder.builder() diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/ThreadPoolBuilder.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/ThreadPoolBuilder.java index cecf5348..c4741ac4 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/ThreadPoolBuilder.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/ThreadPoolBuilder.java @@ -3,6 +3,7 @@ package cn.hippo4j.starter.toolkit.thread; import cn.hippo4j.common.toolkit.Assert; import cn.hippo4j.starter.alarm.ThreadPoolAlarm; +import org.springframework.core.task.TaskDecorator; import java.math.BigDecimal; import java.util.Optional; @@ -96,6 +97,21 @@ public class ThreadPoolBuilder implements Builder { */ private Integer livenessAlarm; + /** + * 线程任务装饰器 + */ + private TaskDecorator taskDecorator; + + /** + * 等待终止毫秒 + */ + private Long awaitTerminationMillis = 0L; + + /** + * 等待任务在关机时完成 + */ + private Boolean waitForTasksToCompleteOnShutdown = false; + /** * 计算公式:CPU 核数 / (1 - 阻塞系数 0.8) * @@ -197,6 +213,27 @@ public class ThreadPoolBuilder implements Builder { return this; } + public ThreadPoolBuilder taskDecorator(TaskDecorator taskDecorator) { + this.taskDecorator = taskDecorator; + return this; + } + + public ThreadPoolBuilder awaitTerminationMillis(long awaitTerminationMillis) { + this.awaitTerminationMillis = awaitTerminationMillis; + return this; + } + + public ThreadPoolBuilder waitForTasksToCompleteOnShutdown(boolean waitForTasksToCompleteOnShutdown) { + this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown; + return this; + } + + public ThreadPoolBuilder dynamicSupport(boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis) { + this.awaitTerminationMillis = awaitTerminationMillis; + this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown; + return this; + } + /** * 构建 * @@ -265,13 +302,16 @@ public class ThreadPoolBuilder implements Builder { .setKeepAliveTime(builder.keepAliveTime) .setCapacity(builder.capacity) .setRejectedExecutionHandler(builder.rejectedExecutionHandler) - .setTimeUnit(builder.timeUnit); + .setTimeUnit(builder.timeUnit) + .setTaskDecorator(builder.taskDecorator); if (builder.isDynamicPool) { String threadPoolId = Optional.ofNullable(builder.threadPoolId).orElse(builder.threadNamePrefix); initParam.setThreadPoolId(threadPoolId); ThreadPoolAlarm threadPoolAlarm = new ThreadPoolAlarm(builder.isAlarm, builder.capacityAlarm, builder.livenessAlarm); initParam.setThreadPoolAlarm(threadPoolAlarm); + initParam.setWaitForTasksToCompleteOnShutdown(builder.waitForTasksToCompleteOnShutdown); + initParam.setAwaitTerminationMillis(builder.awaitTerminationMillis); } if (!builder.isFastPool) { diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/wrapper/DynamicThreadPoolWrapper.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/wrapper/DynamicThreadPoolWrapper.java index 3c8f37a3..c9152684 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/wrapper/DynamicThreadPoolWrapper.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/wrapper/DynamicThreadPoolWrapper.java @@ -1,7 +1,9 @@ package cn.hippo4j.starter.wrapper; import cn.hippo4j.starter.common.CommonDynamicThreadPool; +import cn.hippo4j.starter.core.DynamicExecutorConfigurationSupport; import lombok.Data; +import org.springframework.beans.factory.DisposableBean; import java.util.concurrent.Callable; import java.util.concurrent.Future; @@ -14,7 +16,7 @@ import java.util.concurrent.ThreadPoolExecutor; * @date 2021/6/20 16:55 */ @Data -public class DynamicThreadPoolWrapper { +public class DynamicThreadPoolWrapper implements DisposableBean { private String tenantId; @@ -76,4 +78,11 @@ public class DynamicThreadPoolWrapper { return executor.submit(task); } + @Override + public void destroy() throws Exception { + if (executor != null && executor instanceof DynamicExecutorConfigurationSupport) { + ((DynamicExecutorConfigurationSupport) executor).destroy(); + } + } + }