feat(MaximumActiveThreadCountChecker): add new plugin that checks whether the maximum number of threads in the thread pool is exceeded(#1208)

pull/1211/head
huangchengxing 2 years ago
parent 46334f8927
commit 425f27aef2

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* <p>A plugin that checks whether the maximum number of threads in the thread pool is exceeded.
*
* <p>When task is submitted to the thread pool, before the current worker thread executes the task:
* <ol>
* <li>check whether the maximum number of threads in the thread pool is exceeded;</li>
* <li>
* if already exceeded, interrupt the worker thread,
* and throw an {@link IllegalMaximumActiveCountException} exception to destroy the worker thread;
* </li>
* <li>if {@link #enableSubmitTaskAfterCheckFail} is true, re submit the task to the thread pool after check fail;</li>
* </ol>
*
* <p><b>NOTE</b>: if custom {@link Thread.UncaughtExceptionHandler} is set for the thread pool,
* it may catch the {@link IllegalMaximumActiveCountException} exception and cause the worker thread to not be destroyed.
*
* @author huangchengxing
* @see IllegalMaximumActiveCountException
*/
@Slf4j
public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin {
/**
* Thread pool.
*/
public final ThreadPoolPluginSupport threadPoolPluginSupport;
/**
* Whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded.
*/
private final AtomicBoolean enableSubmitTaskAfterCheckFail;
/**
* Create {@link MaximumActiveThreadCountChecker}.
*
* @param threadPoolPluginSupport thread pool
* @param enableSubmitTaskAfterCheckFail whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded.
*/
public MaximumActiveThreadCountChecker(ThreadPoolPluginSupport threadPoolPluginSupport, boolean enableSubmitTaskAfterCheckFail) {
this.threadPoolPluginSupport = threadPoolPluginSupport;
this.enableSubmitTaskAfterCheckFail = new AtomicBoolean(enableSubmitTaskAfterCheckFail);
}
/**
* Set whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded.
*
* @param enableSubmitTaskAfterCheckFail whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded.
*/
public void setEnableSubmitTaskAfterCheckFail(boolean enableSubmitTaskAfterCheckFail) {
this.enableSubmitTaskAfterCheckFail.set(enableSubmitTaskAfterCheckFail);
}
/**
* Get whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded.
*
* @return whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded.
*/
public boolean isEnableSubmitTaskAfterCheckFail() {
return enableSubmitTaskAfterCheckFail.get();
}
/**
* Get plugin runtime info.
*
* @return plugin runtime info
*/
@Override
public PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId())
.addInfo("enableSubmitTaskAfterCheckFail", enableSubmitTaskAfterCheckFail.get());
}
/**
* <p>Check the maximum number of threads in the thread pool before the task is executed,
* if the maximum number of threads is exceeded,
* an {@link IllegalMaximumActiveCountException} will be thrown.
*
* @param thread thread of executing task
* @param runnable task
* @throws IllegalMaximumActiveCountException thread if the maximum number of threads is exceeded
* @see ThreadPoolExecutor#beforeExecute
*/
@Override
public void beforeExecute(Thread thread, Runnable runnable) {
ThreadPoolExecutor threadPoolExecutor = threadPoolPluginSupport.getThreadPoolExecutor();
int activeCount = threadPoolExecutor.getActiveCount();
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
if (activeCount > maximumPoolSize) {
// redelivery task if necessary
if (enableSubmitTaskAfterCheckFail.get()) {
log.warn(
"The maximum number of threads in the thread pool '{}' has been exceeded(activeCount={}, maximumPoolSize={}), task '{}' will redelivery",
threadPoolPluginSupport.getThreadPoolId(), activeCount, maximumPoolSize, runnable);
submitTaskAfterCheckFail(runnable);
} else {
log.warn(
"The maximum number of threads in the thread pool '{}' has been exceeded(activeCount={}, maximumPoolSize={}), task '{}' will be discarded.",
threadPoolPluginSupport.getThreadPoolId(), activeCount, maximumPoolSize, runnable);
}
interruptAndThrowException(thread, activeCount, maximumPoolSize);
}
}
/**
* <p>Submit task to thread pool after check fail.
*
* @param runnable task
*/
protected void submitTaskAfterCheckFail(Runnable runnable) {
if (runnable instanceof RunnableFuture) {
RunnableFuture<?> future = (RunnableFuture<?>) runnable;
if (future.isDone()) {
return;
}
if (future.isCancelled()) {
return;
}
}
threadPoolPluginSupport.getThreadPoolExecutor().execute(runnable);
}
private void interruptAndThrowException(Thread thread, int activeCount, int maximumPoolSize) {
thread.interrupt();
throw new IllegalMaximumActiveCountException(
"The maximum number of threads in the thread pool '" + threadPoolPluginSupport.getThreadPoolId()
+ "' has been exceeded(activeCount=" + activeCount + ", maximumPoolSize=" + maximumPoolSize + ").");
}
/**
* A {@link RuntimeException} that indicates that the maximum number of threads in the thread pool has been exceeded.
*/
protected static class IllegalMaximumActiveCountException extends RuntimeException {
/**
* Constructs a new runtime exception with the specified detail message.
* The cause is not initialized, and may subsequently be initialized by a
* call to {@link #initCause}.
*
* @param message the detail message. The detail message is saved for
* later retrieval by the {@link #getMessage()} method.
*/
public IllegalMaximumActiveCountException(String message) {
super(message);
}
}
}

