From 7ab9fc36a5a6b8117714db7fb00acc5efe24ce31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E6=88=90=E5=85=B4?= <49221670+Createsequence@users.noreply.github.com> Date: Mon, 19 Dec 2022 21:09:22 +0800 Subject: [PATCH] Fix #1001 (#1021) * feat: add none implementation for ThreadPoolCheckAlarm * fix: Alarm plugin supports dependency injection through the constructor (#1001) --- .../common/api/ThreadPoolCheckAlarm.java | 78 +++++++++++++++++++ .../impl/TaskRejectNotifyAlarmPlugin.java | 23 +++++- .../impl/TaskTimeoutNotifyAlarmPlugin.java | 28 ++++++- .../impl/TaskRejectNotifyAlarmPluginTest.java | 45 ++++++----- .../TaskTimeoutNotifyAlarmPluginTest.java | 43 ++++++++-- 5 files changed, 183 insertions(+), 34 deletions(-) diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/api/ThreadPoolCheckAlarm.java b/hippo4j-common/src/main/java/cn/hippo4j/common/api/ThreadPoolCheckAlarm.java index 6d4b0768..4b3a3f2a 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/api/ThreadPoolCheckAlarm.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/api/ThreadPoolCheckAlarm.java @@ -17,6 +17,9 @@ package cn.hippo4j.common.api; +import lombok.AccessLevel; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.boot.CommandLineRunner; import java.util.concurrent.ThreadPoolExecutor; @@ -29,6 +32,16 @@ import java.util.concurrent.ThreadPoolExecutor; */ public interface ThreadPoolCheckAlarm extends CommandLineRunner { + /** + * Get a none thread pool check alarm. + * + * @return {@link ThreadPoolCheckAlarm} + * @see NoneThreadPoolCheckAlarm + */ + static ThreadPoolCheckAlarm none() { + return NoneThreadPoolCheckAlarm.INSTANCE; + } + /** * Check pool capacity alarm. * @@ -61,4 +74,69 @@ public interface ThreadPoolCheckAlarm extends CommandLineRunner { * @param threadPoolExecutor thread-pool executor */ void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor); + + /** + * None implementation of {@link ThreadPoolCheckAlarm}. + * + * @see #none() + */ + @Slf4j + @NoArgsConstructor(access = AccessLevel.PRIVATE) + class NoneThreadPoolCheckAlarm implements ThreadPoolCheckAlarm { + + /** + * Default singleton. + */ + private static final NoneThreadPoolCheckAlarm INSTANCE = new NoneThreadPoolCheckAlarm(); + + /** + * Check pool capacity alarm. + * + * @param threadPoolId thread-pool id + * @param threadPoolExecutor thread-pool executor + */ + @Override + public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { + log.info("Ignore check pool capacity alarm for ExecuteService '{}'", threadPoolId); + } + + /** + * Check pool activity alarm. + * + * @param threadPoolId thread-pool id + * @param threadPoolExecutor thread-pool executor + */ + @Override + public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { + log.info("Ignore check pool activity alarm for ExecuteService '{}'", threadPoolId); + } + + /** + * Async send rejected alarm. + * + * @param threadPoolId thread-pool id + */ + @Override + public void asyncSendRejectedAlarm(String threadPoolId) { + log.info("Ignore async send rejected alarm for ExecuteService '{}'", threadPoolId); + } + + /** + * Async send execute time-out alarm. + * + * @param threadPoolId thread-pool id + * @param executeTime execute time + * @param executeTimeOut execute time-out + * @param threadPoolExecutor thread-pool executor + */ + @Override + public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) { + log.info("Ignore async send execute time out alarm for ExecuteService '{}'", threadPoolId); + } + + @Override + public void run(String... args) throws Exception { + // do nothing + } + } } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java index ecc6fff7..b1bb2c78 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java @@ -21,6 +21,8 @@ import cn.hippo4j.common.api.ThreadPoolCheckAlarm; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; import cn.hippo4j.core.plugin.RejectedAwarePlugin; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; import java.util.Optional; import java.util.concurrent.ThreadPoolExecutor; @@ -28,10 +30,27 @@ import java.util.concurrent.ThreadPoolExecutor; /** * Send alert notification when a task is rejected. */ +@RequiredArgsConstructor public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin { public static final String PLUGIN_NAME = TaskRejectNotifyAlarmPlugin.class.getSimpleName(); + /** + * Thread pool check alarm + */ + @NonNull + private final ThreadPoolCheckAlarm threadPoolCheckAlarm; + + /** + * Create a {@link TaskRejectNotifyAlarmPlugin} + */ + public TaskRejectNotifyAlarmPlugin() { + this( + Optional.ofNullable(ApplicationContextHolder.getInstance()) + .map(context -> context.getBean(ThreadPoolCheckAlarm.class)) + .orElseGet(ThreadPoolCheckAlarm::none)); + } + /** * Callback before task is rejected. * @@ -44,8 +63,6 @@ public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin { return; } String threadPoolId = ((ExtensibleThreadPoolExecutor) executor).getThreadPoolId(); - Optional.ofNullable(ApplicationContextHolder.getInstance()) - .map(context -> context.getBean(ThreadPoolCheckAlarm.class)) - .ifPresent(handler -> handler.asyncSendRejectedAlarm(threadPoolId)); + threadPoolCheckAlarm.asyncSendRejectedAlarm(threadPoolId); } } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java index ed542e8b..a3026619 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java @@ -22,6 +22,7 @@ import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.core.plugin.PluginRuntime; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.NonNull; import lombok.Setter; import java.util.Optional; @@ -52,6 +53,28 @@ public class TaskTimeoutNotifyAlarmPlugin extends AbstractTaskTimerPlugin { */ private final ThreadPoolExecutor threadPoolExecutor; + /** + * Thread pool check alarm + * TODO Complete dependency injection through the thread plugin registrar + */ + @NonNull + private final ThreadPoolCheckAlarm threadPoolCheckAlarm; + + /** + * Create a {@link TaskTimeoutNotifyAlarmPlugin}. + * + * @param threadPoolId thread pool id + * @param executeTimeOut execute time out + * @param threadPoolExecutor thread pool executor + */ + public TaskTimeoutNotifyAlarmPlugin(String threadPoolId, Long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) { + this( + threadPoolId, executeTimeOut, threadPoolExecutor, + Optional.ofNullable(ApplicationContextHolder.getInstance()) + .map(context -> context.getBean(ThreadPoolCheckAlarm.class)) + .orElseGet(ThreadPoolCheckAlarm::none)); + } + /** * Get plugin runtime info. * @@ -74,9 +97,6 @@ public class TaskTimeoutNotifyAlarmPlugin extends AbstractTaskTimerPlugin { if (taskExecuteTime <= executeTimeOut) { return; } - Optional.ofNullable(ApplicationContextHolder.getInstance()) - .map(context -> context.getBean(ThreadPoolCheckAlarm.class)) - .ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm( - threadPoolId, taskExecuteTime, executeTimeOut, threadPoolExecutor)); + threadPoolCheckAlarm.asyncSendExecuteTimeOutAlarm(threadPoolId, taskExecuteTime, executeTimeOut, threadPoolExecutor); } } diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java index 1842f3ef..8c1f1c65 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java @@ -17,10 +17,11 @@ package cn.hippo4j.core.plugin.impl; +import cn.hippo4j.common.api.ThreadPoolCheckAlarm; import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; -import lombok.RequiredArgsConstructor; +import lombok.Getter; import org.junit.Assert; import org.junit.Test; @@ -51,8 +52,8 @@ public class TaskRejectNotifyAlarmPluginTest { 1, 1, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); - AtomicInteger rejectCount = new AtomicInteger(0); - executor.register(new TestPlugin(rejectCount, executor)); + TestAlarm alarm = new TestAlarm(); + executor.register(new TaskRejectNotifyAlarmPlugin(alarm)); executor.submit(() -> ThreadUtil.sleep(200L)); executor.submit(() -> ThreadUtil.sleep(200L)); executor.submit(() -> ThreadUtil.sleep(200L)); @@ -61,26 +62,32 @@ public class TaskRejectNotifyAlarmPluginTest { executor.shutdown(); while (!executor.isTerminated()) { } - Assert.assertEquals(1, rejectCount.get()); + Assert.assertEquals(1, alarm.getNumberOfAlarms().get()); } - @RequiredArgsConstructor - private static class TestPlugin extends TaskRejectNotifyAlarmPlugin { + private static class TestAlarm implements ThreadPoolCheckAlarm { - private final AtomicInteger count; - private final ThreadPoolExecutor targetExecutor; - - /** - * Callback before task is rejected. - * - * @param runnable task - * @param executor executor - */ + @Getter + private final AtomicInteger numberOfAlarms = new AtomicInteger(0); + @Override + public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { + // do noting + } + @Override + public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { + // do noting + } @Override - public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { - count.incrementAndGet(); - Assert.assertEquals(targetExecutor, executor); - super.beforeRejectedExecution(runnable, executor); + public void asyncSendRejectedAlarm(String threadPoolId) { + numberOfAlarms.incrementAndGet(); + } + @Override + public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) { + // do noting + } + @Override + public void run(String... args) throws Exception { + } } diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPluginTest.java index 732d4915..92c7e310 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPluginTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPluginTest.java @@ -17,9 +17,11 @@ package cn.hippo4j.core.plugin.impl; +import cn.hippo4j.common.api.ThreadPoolCheckAlarm; import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import lombok.Getter; import org.junit.Assert; import org.junit.Test; @@ -38,8 +40,10 @@ public class TaskTimeoutNotifyAlarmPluginTest { 5, 5, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.AbortPolicy()); + private final TestAlarm testAlarm = new TestAlarm(); + private final TaskTimeoutNotifyAlarmPlugin plugin = new TaskTimeoutNotifyAlarmPlugin( - executor.getThreadPoolId(), 100L, executor); + executor.getThreadPoolId(), 1L, executor, testAlarm); @Test public void testGetId() { @@ -53,26 +57,23 @@ public class TaskTimeoutNotifyAlarmPluginTest { @Test public void testGetExecuteTimeOut() { - Assert.assertEquals(100L, plugin.getExecuteTimeOut().longValue()); + Assert.assertEquals(1L, plugin.getExecuteTimeOut().longValue()); } @Test public void testSetExecuteTimeOut() { - plugin.setExecuteTimeOut(200L); - Assert.assertEquals(200L, plugin.getExecuteTimeOut().longValue()); + plugin.setExecuteTimeOut(2L); + Assert.assertEquals(2L, plugin.getExecuteTimeOut().longValue()); } @Test public void testProcessTaskTime() { executor.register(plugin); - AtomicInteger count = new AtomicInteger(0); executor.submit(() -> { - count.incrementAndGet(); ThreadUtil.sleep(100L); }); executor.submit(() -> { - count.incrementAndGet(); ThreadUtil.sleep(300L); }); @@ -80,7 +81,33 @@ public class TaskTimeoutNotifyAlarmPluginTest { executor.shutdown(); while (!executor.isTerminated()) { } - Assert.assertEquals(2, count.get()); + Assert.assertEquals(2, testAlarm.getNumberOfAlarms().get()); + } + + private static class TestAlarm implements ThreadPoolCheckAlarm { + + @Getter + private final AtomicInteger numberOfAlarms = new AtomicInteger(0); + @Override + public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { + // do noting + } + @Override + public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { + // do noting + } + @Override + public void asyncSendRejectedAlarm(String threadPoolId) { + // do noting + } + @Override + public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) { + numberOfAlarms.incrementAndGet(); + } + @Override + public void run(String... args) throws Exception { + + } } }