* feat: add none implementation for ThreadPoolCheckAlarm

* fix: Alarm plugin supports dependency injection through the constructor (#1001)
pull/1042/head
黄成兴 2 years ago committed by GitHub
parent b453d75df7
commit 7ab9fc36a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,6 +17,9 @@
package cn.hippo4j.common.api; package cn.hippo4j.common.api;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -29,6 +32,16 @@ import java.util.concurrent.ThreadPoolExecutor;
*/ */
public interface ThreadPoolCheckAlarm extends CommandLineRunner { public interface ThreadPoolCheckAlarm extends CommandLineRunner {
/**
* Get a none thread pool check alarm.
*
* @return {@link ThreadPoolCheckAlarm}
* @see NoneThreadPoolCheckAlarm
*/
static ThreadPoolCheckAlarm none() {
return NoneThreadPoolCheckAlarm.INSTANCE;
}
/** /**
* Check pool capacity alarm. * Check pool capacity alarm.
* *
@ -61,4 +74,69 @@ public interface ThreadPoolCheckAlarm extends CommandLineRunner {
* @param threadPoolExecutor thread-pool executor * @param threadPoolExecutor thread-pool executor
*/ */
void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor); void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor);
/**
* None implementation of {@link ThreadPoolCheckAlarm}.
*
* @see #none()
*/
@Slf4j
@NoArgsConstructor(access = AccessLevel.PRIVATE)
class NoneThreadPoolCheckAlarm implements ThreadPoolCheckAlarm {
/**
* Default singleton.
*/
private static final NoneThreadPoolCheckAlarm INSTANCE = new NoneThreadPoolCheckAlarm();
/**
* Check pool capacity alarm.
*
* @param threadPoolId thread-pool id
* @param threadPoolExecutor thread-pool executor
*/
@Override
public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
log.info("Ignore check pool capacity alarm for ExecuteService '{}'", threadPoolId);
}
/**
* Check pool activity alarm.
*
* @param threadPoolId thread-pool id
* @param threadPoolExecutor thread-pool executor
*/
@Override
public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
log.info("Ignore check pool activity alarm for ExecuteService '{}'", threadPoolId);
}
/**
* Async send rejected alarm.
*
* @param threadPoolId thread-pool id
*/
@Override
public void asyncSendRejectedAlarm(String threadPoolId) {
log.info("Ignore async send rejected alarm for ExecuteService '{}'", threadPoolId);
}
/**
* Async send execute time-out alarm.
*
* @param threadPoolId thread-pool id
* @param executeTime execute time
* @param executeTimeOut execute time-out
* @param threadPoolExecutor thread-pool executor
*/
@Override
public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) {
log.info("Ignore async send execute time out alarm for ExecuteService '{}'", threadPoolId);
}
@Override
public void run(String... args) throws Exception {
// do nothing
}
}
} }

