From 1bb7965f253f541ee9ab7cbe3ba1424026c250e4 Mon Sep 17 00:00:00 2001 From: huangchengxing <841396397@qq.com> Date: Mon, 29 May 2023 22:35:01 +0800 Subject: [PATCH] feat(MaximumActiveThreadCountChecker): add new plugin that checks whether the maximum number of threads in the thread pool is exceeded(#1208) --- .../impl/MaximumActiveThreadCountChecker.java | 279 ++++++++++++++++++ ...imumActiveThreadCountCheckerRegistrar.java | 87 ++++++ .../MaximumActiveThreadCountCheckerTest.java | 118 ++++++++ 3 files changed, 484 insertions(+) create mode 100644 threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountChecker.java create mode 100644 threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/MaximumActiveThreadCountCheckerRegistrar.java create mode 100644 threadpool/core/src/test/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountCheckerTest.java diff --git a/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountChecker.java b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountChecker.java new file mode 100644 index 00000000..210f8a39 --- /dev/null +++ b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountChecker.java @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor.plugin.impl; + +import cn.hippo4j.core.executor.plugin.ExecuteAwarePlugin; +import cn.hippo4j.core.executor.plugin.PluginRuntime; +import cn.hippo4j.core.executor.plugin.manager.MaximumActiveThreadCountCheckerRegistrar; +import cn.hippo4j.core.executor.plugin.manager.ThreadPoolPluginSupport; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + *

A plugin that checks whether the number of active threads in a thread pool exceeds the maximum thread count. + * + *

When task is submitted to the thread pool, before the current worker thread executes the task: + *

    + *
  1. check whether the maximum number of threads in the thread pool is exceeded;
  2. + *
  3. + * if already exceeded, interrupt the worker thread, + * and throw an {@link IllegalMaximumActiveCountException} exception to destroy the worker thread; + *
  4. + *
+ * + *

NOTE: if custom {@link Thread.UncaughtExceptionHandler} is set for the thread pool, + * it may catch the {@link IllegalMaximumActiveCountException} exception and cause the worker thread to not be destroyed. + * + * @see MaximumActiveThreadCountCheckerRegistrar + * @see IllegalMaximumActiveCountException + */ +@RequiredArgsConstructor +@Slf4j +public class MaximumActiveThreadCountChecker implements ExecuteAwarePlugin { + + /** + * Thread pool. + */ + public final ThreadPoolPluginSupport threadPoolPluginSupport; + + /** + *

A pointer to a number object used to record the number of threads in the thread pool + * that exceeds the maximum thread count. + * There are two possible scenarios: + *

+ * + *

After performing operations based on the original number object pointed by the pointer, + * the pointer may change: + *

+ */ + public final AtomicReference overflowThreadNumber = new AtomicReference<>(null); + + /** + *

Set the number of overflow threads. + * + * @param number number of overflow threads + */ + public void setOverflowThreadNumber(int number) { + AtomicInteger currentOverflowThreads = (number > 0) ? new AtomicInteger(number) : null; + overflowThreadNumber.set(currentOverflowThreads); + } + + /** + *

Get the number of overflow threads. + */ + public Integer getOverflowThreadNumber() { + AtomicInteger currentOverflowThreads = overflowThreadNumber.get(); + return currentOverflowThreads == null ? 0 : currentOverflowThreads.get(); + } + + /** + *

Check {@link ThreadPoolExecutor#getActiveCount()} whether greater than {@link ThreadPoolExecutor#getMaximumPoolSize()}. + * if the number of threads in the thread pool exceeds the maximum thread count, set the number of overflow threads. + * + * @return number of overflow threads + * @see #setOverflowThreadNumber(int) + */ + public Integer checkOverflowThreads() { + ThreadPoolExecutor executor = threadPoolPluginSupport.getThreadPoolExecutor(); + int activeCount = executor.getActiveCount(); + int maximumActiveCount = executor.getMaximumPoolSize(); + int number = activeCount > maximumActiveCount ? activeCount - maximumActiveCount : 0; + if (number > 0) { + setOverflowThreadNumber(number); + } + return number; + } + + /** + * Get plugin runtime info. + * + * @return plugin runtime info + */ + @Override + public PluginRuntime getPluginRuntime() { + return new PluginRuntime(getId()) + .addInfo("overflowThreadNumber", getOverflowThreadNumber()); + } + + /** + *

Check the maximum number of threads in the thread pool before the task is executed, + * if the maximum number of threads is exceeded, + * an {@link IllegalMaximumActiveCountException} will be thrown. + * + * @param thread thread of executing task + * @param runnable task + * @throws IllegalMaximumActiveCountException thread if the maximum number of threads is exceeded + * @see ThreadPoolExecutor#beforeExecute + */ + @Override + public void beforeExecute(Thread thread, Runnable runnable) { + if (!hasOverflowThread()) { + return; + } + log.warn( + "The maximum number of threads in the thread pool '{}' has been exceeded, task '{}' will redelivery", + threadPoolPluginSupport.getThreadPoolId(), runnable); + submitTaskAfterCheckFail(runnable); + interruptAndThrowException(thread); + } + + /** + *

check the maximum number of threads in the thread pool after the task is executed. + * + * @return true if the maximum number of threads in the thread pool has been exceeded + */ + @SuppressWarnings("all") + private boolean hasOverflowThread() { + AtomicInteger current; + + // has overflow thread? + while ((current = overflowThreadNumber.get()) != null) { + int curr = current.get(); + + // it's aready no overflow thread + if (curr < 1) { + // check whether the current value is effective, if not, try again + if (overflowThreadNumber.compareAndSet(current, null)) { + continue; + } + return false; + } + + // has overflow thread, try to cas to decrease the number of overflow threads + int expect = curr - 1; + if (!current.compareAndSet(curr, expect)) { + // cas fail, the current value is uneffective, try again + continue; + } + + /* + cas success, decrease the number of overflow threads, + then, use cas to check the current value is effective: + + 1.if already no overflow thread, set the overflowThreadNumber to null; + 2.if still has other overflow thread, keep the overflowThreadNumber reference unchanged + */ + AtomicInteger newRef = (expect > 0) ? current : null; + if (overflowThreadNumber.compareAndSet(current, newRef)) { + + /* + cas success, it's means the current value is effective. + + if this thread is the last overflow thread, + and already set the overflowThreadNumber to null, + then we return true to throw an exception. + */ + if (expect == 0) { + return true; + } + + /* + if this thread is not the last overflow thread, + the value may still be decreased by other threads before this step. + + 1.if current value still greater than 0, + we must try again to check whether the current value is effective. + + 2.if current value is set to null, + it's means that after the current value is decremented to 0 by other threads, it is set to null. + + in addition, here may be a ABA problem: + when cas success, if the current value is still greater than 0, + its means still has other overflow thread can set it to null after the value is cas to 0, + we can't determine whether the null value is set by same check, + so we may make an error and throw an exception in the next check after the current check. + it's low probability, and the error is acceptable. + */ + AtomicInteger nc = overflowThreadNumber.get(); + if (nc == current || nc == null) { + return true; + } + } + // the current value is uneffective, try again + } + + // no overflow thread + return false; + } + + /** + *

Submit task to thread pool after check fail. + * + * @param runnable task + */ + protected void submitTaskAfterCheckFail(Runnable runnable) { + if (runnable instanceof RunnableFuture) { + RunnableFuture future = (RunnableFuture) runnable; + if (future.isDone()) { + return; + } + if (future.isCancelled()) { + return; + } + } + threadPoolPluginSupport.getThreadPoolExecutor().execute(runnable); + } + + private void interruptAndThrowException(Thread thread) { + thread.interrupt(); + throw new IllegalMaximumActiveCountException( + "The maximum number of threads in the thread pool '" + threadPoolPluginSupport.getThreadPoolId() + + "' has been exceeded."); + } + + /** + * A {@link RuntimeException} that indicates that the maximum number of threads in the thread pool has been exceeded. + */ + public static class IllegalMaximumActiveCountException extends RuntimeException { + + /** + * Constructs a new runtime exception with the specified detail message. + * The cause is not initialized, and may subsequently be initialized by a + * call to {@link #initCause}. + * + * @param message the detail message. The detail message is saved for + * later retrieval by the {@link #getMessage()} method. + */ + public IllegalMaximumActiveCountException(String message) { + super(message); + } + } +} diff --git a/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/MaximumActiveThreadCountCheckerRegistrar.java b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/MaximumActiveThreadCountCheckerRegistrar.java new file mode 100644 index 00000000..1f651eae --- /dev/null +++ b/threadpool/core/src/main/java/cn/hippo4j/core/executor/plugin/manager/MaximumActiveThreadCountCheckerRegistrar.java @@ -0,0 +1,87 @@ +package cn.hippo4j.core.executor.plugin.manager; + +import cn.hippo4j.common.toolkit.Assert; +import cn.hippo4j.core.executor.plugin.impl.MaximumActiveThreadCountChecker; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + *

A register for {@link MaximumActiveThreadCountChecker}.
+ * After {@link #startCheckOverflowThreads} has been invoked, + * it will start a task for check whether the thread pool has overflow threads in the specified interval. + * + * @author huangchengxing + * @see MaximumActiveThreadCountChecker + */ +@Slf4j +@RequiredArgsConstructor +public class MaximumActiveThreadCountCheckerRegistrar implements ThreadPoolPluginRegistrar { + + /** + * registered checkers + */ + private final List checkers = new CopyOnWriteArrayList<>(); + + /** + * scheduled executor service + */ + private final ScheduledExecutorService scheduledExecutorService; + + /** + * task for check whether the thread pool has overflow threads. + */ + private ScheduledFuture task; + + /** + *

Start task for check whether the thread pool has overflow threads. + * if task is already running, it will be canceled and restarted. + */ + public void startCheckOverflowThreads(long checkInterval, TimeUnit checkIntervalTimeUnit) { + Assert.isTrue(checkInterval > 0, "checkInterval must be greater than 0"); + Assert.notNull(checkIntervalTimeUnit, "checkIntervalTimeUnit must not be null"); + if (task != null) { + stopCheckOverflowThreads(); + } + log.info("start check overflow threads task, checkInterval: {}, checkIntervalTimeUnit: {}", checkInterval, checkIntervalTimeUnit); + this.task = scheduledExecutorService.scheduleAtFixedRate( + () -> checkers.forEach(MaximumActiveThreadCountChecker::checkOverflowThreads), + checkInterval, checkInterval, checkIntervalTimeUnit + ); + } + + /** + * Cancel task for check whether the thread pool has overflow threads. + * + * @return true if the task was canceled before it completed normally + */ + public boolean stopCheckOverflowThreads() { + // is acceptable to cancel a task even if it is already running, + // because this behavior will not affect the normal operation of the thread pool + if (task != null && task.cancel(true)) { + log.info("cancel check overflow threads task"); + return true; + } + return false; + } + + /** + * Create and register plugin for the specified thread-pool instance. + * + * @param support thread pool plugin manager delegate + */ + @Override + public void doRegister(ThreadPoolPluginSupport support) { + MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(support); + if (log.isDebugEnabled()) { + log.debug("register maximum active thread count checker for thread pool: {}", support.getThreadPoolId()); + } + support.register(checker); + checkers.add(checker); + } +} diff --git a/threadpool/core/src/test/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountCheckerTest.java b/threadpool/core/src/test/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountCheckerTest.java new file mode 100644 index 00000000..07490033 --- /dev/null +++ b/threadpool/core/src/test/java/cn/hippo4j/core/executor/plugin/impl/MaximumActiveThreadCountCheckerTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor.plugin.impl; + +import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; +import cn.hippo4j.core.executor.plugin.PluginRuntime; +import cn.hippo4j.core.executor.plugin.manager.DefaultThreadPoolPluginManager; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.*; + +/** + * test for {@link cn.hippo4j.core.executor.plugin.impl.MaximumActiveThreadCountChecker} + * + * @author huangchengxing + */ +public class MaximumActiveThreadCountCheckerTest { + + @Test + public void testGetPluginRuntime() { + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginManager(), + 5, 5, 0L, TimeUnit.MILLISECONDS, + new ArrayBlockingQueue<>(10), t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy()); + MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor); + Assert.assertEquals(checker.getClass().getSimpleName(), checker.getPluginRuntime().getPluginId()); + // check plugin info + checker.checkOverflowThreads(); + List infoList = checker.getPluginRuntime().getInfoList(); + Assert.assertEquals(1, infoList.size()); + Assert.assertEquals("overflowThreadNumber", infoList.get(0).getName()); + Assert.assertEquals(0, infoList.get(0).getValue()); + } + + @Test + public void testWhenEnableSubmitTaskAfterCheckFail() { + int maximumThreadNum = 3; + BlockingQueue queue = new ArrayBlockingQueue<>(1); + ExtensibleThreadPoolExecutor executor = new ExtensibleThreadPoolExecutor( + "test", new DefaultThreadPoolPluginManager(), + maximumThreadNum, maximumThreadNum, 0L, TimeUnit.MILLISECONDS, + queue, t -> new Thread(t, UUID.randomUUID().toString()), new ThreadPoolExecutor.AbortPolicy()); + MaximumActiveThreadCountChecker checker = new MaximumActiveThreadCountChecker(executor); + executor.register(checker); + + // create 2 workers and block them + CountDownLatch latch1 = submitTaskForBlockingThread(maximumThreadNum - 1, executor); + try { + // wait for the 2 workers to be executed task + Thread.sleep(500L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + /* + * after 2 worker blocked, + * submit task and create the last worker, + * make overflow thread number to 1 + */ + CountDownLatch latch2 = submitTaskForBlockingThread(1, executor); + executor.setCorePoolSize(maximumThreadNum - 1); + executor.setMaximumPoolSize(maximumThreadNum - 1); + checker.setOverflowThreadNumber(1); + + // wait for plugin to re-deliver the task to the queue + try { + Thread.sleep(500L); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + Assert.assertEquals(1, queue.size()); + // last worker destroyed due to the exception which thrown by plugin + Assert.assertEquals(2, executor.getActiveCount()); + + // free resources + latch1.countDown(); + latch2.countDown(); + executor.shutdown(); + while (executor.isTerminated()) { + } + } + + private CountDownLatch submitTaskForBlockingThread(int num, ThreadPoolExecutor executor) { + CountDownLatch latch = new CountDownLatch(1); + for (int i = 0; i < num; i++) { + Runnable runnable = () -> { + System.out.println(Thread.currentThread().getName() + " start"); + try { + latch.await(); + } catch (InterruptedException e) { + System.out.println(Thread.currentThread().getName() + " has been interrupted"); + } + System.out.println(Thread.currentThread().getName() + " completed"); + }; + System.out.println("submit task@" + runnable.hashCode()); + executor.execute(runnable); + } + return latch; + } +}