Optimize thread pool monitoring task assembly

pull/457/head
chen.ma 2 years ago
parent 1807bc03b3
commit 7e2933f3dd

@ -19,6 +19,7 @@ package cn.hippo4j.springboot.starter.monitor;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ThreadUtil; import cn.hippo4j.common.toolkit.ThreadUtil;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.ThreadFactoryBuilder; import cn.hippo4j.core.executor.support.ThreadFactoryBuilder;
@ -40,6 +41,7 @@ import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import java.util.Collection; import java.util.Collection;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
@ -78,7 +80,7 @@ public class ReportingEventExecutor implements Runnable, CommandLineRunner, Disp
/** /**
* Thread pool monitoring collection. * Thread pool monitoring collection.
*/ */
private Map<String, ThreadPoolMonitor> threadPoolMonitors; private List<ThreadPoolMonitor> threadPoolMonitors;
/** /**
* Buffer container for data collection, waiting * Buffer container for data collection, waiting
@ -114,14 +116,18 @@ public class ReportingEventExecutor implements Runnable, CommandLineRunner, Disp
collectVesselExecutor = new ScheduledThreadPoolExecutor( collectVesselExecutor = new ScheduledThreadPoolExecutor(
new Integer(collectType.split(",").length), new Integer(collectType.split(",").length),
ThreadFactoryBuilder.builder().daemon(true).prefix("client.scheduled.collect.data").build()); ThreadFactoryBuilder.builder().daemon(true).prefix("client.scheduled.collect.data").build());
if (collectType.contains(MonitorTypeEnum.PROMETHEUS.name().toLowerCase())
|| collectType.contains(MonitorTypeEnum.LOG.name().toLowerCase())
|| collectType.contains(MonitorTypeEnum.ES.name().toLowerCase())) {
// Get all dynamic thread pool monitoring components.
threadPoolMonitors = ApplicationContextHolder.getBeansOfType(ThreadPoolMonitor.class);
Collection<DynamicThreadPoolMonitor> dynamicThreadPoolMonitors = Collection<DynamicThreadPoolMonitor> dynamicThreadPoolMonitors =
DynamicThreadPoolServiceLoader.getSingletonServiceInstances(DynamicThreadPoolMonitor.class); DynamicThreadPoolServiceLoader.getSingletonServiceInstances(DynamicThreadPoolMonitor.class);
dynamicThreadPoolMonitors.stream().filter(each -> collectType.contains(each.getType())).forEach(each -> threadPoolMonitors.put(each.getType(), each)); boolean customerDynamicThreadPoolMonitorFlag = CollectionUtil.isNotEmpty(dynamicThreadPoolMonitors) || (
collectType.contains(MonitorTypeEnum.PROMETHEUS.name().toLowerCase())
|| collectType.contains(MonitorTypeEnum.LOG.name().toLowerCase())
|| collectType.contains(MonitorTypeEnum.ES.name().toLowerCase())
);
if (customerDynamicThreadPoolMonitorFlag) {
// Get all dynamic thread pool monitoring components.
Map<String, ThreadPoolMonitor> threadPoolMonitorMap = ApplicationContextHolder.getBeansOfType(ThreadPoolMonitor.class);
threadPoolMonitorMap.forEach((beanName, monitor) -> threadPoolMonitors.add(monitor));
dynamicThreadPoolMonitors.stream().filter(each -> collectType.contains(each.getType())).forEach(each -> threadPoolMonitors.add(each));
collectVesselExecutor.scheduleWithFixedDelay( collectVesselExecutor.scheduleWithFixedDelay(
() -> dynamicThreadPoolMonitor(), () -> dynamicThreadPoolMonitor(),
properties.getInitialDelay(), properties.getInitialDelay(),
@ -151,8 +157,11 @@ public class ReportingEventExecutor implements Runnable, CommandLineRunner, Disp
Optional.ofNullable(collectVesselExecutor).ifPresent((each) -> each.shutdown()); Optional.ofNullable(collectVesselExecutor).ifPresent((each) -> each.shutdown());
} }
/**
* Running dynamic thread pool monitoring.
*/
private void dynamicThreadPoolMonitor() { private void dynamicThreadPoolMonitor() {
threadPoolMonitors.forEach((beanName, monitor) -> monitor.collect()); threadPoolMonitors.forEach(each -> each.collect());
} }
/** /**

Loading…
Cancel
Save