From 5f75eb5f3cbe79b6c475061b26a17ec20ef83dc4 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Mon, 6 Dec 2021 20:29:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=8A=A8=E6=80=81=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=B1=A0=E5=88=9B=E5=BB=BA=E4=BB=A5=E5=8F=8A=E7=9B=91?= =?UTF-8?q?=E6=8E=A7=E4=BB=A3=E7=A0=81.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/DynamicThreadPoolExecutor.java | 8 +- .../core/DynamicThreadPoolPostProcessor.java | 12 ++- .../hippo4j/starter/event/EventExecutor.java | 35 -------- .../starter/event/MonitorEventExecutor.java | 45 ++++++++++ .../AbstractBuildThreadPoolTemplate.java | 86 ++++++++++++------- .../toolkit/thread/ThreadPoolBuilder.java | 11 +++ 6 files changed, 123 insertions(+), 74 deletions(-) delete mode 100644 hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/event/EventExecutor.java create mode 100644 hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/event/MonitorEventExecutor.java diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolExecutor.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolExecutor.java index 3d5a33c7..616f5652 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolExecutor.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolExecutor.java @@ -2,7 +2,7 @@ package cn.hippo4j.starter.core; import cn.hippo4j.starter.alarm.ThreadPoolAlarm; import cn.hippo4j.starter.alarm.ThreadPoolAlarmManage; -import cn.hippo4j.starter.event.EventExecutor; +import cn.hippo4j.starter.event.MonitorEventExecutor; import lombok.NoArgsConstructor; import lombok.NonNull; import org.springframework.core.task.TaskDecorator; @@ -312,7 +312,7 @@ public class DynamicThreadPoolExecutor extends DynamicExecutorConfigurationSuppo final void reject(Runnable command) { rejectCount.incrementAndGet(); - EventExecutor.publishEvent( + MonitorEventExecutor.publishEvent( () -> ThreadPoolAlarmManage.checkPoolRejectAlarm(this) ); handler.rejectedExecution(command, this); @@ -367,7 +367,7 @@ public class DynamicThreadPoolExecutor extends DynamicExecutorConfigurationSuppo } } - EventExecutor.publishEvent( + MonitorEventExecutor.publishEvent( () -> ThreadPoolAlarmManage.checkPoolLivenessAlarm(core, this) ); @@ -548,7 +548,7 @@ public class DynamicThreadPoolExecutor extends DynamicExecutorConfigurationSuppo c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { - EventExecutor.publishEvent( + MonitorEventExecutor.publishEvent( () -> ThreadPoolAlarmManage.checkPoolCapacityAlarm(this) ); int recheck = ctl.get(); diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolPostProcessor.java index 9e686f44..095130f3 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolPostProcessor.java @@ -46,10 +46,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { private final ThreadPoolOperation threadPoolOperation; private final ExecutorService executorService = ThreadPoolBuilder.builder() - .poolThreadSize(2, 4) - .keepAliveTime(0L, TimeUnit.MILLISECONDS) - .workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE, 1) - .threadFactory("dynamic-threadPool-config") + .corePoolSize(2) + .maxPoolNum(4) + .keepAliveTime(2000) + .timeUnit(TimeUnit.MILLISECONDS) + .workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE) + .capacity(1) + .allowCoreThreadTimeOut(true) + .threadFactory("dynamic-threadPool-init-config") .rejected(new ThreadPoolExecutor.DiscardOldestPolicy()) .build(); diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/event/EventExecutor.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/event/EventExecutor.java deleted file mode 100644 index 8c71bf42..00000000 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/event/EventExecutor.java +++ /dev/null @@ -1,35 +0,0 @@ -package cn.hippo4j.starter.event; - -import cn.hippo4j.common.function.NoArgsConsumer; -import cn.hippo4j.starter.toolkit.thread.QueueTypeEnum; -import cn.hippo4j.starter.toolkit.thread.ThreadPoolBuilder; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; - -/** - * 事件执行器. - * - * @author chen.ma - * @date 2021/11/8 23:44 - */ -public class EventExecutor { - - private static final ExecutorService EVENT_EXECUTOR = ThreadPoolBuilder.builder() - .threadFactory("event-executor") - .corePoolSize(Runtime.getRuntime().availableProcessors()) - .maxPoolNum(Runtime.getRuntime().availableProcessors()) - .workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE, 2048) - .rejected(new ThreadPoolExecutor.DiscardPolicy()) - .build(); - - /** - * 发布事件. - * - * @param consumer - */ - public static void publishEvent(NoArgsConsumer consumer) { - EVENT_EXECUTOR.execute(consumer::accept); - } - -} diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/event/MonitorEventExecutor.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/event/MonitorEventExecutor.java new file mode 100644 index 00000000..b29274b2 --- /dev/null +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/event/MonitorEventExecutor.java @@ -0,0 +1,45 @@ +package cn.hippo4j.starter.event; + +import cn.hippo4j.common.function.NoArgsConsumer; +import cn.hippo4j.starter.toolkit.thread.QueueTypeEnum; +import cn.hippo4j.starter.toolkit.thread.ThreadPoolBuilder; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; + +import static cn.hippo4j.common.constant.Constants.AVAILABLE_PROCESSORS; + +/** + * 动态线程池监控事件执行器. + * + * @author chen.ma + * @date 2021/11/8 23:44 + */ +@Slf4j +public class MonitorEventExecutor { + + private static final ExecutorService EVENT_EXECUTOR = ThreadPoolBuilder.builder() + .threadFactory("monitor-event-executor") + .corePoolSize(AVAILABLE_PROCESSORS) + .maxPoolNum(AVAILABLE_PROCESSORS) + .workQueue(QueueTypeEnum.LINKED_BLOCKING_QUEUE) + .capacity(4096) + .rejected(new ThreadPoolExecutor.AbortPolicy()) + .build(); + + /** + * 发布事件. + * + * @param consumer + */ + public static void publishEvent(NoArgsConsumer consumer) { + try { + EVENT_EXECUTOR.execute(consumer::accept); + } catch (RejectedExecutionException ex) { + log.error("Monitoring thread pool run events exceeded load."); + } + } + +} diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java index 7260baf2..b1d05a46 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/AbstractBuildThreadPoolTemplate.java @@ -49,14 +49,20 @@ public class AbstractBuildThreadPoolTemplate { */ public static ThreadPoolExecutor buildPool(ThreadPoolInitParam initParam) { Assert.notNull(initParam); - ThreadPoolExecutor executorService = - new ThreadPoolExecutorTemplate(initParam.getCorePoolNum(), - initParam.getMaxPoolNum(), - initParam.getKeepAliveTime(), - initParam.getTimeUnit(), - initParam.getWorkQueue(), - initParam.getThreadFactory(), - initParam.rejectedExecutionHandler); + ThreadPoolExecutor executorService; + try { + executorService = new ThreadPoolExecutorTemplate(initParam.getCorePoolNum(), + initParam.getMaxPoolNum(), + initParam.getKeepAliveTime(), + initParam.getTimeUnit(), + initParam.getWorkQueue(), + initParam.getThreadFactory(), + initParam.rejectedExecutionHandler); + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException("Error creating thread pool parameter.", ex); + } + + executorService.allowCoreThreadTimeOut(initParam.allowCoreThreadTimeOut); return executorService; } @@ -77,15 +83,21 @@ public class AbstractBuildThreadPoolTemplate { */ public static ThreadPoolExecutor buildFastPool(ThreadPoolInitParam initParam) { TaskQueue taskQueue = new TaskQueue(initParam.getCapacity()); - FastThreadPoolExecutor fastThreadPoolExecutor = - new FastThreadPoolExecutor(initParam.getCorePoolNum(), - initParam.getMaxPoolNum(), - initParam.getKeepAliveTime(), - initParam.getTimeUnit(), - taskQueue, - initParam.getThreadFactory(), - initParam.rejectedExecutionHandler); + FastThreadPoolExecutor fastThreadPoolExecutor; + try { + fastThreadPoolExecutor = new FastThreadPoolExecutor(initParam.getCorePoolNum(), + initParam.getMaxPoolNum(), + initParam.getKeepAliveTime(), + initParam.getTimeUnit(), + taskQueue, + initParam.getThreadFactory(), + initParam.rejectedExecutionHandler); + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException("Error creating thread pool parameter.", ex); + } + taskQueue.setExecutor(fastThreadPoolExecutor); + fastThreadPoolExecutor.allowCoreThreadTimeOut(initParam.allowCoreThreadTimeOut); return fastThreadPoolExecutor; } @@ -97,21 +109,28 @@ public class AbstractBuildThreadPoolTemplate { */ public static DynamicThreadPoolExecutor buildDynamicPool(ThreadPoolInitParam initParam) { Assert.notNull(initParam); - DynamicThreadPoolExecutor executorService = - new DynamicThreadPoolExecutor(initParam.getCorePoolNum(), - initParam.getMaxPoolNum(), - initParam.getKeepAliveTime(), - initParam.getTimeUnit(), - initParam.getWaitForTasksToCompleteOnShutdown(), - initParam.getAwaitTerminationMillis(), - initParam.getWorkQueue(), - initParam.getThreadPoolId(), - initParam.getThreadFactory(), - initParam.getThreadPoolAlarm(), - initParam.getRejectedExecutionHandler()); - - executorService.setTaskDecorator(initParam.getTaskDecorator()); - return executorService; + DynamicThreadPoolExecutor dynamicThreadPoolExecutor; + try { + dynamicThreadPoolExecutor = new DynamicThreadPoolExecutor( + initParam.getCorePoolNum(), + initParam.getMaxPoolNum(), + initParam.getKeepAliveTime(), + initParam.getTimeUnit(), + initParam.getWaitForTasksToCompleteOnShutdown(), + initParam.getAwaitTerminationMillis(), + initParam.getWorkQueue(), + initParam.getThreadPoolId(), + initParam.getThreadFactory(), + initParam.getThreadPoolAlarm(), + initParam.getRejectedExecutionHandler() + ); + } catch (IllegalArgumentException ex) { + throw new IllegalArgumentException(String.format("Error creating thread pool parameter. threadPool id :: %s", initParam.getThreadPoolId()), ex); + } + + dynamicThreadPoolExecutor.setTaskDecorator(initParam.getTaskDecorator()); + dynamicThreadPoolExecutor.allowCoreThreadTimeOut(initParam.allowCoreThreadTimeOut); + return dynamicThreadPoolExecutor; } @Data @@ -183,6 +202,11 @@ public class AbstractBuildThreadPoolTemplate { */ private Boolean waitForTasksToCompleteOnShutdown; + /** + * 允许核心线程超时 + */ + private Boolean allowCoreThreadTimeOut = false; + public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) { this.threadPoolId = threadNamePrefix; this.threadFactory = ThreadFactoryBuilder.builder() diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/ThreadPoolBuilder.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/ThreadPoolBuilder.java index a711af66..91943e28 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/ThreadPoolBuilder.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/toolkit/thread/ThreadPoolBuilder.java @@ -112,6 +112,11 @@ public class ThreadPoolBuilder implements Builder { */ private Boolean waitForTasksToCompleteOnShutdown = true; + /** + * 允许核心线程超时 + */ + private Boolean allowCoreThreadTimeOut = false; + /** * 计算公式:CPU 核数 / (1 - 阻塞系数 0.8) * @@ -234,6 +239,11 @@ public class ThreadPoolBuilder implements Builder { return this; } + public ThreadPoolBuilder allowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { + this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; + return this; + } + /** * 构建 * @@ -303,6 +313,7 @@ public class ThreadPoolBuilder implements Builder { .setCapacity(builder.capacity) .setRejectedExecutionHandler(builder.rejectedExecutionHandler) .setTimeUnit(builder.timeUnit) + .setAllowCoreThreadTimeOut(builder.allowCoreThreadTimeOut) .setTaskDecorator(builder.taskDecorator); if (builder.isDynamicPool) {