diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java index ecc6fff7..b1bb2c78 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPlugin.java @@ -21,6 +21,8 @@ import cn.hippo4j.common.api.ThreadPoolCheckAlarm; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; import cn.hippo4j.core.plugin.RejectedAwarePlugin; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; import java.util.Optional; import java.util.concurrent.ThreadPoolExecutor; @@ -28,10 +30,27 @@ import java.util.concurrent.ThreadPoolExecutor; /** * Send alert notification when a task is rejected. */ +@RequiredArgsConstructor public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin { public static final String PLUGIN_NAME = TaskRejectNotifyAlarmPlugin.class.getSimpleName(); + /** + * Thread pool check alarm + */ + @NonNull + private final ThreadPoolCheckAlarm threadPoolCheckAlarm; + + /** + * Create a {@link TaskRejectNotifyAlarmPlugin} + */ + public TaskRejectNotifyAlarmPlugin() { + this( + Optional.ofNullable(ApplicationContextHolder.getInstance()) + .map(context -> context.getBean(ThreadPoolCheckAlarm.class)) + .orElseGet(ThreadPoolCheckAlarm::none)); + } + /** * Callback before task is rejected. * @@ -44,8 +63,6 @@ public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin { return; } String threadPoolId = ((ExtensibleThreadPoolExecutor) executor).getThreadPoolId(); - Optional.ofNullable(ApplicationContextHolder.getInstance()) - .map(context -> context.getBean(ThreadPoolCheckAlarm.class)) - .ifPresent(handler -> handler.asyncSendRejectedAlarm(threadPoolId)); + threadPoolCheckAlarm.asyncSendRejectedAlarm(threadPoolId); } } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java index ed542e8b..a3026619 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPlugin.java @@ -22,6 +22,7 @@ import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.core.plugin.PluginRuntime; import lombok.AllArgsConstructor; import lombok.Getter; +import lombok.NonNull; import lombok.Setter; import java.util.Optional; @@ -52,6 +53,28 @@ public class TaskTimeoutNotifyAlarmPlugin extends AbstractTaskTimerPlugin { */ private final ThreadPoolExecutor threadPoolExecutor; + /** + * Thread pool check alarm + * TODO Complete dependency injection through the thread plugin registrar + */ + @NonNull + private final ThreadPoolCheckAlarm threadPoolCheckAlarm; + + /** + * Create a {@link TaskTimeoutNotifyAlarmPlugin}. + * + * @param threadPoolId thread pool id + * @param executeTimeOut execute time out + * @param threadPoolExecutor thread pool executor + */ + public TaskTimeoutNotifyAlarmPlugin(String threadPoolId, Long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) { + this( + threadPoolId, executeTimeOut, threadPoolExecutor, + Optional.ofNullable(ApplicationContextHolder.getInstance()) + .map(context -> context.getBean(ThreadPoolCheckAlarm.class)) + .orElseGet(ThreadPoolCheckAlarm::none)); + } + /** * Get plugin runtime info. * @@ -74,9 +97,6 @@ public class TaskTimeoutNotifyAlarmPlugin extends AbstractTaskTimerPlugin { if (taskExecuteTime <= executeTimeOut) { return; } - Optional.ofNullable(ApplicationContextHolder.getInstance()) - .map(context -> context.getBean(ThreadPoolCheckAlarm.class)) - .ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm( - threadPoolId, taskExecuteTime, executeTimeOut, threadPoolExecutor)); + threadPoolCheckAlarm.asyncSendExecuteTimeOutAlarm(threadPoolId, taskExecuteTime, executeTimeOut, threadPoolExecutor); } } diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java index 1842f3ef..8c1f1c65 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskRejectNotifyAlarmPluginTest.java @@ -17,10 +17,11 @@ package cn.hippo4j.core.plugin.impl; +import cn.hippo4j.common.api.ThreadPoolCheckAlarm; import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; -import lombok.RequiredArgsConstructor; +import lombok.Getter; import org.junit.Assert; import org.junit.Test; @@ -51,8 +52,8 @@ public class TaskRejectNotifyAlarmPluginTest { 1, 1, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); - AtomicInteger rejectCount = new AtomicInteger(0); - executor.register(new TestPlugin(rejectCount, executor)); + TestAlarm alarm = new TestAlarm(); + executor.register(new TaskRejectNotifyAlarmPlugin(alarm)); executor.submit(() -> ThreadUtil.sleep(200L)); executor.submit(() -> ThreadUtil.sleep(200L)); executor.submit(() -> ThreadUtil.sleep(200L)); @@ -61,26 +62,32 @@ public class TaskRejectNotifyAlarmPluginTest { executor.shutdown(); while (!executor.isTerminated()) { } - Assert.assertEquals(1, rejectCount.get()); + Assert.assertEquals(1, alarm.getNumberOfAlarms().get()); } - @RequiredArgsConstructor - private static class TestPlugin extends TaskRejectNotifyAlarmPlugin { + private static class TestAlarm implements ThreadPoolCheckAlarm { - private final AtomicInteger count; - private final ThreadPoolExecutor targetExecutor; - - /** - * Callback before task is rejected. - * - * @param runnable task - * @param executor executor - */ + @Getter + private final AtomicInteger numberOfAlarms = new AtomicInteger(0); + @Override + public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { + // do noting + } + @Override + public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { + // do noting + } @Override - public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { - count.incrementAndGet(); - Assert.assertEquals(targetExecutor, executor); - super.beforeRejectedExecution(runnable, executor); + public void asyncSendRejectedAlarm(String threadPoolId) { + numberOfAlarms.incrementAndGet(); + } + @Override + public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) { + // do noting + } + @Override + public void run(String... args) throws Exception { + } } diff --git a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPluginTest.java b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPluginTest.java index 732d4915..92c7e310 100644 --- a/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPluginTest.java +++ b/hippo4j-core/src/test/java/cn/hippo4j/core/plugin/impl/TaskTimeoutNotifyAlarmPluginTest.java @@ -17,9 +17,11 @@ package cn.hippo4j.core.plugin.impl; +import cn.hippo4j.common.api.ThreadPoolCheckAlarm; import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; +import lombok.Getter; import org.junit.Assert; import org.junit.Test; @@ -38,8 +40,10 @@ public class TaskTimeoutNotifyAlarmPluginTest { 5, 5, 1000L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.AbortPolicy()); + private final TestAlarm testAlarm = new TestAlarm(); + private final TaskTimeoutNotifyAlarmPlugin plugin = new TaskTimeoutNotifyAlarmPlugin( - executor.getThreadPoolId(), 100L, executor); + executor.getThreadPoolId(), 1L, executor, testAlarm); @Test public void testGetId() { @@ -53,26 +57,23 @@ public class TaskTimeoutNotifyAlarmPluginTest { @Test public void testGetExecuteTimeOut() { - Assert.assertEquals(100L, plugin.getExecuteTimeOut().longValue()); + Assert.assertEquals(1L, plugin.getExecuteTimeOut().longValue()); } @Test public void testSetExecuteTimeOut() { - plugin.setExecuteTimeOut(200L); - Assert.assertEquals(200L, plugin.getExecuteTimeOut().longValue()); + plugin.setExecuteTimeOut(2L); + Assert.assertEquals(2L, plugin.getExecuteTimeOut().longValue()); } @Test public void testProcessTaskTime() { executor.register(plugin); - AtomicInteger count = new AtomicInteger(0); executor.submit(() -> { - count.incrementAndGet(); ThreadUtil.sleep(100L); }); executor.submit(() -> { - count.incrementAndGet(); ThreadUtil.sleep(300L); }); @@ -80,7 +81,33 @@ public class TaskTimeoutNotifyAlarmPluginTest { executor.shutdown(); while (!executor.isTerminated()) { } - Assert.assertEquals(2, count.get()); + Assert.assertEquals(2, testAlarm.getNumberOfAlarms().get()); + } + + private static class TestAlarm implements ThreadPoolCheckAlarm { + + @Getter + private final AtomicInteger numberOfAlarms = new AtomicInteger(0); + @Override + public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { + // do noting + } + @Override + public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) { + // do noting + } + @Override + public void asyncSendRejectedAlarm(String threadPoolId) { + // do noting + } + @Override + public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) { + numberOfAlarms.incrementAndGet(); + } + @Override + public void run(String... args) throws Exception { + + } } }