@ -21,6 +21,8 @@ import cn.hippo4j.common.api.ThreadPoolCheckAlarm;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.RejectedAwarePlugin; import cn.hippo4j.core.plugin.RejectedAwarePlugin;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -28,10 +30,27 @@ import java.util.concurrent.ThreadPoolExecutor;
/** /**
* Send alert notification when a task is rejected. * Send alert notification when a task is rejected.
*/ */
@RequiredArgsConstructor
public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin { public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin {
public static final String PLUGIN_NAME = TaskRejectNotifyAlarmPlugin.class.getSimpleName(); public static final String PLUGIN_NAME = TaskRejectNotifyAlarmPlugin.class.getSimpleName();
/**
* Thread pool check alarm
*/
@NonNull
private final ThreadPoolCheckAlarm threadPoolCheckAlarm;
/**
* Create a {@link TaskRejectNotifyAlarmPlugin}
*/
public TaskRejectNotifyAlarmPlugin() {
this(
Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(context -> context.getBean(ThreadPoolCheckAlarm.class))
.orElseGet(ThreadPoolCheckAlarm::none));
}
/** /**
* Callback before task is rejected. * Callback before task is rejected.
* *
@ -44,8 +63,6 @@ public class TaskRejectNotifyAlarmPlugin implements RejectedAwarePlugin {
return; return;
} }
String threadPoolId = ((ExtensibleThreadPoolExecutor) executor).getThreadPoolId(); String threadPoolId = ((ExtensibleThreadPoolExecutor) executor).getThreadPoolId();
Optional.ofNullable(ApplicationContextHolder.getInstance()) threadPoolCheckAlarm.asyncSendRejectedAlarm(threadPoolId);
.map(context -> context.getBean(ThreadPoolCheckAlarm.class))
.ifPresent(handler -> handler.asyncSendRejectedAlarm(threadPoolId));
} }
} }

@ -22,6 +22,7 @@ import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.plugin.PluginRuntime; import cn.hippo4j.core.plugin.PluginRuntime;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import lombok.NonNull;
import lombok.Setter; import lombok.Setter;
import java.util.Optional; import java.util.Optional;
@ -52,6 +53,28 @@ public class TaskTimeoutNotifyAlarmPlugin extends AbstractTaskTimerPlugin {
*/ */
private final ThreadPoolExecutor threadPoolExecutor; private final ThreadPoolExecutor threadPoolExecutor;
/**
* Thread pool check alarm
* TODO Complete dependency injection through the thread plugin registrar
*/
@NonNull
private final ThreadPoolCheckAlarm threadPoolCheckAlarm;
/**
* Create a {@link TaskTimeoutNotifyAlarmPlugin}.
*
* @param threadPoolId thread pool id
* @param executeTimeOut execute time out
* @param threadPoolExecutor thread pool executor
*/
public TaskTimeoutNotifyAlarmPlugin(String threadPoolId, Long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) {
this(
threadPoolId, executeTimeOut, threadPoolExecutor,
Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(context -> context.getBean(ThreadPoolCheckAlarm.class))
.orElseGet(ThreadPoolCheckAlarm::none));
}
/** /**
* Get plugin runtime info. * Get plugin runtime info.
* *
@ -74,9 +97,6 @@ public class TaskTimeoutNotifyAlarmPlugin extends AbstractTaskTimerPlugin {
if (taskExecuteTime <= executeTimeOut) { if (taskExecuteTime <= executeTimeOut) {
return; return;
} }
Optional.ofNullable(ApplicationContextHolder.getInstance()) threadPoolCheckAlarm.asyncSendExecuteTimeOutAlarm(threadPoolId, taskExecuteTime, executeTimeOut, threadPoolExecutor);
.map(context -> context.getBean(ThreadPoolCheckAlarm.class))
.ifPresent(handler -> handler.asyncSendExecuteTimeOutAlarm(
threadPoolId, taskExecuteTime, executeTimeOut, threadPoolExecutor));
} }
} }

@ -17,10 +17,11 @@
package cn.hippo4j.core.plugin.impl; package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.api.ThreadPoolCheckAlarm;
import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import lombok.RequiredArgsConstructor; import lombok.Getter;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -51,8 +52,8 @@ public class TaskRejectNotifyAlarmPluginTest {
1, 1, 1000L, TimeUnit.MILLISECONDS, 1, 1, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy()); new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.DiscardPolicy());
AtomicInteger rejectCount = new AtomicInteger(0); TestAlarm alarm = new TestAlarm();
executor.register(new TestPlugin(rejectCount, executor)); executor.register(new TaskRejectNotifyAlarmPlugin(alarm));
executor.submit(() -> ThreadUtil.sleep(200L)); executor.submit(() -> ThreadUtil.sleep(200L));
executor.submit(() -> ThreadUtil.sleep(200L)); executor.submit(() -> ThreadUtil.sleep(200L));
executor.submit(() -> ThreadUtil.sleep(200L)); executor.submit(() -> ThreadUtil.sleep(200L));
@ -61,26 +62,32 @@ public class TaskRejectNotifyAlarmPluginTest {
executor.shutdown(); executor.shutdown();
while (!executor.isTerminated()) { while (!executor.isTerminated()) {
} }
Assert.assertEquals(1, rejectCount.get()); Assert.assertEquals(1, alarm.getNumberOfAlarms().get());
} }
@RequiredArgsConstructor private static class TestAlarm implements ThreadPoolCheckAlarm {
private static class TestPlugin extends TaskRejectNotifyAlarmPlugin {
private final AtomicInteger count; @Getter
private final ThreadPoolExecutor targetExecutor; private final AtomicInteger numberOfAlarms = new AtomicInteger(0);
@Override
/** public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
* Callback before task is rejected. // do noting
* }
* @param runnable task @Override
* @param executor executor public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
*/ // do noting
}
@Override
public void asyncSendRejectedAlarm(String threadPoolId) {
numberOfAlarms.incrementAndGet();
}
@Override @Override
public void beforeRejectedExecution(Runnable runnable, ThreadPoolExecutor executor) { public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) {
count.incrementAndGet(); // do noting
Assert.assertEquals(targetExecutor, executor); }
super.beforeRejectedExecution(runnable, executor); @Override
public void run(String... args) throws Exception {
} }
} }

