|
|
@ -47,22 +47,19 @@ public class ReportingEventExecutor implements Runnable, CommandLineRunner, Disp
|
|
|
|
private final ServerHealthCheck serverHealthCheck;
|
|
|
|
private final ServerHealthCheck serverHealthCheck;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 数据采集集合
|
|
|
|
* 数据采集组件集合
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
private Map<String, Collector> collectors;
|
|
|
|
private Map<String, Collector> collectors;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 数据采集的缓冲容器, 等待 ReportingEventExecutor 上报服务端
|
|
|
|
* 数据采集的缓冲容器, 等待 ReportingEventExecutor 上报服务端
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
private final BlockingQueue<Message> messageCollectVessel = new ArrayBlockingQueue(4096);
|
|
|
|
private BlockingQueue<Message> messageCollectVessel;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* 数据采集定时执行器, Spring 启动后延时一段时间进行采集动态线程池的运行数据
|
|
|
|
* 数据采集定时执行器, Spring 启动后延时一段时间进行采集动态线程池的运行数据
|
|
|
|
*/
|
|
|
|
*/
|
|
|
|
private final ScheduledThreadPoolExecutor collectVesselExecutor = new ScheduledThreadPoolExecutor(
|
|
|
|
private ScheduledThreadPoolExecutor collectVesselExecutor;
|
|
|
|
new Integer(1),
|
|
|
|
|
|
|
|
ThreadFactoryBuilder.builder().daemon(true).prefix("collect-data-scheduled").build()
|
|
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@SneakyThrows
|
|
|
|
@SneakyThrows
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
@ -79,13 +76,32 @@ public class ReportingEventExecutor implements Runnable, CommandLineRunner, Disp
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
@Override
|
|
|
|
public void run(String... args) {
|
|
|
|
public void run(String... args) {
|
|
|
|
long initialDelay = 100000;
|
|
|
|
if (properties.getEnableCollect()) {
|
|
|
|
String reportingTaskName = "reporting-task";
|
|
|
|
Integer bufferSize = properties.getTaskBufferSize();
|
|
|
|
|
|
|
|
messageCollectVessel = new ArrayBlockingQueue(bufferSize);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
String collectVesselTaskName = "collect-data-scheduled";
|
|
|
|
|
|
|
|
collectVesselExecutor = new ScheduledThreadPoolExecutor(
|
|
|
|
|
|
|
|
new Integer(1),
|
|
|
|
|
|
|
|
ThreadFactoryBuilder.builder().daemon(true).prefix(collectVesselTaskName).build()
|
|
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
|
|
// 延迟 initialDelay 后循环调用. scheduleWithFixedDelay 每次执行时间为上一次任务结束时, 向后推一个时间间隔
|
|
|
|
// 延迟 initialDelay 后循环调用. scheduleWithFixedDelay 每次执行时间为上一次任务结束时, 向后推一个时间间隔
|
|
|
|
collectVesselExecutor.scheduleWithFixedDelay(() -> runTimeGatherTask(), initialDelay, properties.getCollectInterval(), TimeUnit.MILLISECONDS);
|
|
|
|
collectVesselExecutor.scheduleWithFixedDelay(
|
|
|
|
|
|
|
|
() -> runTimeGatherTask(),
|
|
|
|
|
|
|
|
properties.getInitialDelay(),
|
|
|
|
|
|
|
|
properties.getCollectInterval(),
|
|
|
|
|
|
|
|
TimeUnit.MILLISECONDS
|
|
|
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 启动上报监控数据线程
|
|
|
|
|
|
|
|
String reportingTaskName = "reporting-task";
|
|
|
|
ThreadUtil.newThread(this, reportingTaskName, Boolean.TRUE).start();
|
|
|
|
ThreadUtil.newThread(this, reportingTaskName, Boolean.TRUE).start();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 获取所有数据采集组件, 目前仅有历史运行数据采集
|
|
|
|
collectors = ApplicationContextHolder.getBeansOfType(Collector.class);
|
|
|
|
collectors = ApplicationContextHolder.getBeansOfType(Collector.class);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
log.info("Dynamic thread pool :: [{}]. The dynamic thread pool starts data collection and reporting. ", getThreadPoolNum());
|
|
|
|
log.info("Dynamic thread pool :: [{}]. The dynamic thread pool starts data collection and reporting. ", getThreadPoolNum());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|