From 425f27aef25f7ff34affba215f9de726dc99ef72 Mon Sep 17 00:00:00 2001 From: huangchengxing <841396397@qq.com> Date: Tue, 2 May 2023 19:51:20 +0800 Subject: [PATCH] feat(MaximumActiveThreadCountChecker): add new plugin that checks whether the maximum number of threads in the thread pool is exceeded(#1208) --- .../impl/MaximumActiveThreadCountChecker.java | 174 ++++++++++++++++ .../DefaultThreadPoolPluginRegistrar.java | 2 + .../MaximumActiveThreadCountCheckerTest.java | 192 ++++++++++++++++++ 3 files changed, 368 insertions(+) create mode 100644 hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountChecker.java create mode 100644 hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountCheckerTest.java diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountChecker.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountChecker.java new file mode 100644 index 00000000..17fc8730 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountChecker.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.core.plugin.ExecuteAwarePlugin; +import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.plugin.manager.ThreadPoolPluginSupport; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + *

A plugin that checks whether the maximum number of threads in the thread pool is exceeded. + * + *

When task is submitted to the thread pool, before the current worker thread executes the task: + *

    + *
  1. check whether the maximum number of threads in the thread pool is exceeded;
  2. + *
  3. + * if already exceeded, interrupt the worker thread, + * and throw an {@link IllegalMaximumActiveCountException} exception to destroy the worker thread; + *
  4. + *
  5. if {@link #enableSubmitTaskAfterCheckFail} is true, re submit the task to the thread pool after check fail;
  6. + *
+ * + *

NOTE: if custom {@link Thread.UncaughtExceptionHandler} is set for the thread pool, + * it may catch the {@link IllegalMaximumActiveCountException} exception and cause the worker thread to not be destroyed. + * + * @author huangchengxing + * @see IllegalMaximumActiveCountException + */ +@Slf4j +public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin { + + /** + * Thread pool. + */ + public final ThreadPoolPluginSupport threadPoolPluginSupport; + + /** + * Whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded. + */ + private final AtomicBoolean enableSubmitTaskAfterCheckFail; + + /** + * Create {@link MaximumActiveThreadCountChecker}. + * + * @param threadPoolPluginSupport thread pool + * @param enableSubmitTaskAfterCheckFail whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded. + */ + public MaximumActiveThreadCountChecker(ThreadPoolPluginSupport threadPoolPluginSupport, boolean enableSubmitTaskAfterCheckFail) { + this.threadPoolPluginSupport = threadPoolPluginSupport; + this.enableSubmitTaskAfterCheckFail = new AtomicBoolean(enableSubmitTaskAfterCheckFail); + } + + /** + * Set whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded. + * + * @param enableSubmitTaskAfterCheckFail whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded. + */ + public void setEnableSubmitTaskAfterCheckFail(boolean enableSubmitTaskAfterCheckFail) { + this.enableSubmitTaskAfterCheckFail.set(enableSubmitTaskAfterCheckFail); + } + + /** + * Get whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded. + * + * @return whether to re-deliver the task to the thread pool after the maximum number of threads is exceeded. + */ + public boolean isEnableSubmitTaskAfterCheckFail() { + return enableSubmitTaskAfterCheckFail.get(); + } + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()) + .addInfo("enableSubmitTaskAfterCheckFail", enableSubmitTaskAfterCheckFail.get()); + } + + /** + *

Check the maximum number of threads in the thread pool before the task is executed, + * if the maximum number of threads is exceeded, + * an {@link IllegalMaximumActiveCountException} will be thrown. + * + * @param thread thread of executing task + * @param runnable task + * @throws IllegalMaximumActiveCountException thread if the maximum number of threads is exceeded + * @see ThreadPoolExecutor#beforeExecute + */ + @Override + public void beforeExecute(Thread thread, Runnable runnable) { + ThreadPoolExecutor threadPoolExecutor = threadPoolPluginSupport.getThreadPoolExecutor(); + int activeCount = threadPoolExecutor.getActiveCount(); + int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); + if (activeCount > maximumPoolSize) { + // redelivery task if necessary + if (enableSubmitTaskAfterCheckFail.get()) { + log.warn( + "The maximum number of threads in the thread pool '{}' has been exceeded(activeCount={}, maximumPoolSize={}), task '{}' will redelivery", + threadPoolPluginSupport.getThreadPoolId(), activeCount, maximumPoolSize, runnable); + submitTaskAfterCheckFail(runnable); + } else { + log.warn( + "The maximum number of threads in the thread pool '{}' has been exceeded(activeCount={}, maximumPoolSize={}), task '{}' will be discarded.", + threadPoolPluginSupport.getThreadPoolId(), activeCount, maximumPoolSize, runnable); + } + interruptAndThrowException(thread, activeCount, maximumPoolSize); + } + } + + /** + *

Submit task to thread pool after check fail. + * + * @param runnable task + */ + protected void submitTaskAfterCheckFail(Runnable runnable) { + if (runnable instanceof RunnableFuture) { + RunnableFuture future = (RunnableFuture) runnable; + if (future.isDone()) { + return; + } + if (future.isCancelled()) { + return; + } + } + threadPoolPluginSupport.getThreadPoolExecutor().execute(runnable); + } + + private void interruptAndThrowException(Thread thread, int activeCount, int maximumPoolSize) { + thread.interrupt(); + throw new IllegalMaximumActiveCountException( + "The maximum number of threads in the thread pool '" + threadPoolPluginSupport.getThreadPoolId() + + "' has been exceeded(activeCount=" + activeCount + ", maximumPoolSize=" + maximumPoolSize + ")."); + } + + /** + * A {@link RuntimeException} that indicates that the maximum number of threads in the thread pool has been exceeded. + */ + protected static class IllegalMaximumActiveCountException extends RuntimeException { + + /** + * Constructs a new runtime exception with the specified detail message. + * The cause is not initialized, and may subsequently be initialized by a + * call to {@link #initCause}. + * + * @param message the detail message. The detail message is saved for + * later retrieval by the {@link #getMessage()} method. + */ + public IllegalMaximumActiveCountException(String message) { + super(message); + } + } +} diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java index 356782ee..a044fa93 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/manager/DefaultThreadPoolPluginRegistrar.java @@ -18,6 +18,7 @@ package cn.hippo4j.core.plugin.manager; import cn.hippo4j.core.plugin.ThreadPoolPlugin; +import cn.hippo4j.core.plugin.impl.MaximumActiveThreadCountChecker; import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin; import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin; import cn.hippo4j.core.plugin.impl.TaskRejectNotifyAlarmPlugin; @@ -61,5 +62,6 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr support.register(new TaskRejectCountRecordPlugin()); support.register(new TaskRejectNotifyAlarmPlugin()); support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis)); + support.register(new MaximumActiveThreadCountChecker(support, true)); } } diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountCheckerTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountCheckerTest.java new file mode 100644 index 00000000..d4130212 --- /dev/null +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/MaximumActiveThreadCountCheckerTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.plugin.impl; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.plugin.ExecuteAwarePlugin; +import cn.hippo4j.core.plugin.PluginRuntime; +import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import lombok.AllArgsConstructor; +import lombok.Setter; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * test for {@link MaximumActiveThreadCountChecker} + * + * @author huangchengxing + */ +public class MaximumActiveThreadCountCheckerTest { + + @Test + public void testGetPluginRuntime() { + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginManager(), + 5, 5, 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(10), t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy()); + MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor, true); + Assert.assertEquals(checker.getClass().getSimpleName(), checker.getPluginRuntime().getPluginId()); + // check plugin info + List infoList = checker.getPluginRuntime().getInfoList(); + Assert.assertEquals(1, infoList.size()); + Assert.assertEquals("enableSubmitTaskAfterCheckFail", infoList.get(0).getName()); + Assert.assertEquals(true, infoList.get(0).getValue()); + } + + @Test + public void testWhenEnableSubmitTaskAfterCheckFail() { + int maximumThreadNum = 3; + BlockingQueue queue = new ArrayBlockingQueue<>(1); + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginManager(), + maximumThreadNum, maximumThreadNum, 0L, TimeUnit.MILLISECONDS, + queue, t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy()); + MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor, true); + Assert.assertTrue(checker.isEnableSubmitTaskAfterCheckFail()); + executor.register(checker); + WaitBeforeExecute waitBeforeExecute = new WaitBeforeExecute(0L); + executor.register(waitBeforeExecute); + + // create 2 workers and block them + CountDownLatch latch1 = submitTaskForBlockingThread(maximumThreadNum - 1, executor); + try { + // wait for the 2 workers to be executed task + Thread.sleep(500L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + /* + * after 2 worker blocked, submit task and create the last worker, then change the maximum number of pool before the last task actually executed by worker, make plugin throw exception and + * re-deliver the task to the queue + */ + waitBeforeExecute.setWaitBeforeExecute(200L); + CountDownLatch latch2 = submitTaskForBlockingThread(1, executor); + executor.setCorePoolSize(maximumThreadNum - 1); + executor.setMaximumPoolSize(maximumThreadNum - 1); + + // wait for plugin to re-deliver the task to the queue + try { + Thread.sleep(500L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Assert.assertEquals(1, queue.size()); + // last worker destroyed due to the exception which thrown by plugin + Assert.assertEquals(2, executor.getActiveCount()); + + // free resources + latch1.countDown(); + latch2.countDown(); + executor.shutdown(); + while (executor.isTerminated()) { + } + } + + @Test + public void testWhenNotEnableSubmitTaskAfterCheckFail() { + int maximumThreadNum = 3; + BlockingQueue queue = new ArrayBlockingQueue<>(1); + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginManager(), + maximumThreadNum, maximumThreadNum, 0L, TimeUnit.MILLISECONDS, + queue, t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy()); + MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor, true); + checker.setEnableSubmitTaskAfterCheckFail(false); + Assert.assertFalse(checker.isEnableSubmitTaskAfterCheckFail()); + executor.register(checker); + WaitBeforeExecute waitBeforeExecute = new WaitBeforeExecute(0L); + executor.register(waitBeforeExecute); + + // create 2 workers and block them + CountDownLatch latch1 = submitTaskForBlockingThread(maximumThreadNum - 1, executor); + try { + // wait for the 2 workers to be executed task + Thread.sleep(500L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + /* + * after 2 worker blocked, submit task and create the last worker, then change the maximum number of pool before the last task actually executed by worker, make plugin throw exception and + * re-deliver the task to the queue + */ + waitBeforeExecute.setWaitBeforeExecute(200L); + CountDownLatch latch2 = submitTaskForBlockingThread(1, executor); + executor.setCorePoolSize(maximumThreadNum - 1); + executor.setMaximumPoolSize(maximumThreadNum - 1); + + // wait for plugin abort the task + try { + Thread.sleep(500L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Assert.assertEquals(0, queue.size()); + // last worker destroyed due to the exception which thrown by plugin + Assert.assertEquals(2, executor.getActiveCount()); + + // free resources + latch1.countDown(); + latch2.countDown(); + executor.shutdown(); + while (executor.isTerminated()) { + } + } + + private CountDownLatch submitTaskForBlockingThread(int num, ThreadPoolExecutor executor) { + CountDownLatch latch = new CountDownLatch(1); + for (int i = 0; i < num; i++) { + Runnable runnable = () -> { + System.out.println(Thread.currentThread().getName() + " start"); + try { + latch.await(); + } catch (InterruptedException e) { + System.out.println(Thread.currentThread().getName() + " has been interrupted"); + } + System.out.println(Thread.currentThread().getName() + " completed"); + }; + System.out.println("submit task@" + runnable.hashCode()); + executor.execute(runnable); + } + return latch; + } + + @Setter + @AllArgsConstructor + private static class WaitBeforeExecute implements ExecuteAwarePlugin { + + private long waitBeforeExecute; + @Override + public void beforeExecute(Thread thread, Runnable runnable) { + try { + Thread.sleep(waitBeforeExecute); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } +}