From 21519deaf39092e8f8a83bbcc7e07e849d344a70 Mon Sep 17 00:00:00 2001 From: huangchengxing <841396397@qq.com> Date: Sun, 28 May 2023 18:54:46 +0800 Subject: [PATCH] refcator(MaximumThreadNumChecker): checking for overflow threads through cas --- .../impl/MaximumActiveThreadCountChecker.java | 168 +++++++++++++----- .../DefaultThreadPoolPluginRegistrar.java | 2 - .../MaximumActiveThreadCountCheckerTest.java | 66 +------ 3 files changed, 128 insertions(+), 108 deletions(-) diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountChecker.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountChecker.java index 17fc8730..73fb2d7c 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountChecker.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountChecker.java @@ -20,11 +20,13 @@ package cn.hippo4j.core.plugin.impl; import cn.hippo4j.core.plugin.ExecuteAwarePlugin; import cn.hippo4j.core.plugin.PluginRuntime; import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** *

A plugin that checks whether the maximum number of threads in the thread pool is exceeded. @@ -36,7 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean; * if already exceeded, interrupt the worker thread, * and throw an {@link IllegalMaximumActiveCountException} exception to destroy the worker thread; * - *

  • if {@link #enableSubmitTaskAfterCheckFail} is true, re submit the task to the thread pool after check fail;
  • * * *

    NOTE: if custom {@link Thread.UncaughtExceptionHandler} is set for the thread pool, @@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean; * @author huangchengxing * @see IllegalMaximumActiveCountException */ +@RequiredArgsConstructor @Slf4j public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin { @@ -54,37 +56,43 @@ public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin { public final ThreadPoolPluginSupport threadPoolPluginSupport; /** - * Whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded. - */ - private final AtomicBoolean enableSubmitTaskAfterCheckFail; - - /** - * Create {@link MaximumActiveThreadCountChecker}. + *

    A pointer to a number object used to record the number of threads in the thread pool + * that exceeds the maximum thread count. + * There are two possible scenarios: + *

    * - * @param threadPoolPluginSupport thread pool - * @param enableSubmitTaskAfterCheckFail whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded. + *

    After performing operations based on the original number object pointed by the pointer, + * the pointer may change: + *

    */ - public MaximumActiveThreadCountChecker(ThreadPoolPluginSupport threadPoolPluginSupport, boolean enableSubmitTaskAfterCheckFail) { - this.threadPoolPluginSupport = threadPoolPluginSupport; - this.enableSubmitTaskAfterCheckFail = new AtomicBoolean(enableSubmitTaskAfterCheckFail); - } + public final AtomicReference overflowThreadNumber = new AtomicReference<>(null); /** - * Set whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded. + *

    Set the number of overflow threads. * - * @param enableSubmitTaskAfterCheckFail whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded. + * @param number number of overflow threads */ - public void setEnableSubmitTaskAfterCheckFail(boolean enableSubmitTaskAfterCheckFail) { - this.enableSubmitTaskAfterCheckFail.set(enableSubmitTaskAfterCheckFail); - } - - /** - * Get whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded. - * - * @return whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded. - */ - public boolean isEnableSubmitTaskAfterCheckFail() { - return enableSubmitTaskAfterCheckFail.get(); + public void setOverflowThreadNumber(int number) { + AtomicInteger currentOverflowThreads = (number > 0) ? new AtomicInteger(number) : null; + overflowThreadNumber.set(currentOverflowThreads); } /** @@ -94,8 +102,7 @@ public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin { */ @Override public PluginRuntime getPluginRuntime() { - return new PluginRuntime(getId()) - .addInfo("enableSubmitTaskAfterCheckFail", enableSubmitTaskAfterCheckFail.get()); + return new PluginRuntime(getId()); } /** @@ -110,23 +117,92 @@ public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin { */ @Override public void beforeExecute(Thread thread, Runnable runnable) { - ThreadPoolExecutor threadPoolExecutor = threadPoolPluginSupport.getThreadPoolExecutor(); - int activeCount = threadPoolExecutor.getActiveCount(); - int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); - if (activeCount > maximumPoolSize) { - // redelivery task if necessary - if (enableSubmitTaskAfterCheckFail.get()) { - log.warn( - "The maximum number of threads in the thread pool '{}' has been exceeded(activeCount={}, maximumPoolSize={}), task '{}' will redelivery", - threadPoolPluginSupport.getThreadPoolId(), activeCount, maximumPoolSize, runnable); - submitTaskAfterCheckFail(runnable); - } else { - log.warn( - "The maximum number of threads in the thread pool '{}' has been exceeded(activeCount={}, maximumPoolSize={}), task '{}' will be discarded.", - threadPoolPluginSupport.getThreadPoolId(), activeCount, maximumPoolSize, runnable); + if (!hasOverflowThread()) { + return; + } + log.warn( + "The maximum number of threads in the thread pool '{}' has been exceeded, task '{}' will redelivery", + threadPoolPluginSupport.getThreadPoolId(), runnable); + submitTaskAfterCheckFail(runnable); + interruptAndThrowException(thread); + } + + /** + *

    check the maximum number of threads in the thread pool after the task is executed. + * + * @return true if the maximum number of threads in the thread pool has been exceeded + */ + @SuppressWarnings("all") + private boolean hasOverflowThread() { + AtomicInteger current; + + // has overflow thread? + while ((current = overflowThreadNumber.get()) != null) { + int curr = current.get(); + + // it's aready no overflow thread + if (curr < 1) { + // check whether the current value is effective, if not, try again + if (current == overflowThreadNumber.get()) { + continue; + } + return false; + } + + // has overflow thread, try to cas to decrease the number of overflow threads + int expect = curr - 1; + if (!current.compareAndSet(curr, expect)) { + // cas fail, the current value is uneffective, try again + continue; + } + + /* + cas success, decrease the number of overflow threads, + then, use cas to check the current value is effective: + + 1.if already no overflow thread, set the overflowThreadNumber to null; + 2.if still has other overflow thread, keep the overflowThreadNumber reference unchanged + */ + AtomicInteger newRef = (expect > 0) ? current : null; + if (overflowThreadNumber.compareAndSet(current, newRef)) { + + /* + cas success, it's means the current value is effective. + + if this thread is the last overflow thread, + and already set the overflowThreadNumber to null, + then we return true to throw an exception. + */ + if (expect == 0) { + return true; + } + + /* + if this thread is not the last overflow thread, + the value may still be decreased by other threads before this step. + + 1.if current value still greater than 0, + we must try again to check whether the current value is effective. + + 2.if current value is set to null, + it's means that after the current value is decremented to 0 by other threads, it is set to null. + + in addition, here may be a ABA problem in low probability: + when cas success, if the current value is still greater than 0, + its means still has other overflow thread can set it to null after the value is cas to 0, + we can't determine whether the null value is set by same check, + so we may make an error and throw an exception in the next check after the current check. + */ + AtomicInteger nc = overflowThreadNumber.get(); + if (nc == current || nc == null) { + return true; + } } - interruptAndThrowException(thread, activeCount, maximumPoolSize); + // the current value is uneffective, try again } + + // no overflow thread + return false; } /** @@ -147,11 +223,11 @@ public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin { threadPoolPluginSupport.getThreadPoolExecutor().execute(runnable); } - private void interruptAndThrowException(Thread thread, int activeCount, int maximumPoolSize) { + private void interruptAndThrowException(Thread thread) { thread.interrupt(); throw new IllegalMaximumActiveCountException( "The maximum number of threads in the thread pool '" + threadPoolPluginSupport.getThreadPoolId() - + "' has been exceeded(activeCount=" + activeCount + ", maximumPoolSize=" + maximumPoolSize + ")."); + + "' has been exceeded."); } /** diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java index a044fa93..356782ee 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java @@ -18,7 +18,6 @@ package cn.hippo4j.core.plugin.manager; import cn.hippo4j.core.plugin.ThreadPoolPlugin; -import cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker; import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin; import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin; import cn.hippo4j.core.plugin.impl.TaskRejectNotifyAlarmPlugin; @@ -62,6 +61,5 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr support.register(new TaskRejectCountRecordPlugin()); support.register(new TaskRejectNotifyAlarmPlugin()); support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis)); - support.register(new MaximumActiveThreadCountChecker(support, true)); } } diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountCheckerTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountCheckerTest.java index d4130212..1a7245f9 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountCheckerTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountCheckerTest.java @@ -44,16 +44,14 @@ public class MaximumActiveThreadCountCheckerTest { @Test public void testGetPluginRuntime() { ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( - "test", new DefaultThreadPoolPluginManager(), - 5, 5, 0L, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue<>(10), t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy()); - MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor, true); + "test", new DefaultThreadPoolPluginManager(), + 5, 5, 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(10), t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy()); + MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor); Assert.assertEquals(checker.getClass().getSimpleName(), checker.getPluginRuntime().getPluginId()); // check plugin info List infoList = checker.getPluginRuntime().getInfoList(); - Assert.assertEquals(1, infoList.size()); - Assert.assertEquals("enableSubmitTaskAfterCheckFail", infoList.get(0).getName()); - Assert.assertEquals(true, infoList.get(0).getValue()); + Assert.assertEquals(0, infoList.size()); } @Test @@ -64,8 +62,7 @@ public class MaximumActiveThreadCountCheckerTest { "test", new DefaultThreadPoolPluginManager(), maximumThreadNum, maximumThreadNum, 0L, TimeUnit.MILLISECONDS, queue, t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy()); - MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor, true); - Assert.assertTrue(checker.isEnableSubmitTaskAfterCheckFail()); + MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor); executor.register(checker); WaitBeforeExecute waitBeforeExecute = new WaitBeforeExecute(0L); executor.register(waitBeforeExecute); @@ -106,57 +103,6 @@ public class MaximumActiveThreadCountCheckerTest { } } - @Test - public void testWhenNotEnableSubmitTaskAfterCheckFail() { - int maximumThreadNum = 3; - BlockingQueue queue = new ArrayBlockingQueue<>(1); - ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( - "test", new DefaultThreadPoolPluginManager(), - maximumThreadNum, maximumThreadNum, 0L, TimeUnit.MILLISECONDS, - queue, t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy()); - MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor, true); - checker.setEnableSubmitTaskAfterCheckFail(false); - Assert.assertFalse(checker.isEnableSubmitTaskAfterCheckFail()); - executor.register(checker); - WaitBeforeExecute waitBeforeExecute = new WaitBeforeExecute(0L); - executor.register(waitBeforeExecute); - - // create 2 workers and block them - CountDownLatch latch1 = submitTaskForBlockingThread(maximumThreadNum - 1, executor); - try { - // wait for the 2 workers to be executed task - Thread.sleep(500L); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - /* - * after 2 worker blocked, submit task and create the last worker, then change the maximum number of pool before the last task actually executed by worker, make plugin throw exception and - * re-deliver the task to the queue - */ - waitBeforeExecute.setWaitBeforeExecute(200L); - CountDownLatch latch2 = submitTaskForBlockingThread(1, executor); - executor.setCorePoolSize(maximumThreadNum - 1); - executor.setMaximumPoolSize(maximumThreadNum - 1); - - // wait for plugin abort the task - try { - Thread.sleep(500L); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - Assert.assertEquals(0, queue.size()); - // last worker destroyed due to the exception which thrown by plugin - Assert.assertEquals(2, executor.getActiveCount()); - - // free resources - latch1.countDown(); - latch2.countDown(); - executor.shutdown(); - while (executor.isTerminated()) { - } - } - private CountDownLatch submitTaskForBlockingThread(int num, ThreadPoolExecutor executor) { CountDownLatch latch = new CountDownLatch(1); for (int i = 0; i < num; i++) {