diff --git a/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DynamicThreadPoolExecutor.java b/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DynamicThreadPoolExecutor.java index 7a518f36..01efcedb 100644 --- a/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DynamicThreadPoolExecutor.java +++ b/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DynamicThreadPoolExecutor.java @@ -2,6 +2,7 @@ package com.github.dynamic.threadpool.starter.core; import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarm; import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarmManage; +import com.github.dynamic.threadpool.starter.event.EventExecutor; import lombok.NoArgsConstructor; import lombok.NonNull; @@ -23,7 +24,7 @@ import static com.github.dynamic.threadpool.common.constant.Constants.MAP_INITIA * @author chen.ma * @date 2021/7/8 21:47 */ -public final class DynamicThreadPoolExecutor extends ThreadPoolExecutor { +public class DynamicThreadPoolExecutor extends ThreadPoolExecutor { private final AtomicInteger rejectCount = new AtomicInteger(); private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); @@ -140,9 +141,7 @@ public final class DynamicThreadPoolExecutor extends ThreadPoolExecutor { return this.threadPoolId; } - private final class Worker - extends AbstractQueuedSynchronizer - implements Runnable { + private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; @@ -309,7 +308,9 @@ public final class DynamicThreadPoolExecutor extends ThreadPoolExecutor { final void reject(Runnable command) { rejectCount.incrementAndGet(); - ThreadPoolAlarmManage.checkPoolRejectAlarm(this); + EventExecutor.publishEvent( + () -> ThreadPoolAlarmManage.checkPoolRejectAlarm(this) + ); handler.rejectedExecution(command, this); } @@ -362,7 +363,9 @@ public final class DynamicThreadPoolExecutor extends ThreadPoolExecutor { } } - ThreadPoolAlarmManage.checkPoolLivenessAlarm(core, this); + EventExecutor.publishEvent( + () -> ThreadPoolAlarmManage.checkPoolLivenessAlarm(core, this) + ); boolean workerStarted = false; boolean workerAdded = false; @@ -538,7 +541,9 @@ public final class DynamicThreadPoolExecutor extends ThreadPoolExecutor { c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { - ThreadPoolAlarmManage.checkPoolCapacityAlarm(this); + EventExecutor.publishEvent( + () -> ThreadPoolAlarmManage.checkPoolCapacityAlarm(this) + ); int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) { reject(command); diff --git a/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/event/EventExecutor.java b/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/event/EventExecutor.java new file mode 100644 index 00000000..cba3d38e --- /dev/null +++ b/hippo4j-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/event/EventExecutor.java @@ -0,0 +1,35 @@ +package com.github.dynamic.threadpool.starter.event; + +import com.github.dynamic.threadpool.common.function.NoArgsConsumer; +import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; +import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 事件执行器. + * + * @author chen.ma + * @date 2021/11/8 23:44 + */ +public class EventExecutor { + + private static final ExecutorService EVENT_EXECUTOR = ThreadPoolBuilder.builder() + .threadFactory("event-executor") + .corePoolSize(Runtime.getRuntime().availableProcessors()) + .maxPoolNum(Runtime.getRuntime().availableProcessors()) + .workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE, 2048) + .rejected(new ThreadPoolExecutor.DiscardPolicy()) + .build(); + + /** + * 发布事件. + * + * @param consumer + */ + public static void publishEvent(NoArgsConsumer consumer) { + EVENT_EXECUTOR.execute(consumer::accept); + } + +}