@ -18,6 +18,7 @@
package cn.hippo4j.core.plugin.manager;
import cn.hippo4j.core.plugin.ThreadPoolPlugin;
import cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker;
import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin;
import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin;
import cn.hippo4j.core.plugin.impl.TaskRejectNotifyAlarmPlugin;
@ -61,5 +62,6 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr
support.register(new TaskRejectCountRecordPlugin());
support.register(new TaskRejectNotifyAlarmPlugin());
support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis));
support.register(new MaximumActiveThreadCountChecker(support, true));
}
}

@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.ExecuteAwarePlugin;
import cn.hippo4j.core.plugin.PluginRuntime;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import lombok.AllArgsConstructor;
import lombok.Setter;
import org.junit.Assert;
import org.junit.Test;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* test for {@link MaximumActiveThreadCountChecker}
*
* @author huangchengxing
*/
public class MaximumActiveThreadCountCheckerTest {
@Test
public void testGetPluginRuntime() {
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
5, 5, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(10), t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy());
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor, true);
Assert.assertEquals(checker.getClass().getSimpleName(), checker.getPluginRuntime().getPluginId());
// check plugin info
List<PluginRuntime.Info> infoList = checker.getPluginRuntime().getInfoList();
Assert.assertEquals(1, infoList.size());
Assert.assertEquals("enableSubmitTaskAfterCheckFail", infoList.get(0).getName());
Assert.assertEquals(true, infoList.get(0).getValue());
}
@Test
public void testWhenEnableSubmitTaskAfterCheckFail() {
int maximumThreadNum = 3;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
maximumThreadNum, maximumThreadNum, 0L, TimeUnit.MILLISECONDS,
queue, t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy());
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor, true);
Assert.assertTrue(checker.isEnableSubmitTaskAfterCheckFail());
executor.register(checker);
WaitBeforeExecute waitBeforeExecute = new WaitBeforeExecute(0L);
executor.register(waitBeforeExecute);
// create 2 workers and block them
CountDownLatch latch1 = submitTaskForBlockingThread(maximumThreadNum - 1, executor);
try {
// wait for the 2 workers to be executed task
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
/*
* after 2 worker blocked, submit task and create the last worker, then change the maximum number of pool before the last task actually executed by worker, make plugin throw exception and
* re-deliver the task to the queue
*/
waitBeforeExecute.setWaitBeforeExecute(200L);
CountDownLatch latch2 = submitTaskForBlockingThread(1, executor);
executor.setCorePoolSize(maximumThreadNum - 1);
executor.setMaximumPoolSize(maximumThreadNum - 1);
// wait for plugin to re-deliver the task to the queue
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Assert.assertEquals(1, queue.size());
// last worker destroyed due to the exception which thrown by plugin
Assert.assertEquals(2, executor.getActiveCount());
// free resources
latch1.countDown();
latch2.countDown();
executor.shutdown();
while (executor.isTerminated()) {
}
}
@Test
public void testWhenNotEnableSubmitTaskAfterCheckFail() {
int maximumThreadNum = 3;
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor(
"test", new DefaultThreadPoolPluginManager(),
maximumThreadNum, maximumThreadNum, 0L, TimeUnit.MILLISECONDS,
queue, t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy());
MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor, true);
checker.setEnableSubmitTaskAfterCheckFail(false);
Assert.assertFalse(checker.isEnableSubmitTaskAfterCheckFail());
executor.register(checker);
WaitBeforeExecute waitBeforeExecute = new WaitBeforeExecute(0L);
executor.register(waitBeforeExecute);
// create 2 workers and block them
CountDownLatch latch1 = submitTaskForBlockingThread(maximumThreadNum - 1, executor);
try {
// wait for the 2 workers to be executed task
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
/*
* after 2 worker blocked, submit task and create the last worker, then change the maximum number of pool before the last task actually executed by worker, make plugin throw exception and
* re-deliver the task to the queue
*/
waitBeforeExecute.setWaitBeforeExecute(200L);
CountDownLatch latch2 = submitTaskForBlockingThread(1, executor);
executor.setCorePoolSize(maximumThreadNum - 1);
executor.setMaximumPoolSize(maximumThreadNum - 1);
// wait for plugin abort the task
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Assert.assertEquals(0, queue.size());
// last worker destroyed due to the exception which thrown by plugin
Assert.assertEquals(2, executor.getActiveCount());
// free resources
latch1.countDown();
latch2.countDown();
executor.shutdown();
while (executor.isTerminated()) {
}
}
private CountDownLatch submitTaskForBlockingThread(int num, ThreadPoolExecutor executor) {
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < num; i++) {
Runnable runnable = () -> {
System.out.println(Thread.currentThread().getName() + " start");
try {
latch.await();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " has been interrupted");
}
System.out.println(Thread.currentThread().getName() + " completed");
};
System.out.println("submit task@" + runnable.hashCode());
executor.execute(runnable);
}
return latch;
}
@Setter
@AllArgsConstructor
private static class WaitBeforeExecute implements ExecuteAwarePlugin {
private long waitBeforeExecute;
@Override
public void beforeExecute(Thread thread, Runnable runnable) {
try {
Thread.sleep(waitBeforeExecute);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
Loading…
Cancel
Save