feat(MaximumActiveThreadCountCheckerRegister): add a registrar for registering MaximumActiveThreadCountChecker and support regular check of the thread pool registered with the plugin (#1208)

pull/1361/head
huangchengxing 2 years ago
parent 488311c295
commit 052de51a84

@ -19,6 +19,7 @@ package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin; import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.PluginRuntime; import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.manager.MaximumActiveThreadCountCheckerRegistrar;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport; import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -43,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference;
* <p><b>NOTE</b>: if custom {@link Thread.UncaughtExceptionHandler} is set for the thread pool, * <p><b>NOTE</b>: if custom {@link Thread.UncaughtExceptionHandler} is set for the thread pool,
* it may catch the {@link IllegalMaximumActiveCountException} exception and cause the worker thread to not be destroyed. * it may catch the {@link IllegalMaximumActiveCountException} exception and cause the worker thread to not be destroyed.
* *
* @see MaximumActiveThreadCountCheckerRegistrar
* @see IllegalMaximumActiveCountException * @see IllegalMaximumActiveCountException
*/ */
@RequiredArgsConstructor @RequiredArgsConstructor
@ -102,6 +104,24 @@ public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin {
return currentOverflowThreads == null ? 0 : currentOverflowThreads.get(); return currentOverflowThreads == null ? 0 : currentOverflowThreads.get();
} }
/**
* <p>Check {@link ThreadPoolExecutor#getActiveCount()} whether greater than {@link ThreadPoolExecutor#getMaximumPoolSize()}.
* if the number of threads in the thread pool exceeds the maximum thread count, set the number of overflow threads.
*
* @return number of overflow threads
* @see #setOverflowThreadNumber(int)
*/
public Integer checkOverflowThreads() {
ThreadPoolExecutor executor = threadPoolPluginSupport.getThreadPoolExecutor();
int activeCount = executor.getActiveCount();
int maximumActiveCount = executor.getMaximumPoolSize();
int number = activeCount > maximumActiveCount ? activeCount - maximumActiveCount : 0;
if (number > 0) {
setOverflowThreadNumber(number);
}
return number;
}
/** /**
* Get plugin runtime info. * Get plugin runtime info.
* *

@ -0,0 +1,89 @@
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.core.executor.plugin.manager.ThreadPoolPluginRegistrar;
import cn.hippo4j.core.executor.plugin.manager.ThreadPoolPluginSupport;
import cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* <p>A register for {@link MaximumActiveThreadCountChecker}.<br />
* After {@link #startCheckOverflowThreads} has been invoked,
* it will start a task for check whether the thread pool has overflow threads in the specified interval.
*
* @author huangchengxing
* @see MaximumActiveThreadCountChecker
*/
@Slf4j
@RequiredArgsConstructor
public class MaximumActiveThreadCountCheckerRegistrar implements ThreadPoolPluginRegistrar {
/**
* registered checkers
*/
private final List<MaximumActiveThreadCountChecker> checkers = new CopyOnWriteArrayList<>();
/**
* scheduled executor service
*/
private final ScheduledExecutorService scheduledExecutorService;
/**
* task for check whether the thread pool has overflow threads.
*/
private ScheduledFuture<?> task;
/**
* <p>Start task for check whether the thread pool has overflow threads.
* if task is already running, it will be canceled and restarted.
*/
public void startCheckOverflowThreads(long checkInterval, TimeUnit checkIntervalTimeUnit) {
Assert.isTrue(checkInterval > 0, "checkInterval must be greater than 0");
Assert.notNull(checkIntervalTimeUnit, "checkIntervalTimeUnit must not be null");
if (task != null) {
stopCheckOverflowThreads();
}
log.info("start check overflow threads task, checkInterval: {}, checkIntervalTimeUnit: {}", checkInterval, checkIntervalTimeUnit);
this.task = scheduledExecutorService.scheduleAtFixedRate(
() -> checkers.forEach(MaximumActiveThreadCountChecker::checkOverflowThreads),
checkInterval, checkInterval, checkIntervalTimeUnit
);
}
/**
* Cancel task for check whether the thread pool has overflow threads.
*
* @return true if the task was canceled before it completed normally
*/
public boolean stopCheckOverflowThreads() {
// is acceptable to cancel a task even if it is already running,
// because this behavior will not affect the normal operation of the thread pool
if (task != null && task.cancel(true)) {
log.info("cancel check overflow threads task");
return true;
}
return false;
}
/**
* Create and register plugin for the specified thread-pool instance.
*
* @param support thread pool plugin manager delegate
*/
@Override
public void doRegister(ThreadPoolPluginSupport support) {
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(support);
if (log.isDebugEnabled()) {
log.debug("register maximum active thread count checker for thread pool: {}", support.getThreadPoolId());
}
support.register(checker);
checkers.add(checker);
}
}

@ -28,7 +28,7 @@ import java.util.UUID;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
* test for {@link MaximumActiveThreadCountChecker} * test for {@link cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker}
* *
* @author huangchengxing * @author huangchengxing
*/ */
@ -40,9 +40,10 @@ public class MaximumActiveThreadCountCheckerTest {
"test", new DefaultThreadPoolPluginManager(), "test", new DefaultThreadPoolPluginManager(),
5, 5, 0L, TimeUnit.MILLISECONDS, 5, 5, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10), t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy()); new ArrayBlockingQueue<>(10), t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy());
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor); cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker checker = new cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker(executor);
Assert.assertEquals(checker.getClass().getSimpleName(), checker.getPluginRuntime().getPluginId()); Assert.assertEquals(checker.getClass().getSimpleName(), checker.getPluginRuntime().getPluginId());
// check plugin info // check plugin info
checker.checkOverflowThreads();
List<PluginRuntime.Info> infoList = checker.getPluginRuntime().getInfoList(); List<PluginRuntime.Info> infoList = checker.getPluginRuntime().getInfoList();
Assert.assertEquals(1, infoList.size()); Assert.assertEquals(1, infoList.size());
Assert.assertEquals("overflowThreadNumber", infoList.get(0).getName()); Assert.assertEquals("overflowThreadNumber", infoList.get(0).getName());
@ -57,7 +58,7 @@ public class MaximumActiveThreadCountCheckerTest {
"test", new DefaultThreadPoolPluginManager(), "test", new DefaultThreadPoolPluginManager(),
maximumThreadNum, maximumThreadNum, 0L, TimeUnit.MILLISECONDS, maximumThreadNum, maximumThreadNum, 0L, TimeUnit.MILLISECONDS,
queue, t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy()); queue, t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy());
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor); cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker checker = new cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker(executor);
executor.register(checker); executor.register(checker);
// create 2 workers and block them // create 2 workers and block them

Loading…
Cancel
Save