refcator(MaximumThreadNumChecker): checking for overflow threads through cas

pull/1211/head
huangchengxing 2 years ago
parent 425f27aef2
commit 21519deaf3

@ -20,11 +20,13 @@ package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* <p>A plugin that checks whether the maximum number of threads in the thread pool is exceeded.
@ -36,7 +38,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
* if already exceeded, interrupt the worker thread,
* and throw an {@link IllegalMaximumActiveCountException} exception to destroy the worker thread;
* </li>
* <li>if {@link #enableSubmitTaskAfterCheckFail} is true, re submit the task to the thread pool after check fail;</li>
* </ol>
*
* <p><b>NOTE</b>: if custom {@link Thread.UncaughtExceptionHandler} is set for the thread pool,
@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* @author huangchengxing
* @see IllegalMaximumActiveCountException
*/
@RequiredArgsConstructor
@Slf4j
public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin {
@ -54,37 +56,43 @@ public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin {
public final ThreadPoolPluginSupport threadPoolPluginSupport;
/**
* Whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded.
*/
private final AtomicBoolean enableSubmitTaskAfterCheckFail;
/**
* Create {@link MaximumActiveThreadCountChecker}.
*
* @param threadPoolPluginSupport thread pool
* @param enableSubmitTaskAfterCheckFail whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded.
*/
public MaximumActiveThreadCountChecker(ThreadPoolPluginSupport threadPoolPluginSupport, boolean enableSubmitTaskAfterCheckFail) {
this.threadPoolPluginSupport = threadPoolPluginSupport;
this.enableSubmitTaskAfterCheckFail = new AtomicBoolean(enableSubmitTaskAfterCheckFail);
}
/**
* Set whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded.
* <p>A pointer to a number object used to record the number of threads in the thread pool
* that exceeds the maximum thread count.
* There are two possible scenarios:
* <ul>
* <li>Pointing to {@code null}: indicating that there are no threads in the thread pool exceeding the maximum count;</li>
* <li>
* Pointing to a number object, indicating that there are threads in the thread pool
* exceeding the maximum count, and the number represents the overflow thread count;
* </li>
* </ul>
*
* @param enableSubmitTaskAfterCheckFail whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded.
* <p>After performing operations based on the original number object pointed by the pointer,
* the pointer may change:
* <ul>
* <li>The pointer remains unchanged, indicating the operations performed on the original number object are valid;</li>
* <li>
* The pointer points to another number object, indicating that the operations performed
* on the original number object should be considered invalid.
* It requires obtaining the number object again and retrying the operations
* </li>
* <li>
* The pointer points to {@code null}, indicating that there are no threads
* in the thread pool exceeding the maximum count.
* The operations based on the old reference are still effective;
* </li>
* </ul>
*/
public void setEnableSubmitTaskAfterCheckFail(boolean enableSubmitTaskAfterCheckFail) {
this.enableSubmitTaskAfterCheckFail.set(enableSubmitTaskAfterCheckFail);
}
public final AtomicReference<AtomicInteger> overflowThreadNumber = new AtomicReference<>(null);
/**
* Get whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded.
* <p>Set the number of overflow threads.
*
* @return whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded.
* @param number number of overflow threads
*/
public boolean isEnableSubmitTaskAfterCheckFail() {
return enableSubmitTaskAfterCheckFail.get();
public void setOverflowThreadNumber(int number) {
AtomicInteger currentOverflowThreads = (number > 0) ? new AtomicInteger(number) : null;
overflowThreadNumber.set(currentOverflowThreads);
}
/**
@ -94,8 +102,7 @@ public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin {
*/
@Override
public PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId())
.addInfo("enableSubmitTaskAfterCheckFail", enableSubmitTaskAfterCheckFail.get());
return new PluginRuntime(getId());
}
/**
@ -110,23 +117,92 @@ public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin {
*/
@Override
public void beforeExecute(Thread thread, Runnable runnable) {
ThreadPoolExecutor threadPoolExecutor = threadPoolPluginSupport.getThreadPoolExecutor();
int activeCount = threadPoolExecutor.getActiveCount();
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
if (activeCount > maximumPoolSize) {
// redelivery task if necessary
if (enableSubmitTaskAfterCheckFail.get()) {
if (!hasOverflowThread()) {
return;
}
log.warn(
"The maximum number of threads in the thread pool '{}' has been exceeded(activeCount={}, maximumPoolSize={}), task '{}' will redelivery",
threadPoolPluginSupport.getThreadPoolId(), activeCount, maximumPoolSize, runnable);
"The maximum number of threads in the thread pool '{}' has been exceeded, task '{}' will redelivery",
threadPoolPluginSupport.getThreadPoolId(), runnable);
submitTaskAfterCheckFail(runnable);
} else {
log.warn(
"The maximum number of threads in the thread pool '{}' has been exceeded(activeCount={}, maximumPoolSize={}), task '{}' will be discarded.",
threadPoolPluginSupport.getThreadPoolId(), activeCount, maximumPoolSize, runnable);
interruptAndThrowException(thread);
}
/**
* <p>check the maximum number of threads in the thread pool after the task is executed.
*
* @return true if the maximum number of threads in the thread pool has been exceeded
*/
@SuppressWarnings("all")
private boolean hasOverflowThread() {
AtomicInteger current;
// has overflow thread?
while ((current = overflowThreadNumber.get()) != null) {
int curr = current.get();
// it's aready no overflow thread
if (curr < 1) {
// check whether the current value is effective, if not, try again
if (current == overflowThreadNumber.get()) {
continue;
}
return false;
}
interruptAndThrowException(thread, activeCount, maximumPoolSize);
// has overflow thread, try to cas to decrease the number of overflow threads
int expect = curr - 1;
if (!current.compareAndSet(curr, expect)) {
// cas fail, the current value is uneffective, try again
continue;
}
/*
cas success, decrease the number of overflow threads,
then, use cas to check the current value is effective:
1.if already no overflow thread, set the overflowThreadNumber to null;
2.if still has other overflow thread, keep the overflowThreadNumber reference unchanged
*/
AtomicInteger newRef = (expect > 0) ? current : null;
if (overflowThreadNumber.compareAndSet(current, newRef)) {
/*
cas success, it's means the current value is effective.
if this thread is the last overflow thread,
and already set the overflowThreadNumber to null,
then we return true to throw an exception.
*/
if (expect == 0) {
return true;
}
/*
if this thread is not the last overflow thread,
the value may still be decreased by other threads before this step.
1.if current value still greater than 0,
we must try again to check whether the current value is effective.
2.if current value is set to null,
it's means that after the current value is decremented to 0 by other threads, it is set to null.
in addition, here may be a ABA problem in low probability:
when cas success, if the current value is still greater than 0,
its means still has other overflow thread can set it to null after the value is cas to 0,
we can't determine whether the null value is set by same check,
so we may make an error and throw an exception in the next check after the current check.
*/
AtomicInteger nc = overflowThreadNumber.get();
if (nc == current || nc == null) {
return true;
}
}
// the current value is uneffective, try again
}
// no overflow thread
return false;
}
/**
@ -147,11 +223,11 @@ public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin {
threadPoolPluginSupport.getThreadPoolExecutor().execute(runnable);
}
private void interruptAndThrowException(Thread thread, int activeCount, int maximumPoolSize) {
private void interruptAndThrowException(Thread thread) {
thread.interrupt();
throw new IllegalMaximumActiveCountException(
"The maximum number of threads in the thread pool '" + threadPoolPluginSupport.getThreadPoolId()
+ "' has been exceeded(activeCount=" + activeCount + ", maximumPoolSize=" + maximumPoolSize + ").");
+ "' has been exceeded.");
}
/**

@ -18,7 +18,6 @@
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker;
import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin;
import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin;
import cn.hippo4j.core.plugin.impl.TaskRejectNotifyAlarmPlugin;
@ -62,6 +61,5 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr
support.register(new TaskRejectCountRecordPlugin());
support.register(new TaskRejectNotifyAlarmPlugin());
support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis));
support.register(new MaximumActiveThreadCountChecker(support, true));
}
}

@ -47,13 +47,11 @@ public class MaximumActiveThreadCountCheckerTest {
"test", new DefaultThreadPoolPluginManager(),
5, 5, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10), t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy());
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor, true);
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor);
Assert.assertEquals(checker.getClass().getSimpleName(), checker.getPluginRuntime().getPluginId());
// check plugin info
List<PluginRuntime.Info> infoList = checker.getPluginRuntime().getInfoList();
Assert.assertEquals(1, infoList.size());
Assert.assertEquals("enableSubmitTaskAfterCheckFail", infoList.get(0).getName());
Assert.assertEquals(true, infoList.get(0).getValue());
Assert.assertEquals(0, infoList.size());
}
@Test
@ -64,8 +62,7 @@ public class MaximumActiveThreadCountCheckerTest {
"test", new DefaultThreadPoolPluginManager(),
maximumThreadNum, maximumThreadNum, 0L, TimeUnit.MILLISECONDS,
queue, t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy());
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor, true);
Assert.assertTrue(checker.isEnableSubmitTaskAfterCheckFail());
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor);
executor.register(checker);
WaitBeforeExecute waitBeforeExecute = new WaitBeforeExecute(0L);
executor.register(waitBeforeExecute);
@ -106,57 +103,6 @@ public class MaximumActiveThreadCountCheckerTest {
}
}
@Test
public void testWhenNotEnableSubmitTaskAfterCheckFail() {
int maximumThreadNum = 3;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
maximumThreadNum, maximumThreadNum, 0L, TimeUnit.MILLISECONDS,
queue, t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy());
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor, true);
checker.setEnableSubmitTaskAfterCheckFail(false);
Assert.assertFalse(checker.isEnableSubmitTaskAfterCheckFail());
executor.register(checker);
WaitBeforeExecute waitBeforeExecute = new WaitBeforeExecute(0L);
executor.register(waitBeforeExecute);
// create 2 workers and block them
CountDownLatch latch1 = submitTaskForBlockingThread(maximumThreadNum - 1, executor);
try {
// wait for the 2 workers to be executed task
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
/*
* after 2 worker blocked, submit task and create the last worker, then change the maximum number of pool before the last task actually executed by worker, make plugin throw exception and
* re-deliver the task to the queue
*/
waitBeforeExecute.setWaitBeforeExecute(200L);
CountDownLatch latch2 = submitTaskForBlockingThread(1, executor);
executor.setCorePoolSize(maximumThreadNum - 1);
executor.setMaximumPoolSize(maximumThreadNum - 1);
// wait for plugin abort the task
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Assert.assertEquals(0, queue.size());
// last worker destroyed due to the exception which thrown by plugin
Assert.assertEquals(2, executor.getActiveCount());
// free resources
latch1.countDown();
latch2.countDown();
executor.shutdown();
while (executor.isTerminated()) {
}
}
private CountDownLatch submitTaskForBlockingThread(int num, ThreadPoolExecutor executor) {
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < num; i++) {

Loading…
Cancel
Save