From 052de51a8400ef1da2715a0b92da58c144957650 Mon Sep 17 00:00:00 2001 From: huangchengxing <841396397@qq.com> Date: Mon, 29 May 2023 00:23:17 +0800 Subject: [PATCH] feat(MaximumActiveThreadCountCheckerRegister): add a registrar for registering MaximumActiveThreadCountChecker and support regular check of the thread pool registered with the plugin (#1208) --- .../impl/MaximumActiveThreadCountChecker.java | 20 +++++ ...imumActiveThreadCountCheckerRegistrar.java | 89 +++++++++++++++++++ .../MaximumActiveThreadCountCheckerTest.java | 7 +- 3 files changed, 113 insertions(+), 3 deletions(-) create mode 100644 threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/MaximumActiveThreadCountCheckerRegistrar.java diff --git a/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountChecker.java b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountChecker.java index ede714b6..41898705 100644 --- a/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountChecker.java +++ b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountChecker.java @@ -19,6 +19,7 @@ package cn.hippo4j.core.plugin.impl; import cn.hippo4j.core.plugin.ExecuteAwarePlugin; import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.plugin.manager.MaximumActiveThreadCountCheckerRegistrar; import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -43,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference; *

NOTE: if custom {@link Thread.UncaughtExceptionHandler} is set for the thread pool, * it may catch the {@link IllegalMaximumActiveCountException} exception and cause the worker thread to not be destroyed. * + * @see MaximumActiveThreadCountCheckerRegistrar * @see IllegalMaximumActiveCountException */ @RequiredArgsConstructor @@ -102,6 +104,24 @@ public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin { return currentOverflowThreads == null ? 0 : currentOverflowThreads.get(); } + /** + *

Check {@link ThreadPoolExecutor#getActiveCount()} whether greater than {@link ThreadPoolExecutor#getMaximumPoolSize()}. + * if the number of threads in the thread pool exceeds the maximum thread count, set the number of overflow threads. + * + * @return number of overflow threads + * @see #setOverflowThreadNumber(int) + */ + public Integer checkOverflowThreads() { + ThreadPoolExecutor executor = threadPoolPluginSupport.getThreadPoolExecutor(); + int activeCount = executor.getActiveCount(); + int maximumActiveCount = executor.getMaximumPoolSize(); + int number = activeCount > maximumActiveCount ? activeCount - maximumActiveCount : 0; + if (number > 0) { + setOverflowThreadNumber(number); + } + return number; + } + /** * Get plugin runtime info. * diff --git a/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/MaximumActiveThreadCountCheckerRegistrar.java b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/MaximumActiveThreadCountCheckerRegistrar.java new file mode 100644 index 00000000..b8e1d0ea --- /dev/null +++ b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/MaximumActiveThreadCountCheckerRegistrar.java @@ -0,0 +1,89 @@ +package cn.hippo4j.core.plugin.manager; + +import cn.hippo4j.common.toolkit.Assert; +import cn.hippo4j.core.executor.plugin.manager.ThreadPoolPluginRegistrar; +import cn.hippo4j.core.executor.plugin.manager.ThreadPoolPluginSupport; +import cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + *

A register for {@link MaximumActiveThreadCountChecker}.
+ * After {@link #startCheckOverflowThreads} has been invoked, + * it will start a task for check whether the thread pool has overflow threads in the specified interval. + * + * @author huangchengxing + * @see MaximumActiveThreadCountChecker + */ +@Slf4j +@RequiredArgsConstructor +public class MaximumActiveThreadCountCheckerRegistrar implements ThreadPoolPluginRegistrar { + + /** + * registered checkers + */ + private final List checkers = new CopyOnWriteArrayList<>(); + + /** + * scheduled executor service + */ + private final ScheduledExecutorService scheduledExecutorService; + + /** + * task for check whether the thread pool has overflow threads. + */ + private ScheduledFuture task; + + /** + *

Start task for check whether the thread pool has overflow threads. + * if task is already running, it will be canceled and restarted. + */ + public void startCheckOverflowThreads(long checkInterval, TimeUnit checkIntervalTimeUnit) { + Assert.isTrue(checkInterval > 0, "checkInterval must be greater than 0"); + Assert.notNull(checkIntervalTimeUnit, "checkIntervalTimeUnit must not be null"); + if (task != null) { + stopCheckOverflowThreads(); + } + log.info("start check overflow threads task, checkInterval: {}, checkIntervalTimeUnit: {}", checkInterval, checkIntervalTimeUnit); + this.task = scheduledExecutorService.scheduleAtFixedRate( + () -> checkers.forEach(MaximumActiveThreadCountChecker::checkOverflowThreads), + checkInterval, checkInterval, checkIntervalTimeUnit + ); + } + + /** + * Cancel task for check whether the thread pool has overflow threads. + * + * @return true if the task was canceled before it completed normally + */ + public boolean stopCheckOverflowThreads() { + // is acceptable to cancel a task even if it is already running, + // because this behavior will not affect the normal operation of the thread pool + if (task != null && task.cancel(true)) { + log.info("cancel check overflow threads task"); + return true; + } + return false; + } + + /** + * Create and register plugin for the specified thread-pool instance. + * + * @param support thread pool plugin manager delegate + */ + @Override + public void doRegister(ThreadPoolPluginSupport support) { + MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(support); + if (log.isDebugEnabled()) { + log.debug("register maximum active thread count checker for thread pool: {}", support.getThreadPoolId()); + } + support.register(checker); + checkers.add(checker); + } +} diff --git a/threadpool/core/src/test/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountCheckerTest.java b/threadpool/core/src/test/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountCheckerTest.java index f19ac02e..7335aa0d 100644 --- a/threadpool/core/src/test/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountCheckerTest.java +++ b/threadpool/core/src/test/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountCheckerTest.java @@ -28,7 +28,7 @@ import java.util.UUID; import java.util.concurrent.*; /** - * test for {@link MaximumActiveThreadCountChecker} + * test for {@link cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker} * * @author huangchengxing */ @@ -40,9 +40,10 @@ public class MaximumActiveThreadCountCheckerTest { "test", new DefaultThreadPoolPluginManager(), 5, 5, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10), t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy()); - MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor); + cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker checker = new cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker(executor); Assert.assertEquals(checker.getClass().getSimpleName(), checker.getPluginRuntime().getPluginId()); // check plugin info + checker.checkOverflowThreads(); List infoList = checker.getPluginRuntime().getInfoList(); Assert.assertEquals(1, infoList.size()); Assert.assertEquals("overflowThreadNumber", infoList.get(0).getName()); @@ -57,7 +58,7 @@ public class MaximumActiveThreadCountCheckerTest { "test", new DefaultThreadPoolPluginManager(), maximumThreadNum, maximumThreadNum, 0L, TimeUnit.MILLISECONDS, queue, t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy()); - MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor); + cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker checker = new cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker(executor); executor.register(checker); // create 2 workers and block them