@ -17,9 +17,11 @@
package cn.hippo4j.core.plugin.impl; package cn.hippo4j.core.plugin.impl;
import cn.hippo4j.common.api.ThreadPoolCheckAlarm;
import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor; import cn.hippo4j.core.executor.ExtensibleThreadPoolExecutor;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import lombok.Getter;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -38,8 +40,10 @@ public class TaskTimeoutNotifyAlarmPluginTest {
5, 5, 1000L, TimeUnit.MILLISECONDS, 5, 5, 1000L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.AbortPolicy()); new ArrayBlockingQueue<>(1), Thread::new, new ThreadPoolExecutor.AbortPolicy());
private final TestAlarm testAlarm = new TestAlarm();
private final TaskTimeoutNotifyAlarmPlugin plugin = new TaskTimeoutNotifyAlarmPlugin( private final TaskTimeoutNotifyAlarmPlugin plugin = new TaskTimeoutNotifyAlarmPlugin(
executor.getThreadPoolId(), 100L, executor); executor.getThreadPoolId(), 1L, executor, testAlarm);
@Test @Test
public void testGetId() { public void testGetId() {
@ -53,26 +57,23 @@ public class TaskTimeoutNotifyAlarmPluginTest {
@Test @Test
public void testGetExecuteTimeOut() { public void testGetExecuteTimeOut() {
Assert.assertEquals(100L, plugin.getExecuteTimeOut().longValue()); Assert.assertEquals(1L, plugin.getExecuteTimeOut().longValue());
} }
@Test @Test
public void testSetExecuteTimeOut() { public void testSetExecuteTimeOut() {
plugin.setExecuteTimeOut(200L); plugin.setExecuteTimeOut(2L);
Assert.assertEquals(200L, plugin.getExecuteTimeOut().longValue()); Assert.assertEquals(2L, plugin.getExecuteTimeOut().longValue());
} }
@Test @Test
public void testProcessTaskTime() { public void testProcessTaskTime() {
executor.register(plugin); executor.register(plugin);
AtomicInteger count = new AtomicInteger(0);
executor.submit(() -> { executor.submit(() -> {
count.incrementAndGet();
ThreadUtil.sleep(100L); ThreadUtil.sleep(100L);
}); });
executor.submit(() -> { executor.submit(() -> {
count.incrementAndGet();
ThreadUtil.sleep(300L); ThreadUtil.sleep(300L);
}); });
@ -80,7 +81,33 @@ public class TaskTimeoutNotifyAlarmPluginTest {
executor.shutdown(); executor.shutdown();
while (!executor.isTerminated()) { while (!executor.isTerminated()) {
} }
Assert.assertEquals(2, count.get()); Assert.assertEquals(2, testAlarm.getNumberOfAlarms().get());
}
private static class TestAlarm implements ThreadPoolCheckAlarm {
@Getter
private final AtomicInteger numberOfAlarms = new AtomicInteger(0);
@Override
public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
// do noting
}
@Override
public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
// do noting
}
@Override
public void asyncSendRejectedAlarm(String threadPoolId) {
// do noting
}
@Override
public void asyncSendExecuteTimeOutAlarm(String threadPoolId, long executeTime, long executeTimeOut, ThreadPoolExecutor threadPoolExecutor) {
numberOfAlarms.incrementAndGet();
}
@Override
public void run(String... args) throws Exception {
}
} }
} }

Loading…
Cancel
Save