开发线程池运行历史数据采集上报. (#15)

pull/39/head
chen.ma 3 years ago
parent 862f945f9e
commit 66c4610f34

@ -5,6 +5,10 @@ import cn.hippo4j.common.api.impl.JacksonHandler;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import static cn.hippo4j.common.constant.Constants.AVAILABLE_PROCESSORS;
/** /**
* Common config. * Common config.
@ -25,4 +29,16 @@ public class CommonConfig {
return new ApplicationContextHolder(); return new ApplicationContextHolder();
} }
@Bean
@Primary
public ThreadPoolTaskExecutor monitorThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor monitorThreadPool = new ThreadPoolTaskExecutor();
monitorThreadPool.setCorePoolSize(AVAILABLE_PROCESSORS);
monitorThreadPool.setMaxPoolSize(AVAILABLE_PROCESSORS << 1);
monitorThreadPool.setQueueCapacity(4096);
monitorThreadPool.setAllowCoreThreadTimeOut(true);
monitorThreadPool.setAwaitTerminationMillis(5000);
return monitorThreadPool;
}
} }

@ -44,6 +44,7 @@ public class HisRunDataServiceImpl extends ServiceImpl<HisRunDataMapper, HisRunD
.eq(HisRunDataInfo::getTpId, reqDTO.getTpId()) .eq(HisRunDataInfo::getTpId, reqDTO.getTpId())
.eq(HisRunDataInfo::getInstanceId, reqDTO.getInstanceId()) .eq(HisRunDataInfo::getInstanceId, reqDTO.getInstanceId())
.between(HisRunDataInfo::getTimestamp, startTime, currentDate.getTime()) .between(HisRunDataInfo::getTimestamp, startTime, currentDate.getTime())
.orderByAsc(HisRunDataInfo::getTimestamp)
.list(); .list();
return BeanUtil.convert(hisRunDataInfos, MonitorRespDTO.class); return BeanUtil.convert(hisRunDataInfos, MonitorRespDTO.class);
@ -61,6 +62,7 @@ public class HisRunDataServiceImpl extends ServiceImpl<HisRunDataMapper, HisRunD
.eq(HisRunDataInfo::getTpId, reqDTO.getTpId()) .eq(HisRunDataInfo::getTpId, reqDTO.getTpId())
.eq(HisRunDataInfo::getInstanceId, reqDTO.getInstanceId()) .eq(HisRunDataInfo::getInstanceId, reqDTO.getInstanceId())
.between(HisRunDataInfo::getTimestamp, startTime, currentDate.getTime()) .between(HisRunDataInfo::getTimestamp, startTime, currentDate.getTime())
.orderByAsc(HisRunDataInfo::getTimestamp)
.list(); .list();
List<String> times = Lists.newArrayList(); List<String> times = Lists.newArrayList();

@ -13,6 +13,7 @@ import cn.hippo4j.config.monitor.QueryMonitorExecuteChoose;
import cn.hippo4j.config.service.biz.HisRunDataService; import cn.hippo4j.config.service.biz.HisRunDataService;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.util.List; import java.util.List;
@ -33,6 +34,8 @@ public class MonitorController {
private final QueryMonitorExecuteChoose queryMonitorExecuteChoose; private final QueryMonitorExecuteChoose queryMonitorExecuteChoose;
private final ThreadPoolTaskExecutor monitorThreadPoolTaskExecutor;
@GetMapping @GetMapping
public Result<List<MonitorRespDTO>> queryMonitor(MonitorQueryReqDTO reqDTO) { public Result<List<MonitorRespDTO>> queryMonitor(MonitorQueryReqDTO reqDTO) {
List<MonitorRespDTO> monitorRespList = hisRunDataService.query(reqDTO); List<MonitorRespDTO> monitorRespList = hisRunDataService.query(reqDTO);
@ -40,15 +43,24 @@ public class MonitorController {
} }
@PostMapping("/info") @PostMapping("/info")
public Result<MonitorActiveRespDTO> querInfoThreadPoolMonitor(@RequestBody MonitorQueryReqDTO reqDTO) { public Result<MonitorActiveRespDTO> queryInfoThreadPoolMonitor(@RequestBody MonitorQueryReqDTO reqDTO) {
MonitorActiveRespDTO monitorRespList = hisRunDataService.queryInfoThreadPoolMonitor(reqDTO); MonitorActiveRespDTO monitorRespList = hisRunDataService.queryInfoThreadPoolMonitor(reqDTO);
return Results.success(monitorRespList); return Results.success(monitorRespList);
} }
@PostMapping @PostMapping
public Result dataCollect(@RequestBody MessageWrapper messageWrapper) { public Result<Void> dataCollect(@RequestBody MessageWrapper messageWrapper) {
Runnable task = () -> {
Message message = MessageConvert.convert(messageWrapper); Message message = MessageConvert.convert(messageWrapper);
queryMonitorExecuteChoose.chooseAndExecute(message); queryMonitorExecuteChoose.chooseAndExecute(message);
};
try {
monitorThreadPoolTaskExecutor.execute(task);
} catch (Exception ex) {
log.error("Monitoring data insertion database task overflow.", ex);
}
return Results.success(); return Results.success();
} }

@ -20,23 +20,28 @@ public class BootstrapProperties {
public static final String PREFIX = "spring.dynamic.thread-pool"; public static final String PREFIX = "spring.dynamic.thread-pool";
/** /**
* serverAddr * Server addr
*/ */
private String serverAddr; private String serverAddr;
/** /**
* namespace * Namespace
*/ */
private String namespace; private String namespace;
/** /**
* itemId * Item id
*/ */
private String itemId; private String itemId;
/** /**
* Enable banner * Print dynamic thread pool banner
*/ */
private boolean banner = true; private boolean banner = true;
/**
* Time interval for client to collect monitoring data. unit: ms
*/
private Long collectInterval = 5000L;
} }

@ -11,9 +11,10 @@ import cn.hippo4j.starter.core.ThreadPoolOperation;
import cn.hippo4j.starter.enable.MarkerConfiguration; import cn.hippo4j.starter.enable.MarkerConfiguration;
import cn.hippo4j.starter.handler.DynamicThreadPoolBannerHandler; import cn.hippo4j.starter.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.starter.handler.ThreadPoolRunStateHandler; import cn.hippo4j.starter.handler.ThreadPoolRunStateHandler;
import cn.hippo4j.starter.monitor.HttpConnectSender;
import cn.hippo4j.starter.monitor.MessageSender;
import cn.hippo4j.starter.monitor.ReportingEventExecutor; import cn.hippo4j.starter.monitor.ReportingEventExecutor;
import cn.hippo4j.starter.monitor.collect.RunTimeInfoCollector;
import cn.hippo4j.starter.monitor.send.HttpConnectSender;
import cn.hippo4j.starter.monitor.send.MessageSender;
import cn.hippo4j.starter.remote.HttpAgent; import cn.hippo4j.starter.remote.HttpAgent;
import cn.hippo4j.starter.remote.HttpScheduledHealthCheck; import cn.hippo4j.starter.remote.HttpScheduledHealthCheck;
import cn.hippo4j.starter.remote.ServerHealthCheck; import cn.hippo4j.starter.remote.ServerHealthCheck;
@ -112,6 +113,11 @@ public class DynamicThreadPoolAutoConfiguration {
return new JacksonHandler(); return new JacksonHandler();
} }
@Bean
public RunTimeInfoCollector runTimeInfoCollector() {
return new RunTimeInfoCollector(properties);
}
} }

@ -1,18 +1,14 @@
package cn.hippo4j.starter.monitor; package cn.hippo4j.starter.monitor;
import cn.hippo4j.common.model.PoolRunStateInfo; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.monitor.AbstractMessage;
import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageTypeEnum;
import cn.hippo4j.common.monitor.RuntimeMessage;
import cn.hippo4j.starter.config.BootstrapProperties; import cn.hippo4j.starter.config.BootstrapProperties;
import cn.hippo4j.starter.core.GlobalThreadPoolManage; import cn.hippo4j.starter.monitor.collect.Collector;
import cn.hippo4j.starter.handler.AbstractThreadPoolRuntime; import cn.hippo4j.starter.monitor.send.MessageSender;
import cn.hippo4j.starter.remote.ServerHealthCheck; import cn.hippo4j.starter.remote.ServerHealthCheck;
import cn.hippo4j.starter.toolkit.thread.ThreadFactoryBuilder; import cn.hippo4j.starter.toolkit.thread.ThreadFactoryBuilder;
import cn.hippo4j.starter.toolkit.thread.ThreadUtil; import cn.hippo4j.starter.toolkit.thread.ThreadUtil;
import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil;
import com.google.common.collect.Lists;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows; import lombok.SneakyThrows;
@ -20,7 +16,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import java.util.List; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -28,7 +24,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static cn.hippo4j.starter.core.GlobalThreadPoolManage.getThreadPoolNum; import static cn.hippo4j.starter.core.GlobalThreadPoolManage.getThreadPoolNum;
import static cn.hippo4j.starter.toolkit.IdentifyUtil.getThreadPoolIdentify;
/** /**
* 线. * 线.
@ -40,7 +35,7 @@ import static cn.hippo4j.starter.toolkit.IdentifyUtil.getThreadPoolIdentify;
*/ */
@Slf4j @Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements Runnable, Collect, CommandLineRunner, DisposableBean { public class ReportingEventExecutor implements Runnable, CommandLineRunner, DisposableBean {
@NonNull @NonNull
private final BootstrapProperties properties; private final BootstrapProperties properties;
@ -51,6 +46,11 @@ public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements
@NonNull @NonNull
private final ServerHealthCheck serverHealthCheck; private final ServerHealthCheck serverHealthCheck;
/**
*
*/
private Map<String, Collector> collectors;
/** /**
* , ReportingEventExecutor * , ReportingEventExecutor
*/ */
@ -79,10 +79,13 @@ public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements
@Override @Override
public void run(String... args) { public void run(String... args) {
// 延迟 10秒后每 5秒调用一次. scheduleWithFixedDelay 每次执行时间为上一次任务结束时, 向后推一个时间间隔 long initialDelay = 100000;
collectVesselExecutor.scheduleWithFixedDelay(() -> runTimeGatherTask(), 10, 5, TimeUnit.SECONDS); String reportingTaskName = "reporting-task";
ThreadUtil.newThread(this, "reporting-task", Boolean.TRUE).start(); // 延迟 initialDelay 后循环调用. scheduleWithFixedDelay 每次执行时间为上一次任务结束时, 向后推一个时间间隔
collectVesselExecutor.scheduleWithFixedDelay(() -> runTimeGatherTask(), initialDelay, properties.getCollectInterval(), TimeUnit.MILLISECONDS);
ThreadUtil.newThread(this, reportingTaskName, Boolean.TRUE).start();
collectors = ApplicationContextHolder.getBeansOfType(Collector.class);
log.info("Dynamic thread pool :: [{}]. The dynamic thread pool starts data collection and reporting. ", getThreadPoolNum()); log.info("Dynamic thread pool :: [{}]. The dynamic thread pool starts data collection and reporting. ", getThreadPoolNum());
} }
@ -96,35 +99,17 @@ public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements
*/ */
private void runTimeGatherTask() { private void runTimeGatherTask() {
boolean healthStatus = serverHealthCheck.isHealthStatus(); boolean healthStatus = serverHealthCheck.isHealthStatus();
if (!healthStatus) { if (!healthStatus || CollUtil.isEmpty(collectors)) {
return; return;
} }
Message message = collectMessage();
messageCollectVessel.offer(message);
}
@Override
public Message collectMessage() {
AbstractMessage message = new RuntimeMessage();
List<Message> runtimeMessages = Lists.newArrayList();
List<String> listThreadPoolId = GlobalThreadPoolManage.listThreadPoolId();
for (String each : listThreadPoolId) {
PoolRunStateInfo poolRunState = getPoolRunState(each);
RuntimeMessage runtimeMessage = BeanUtil.toBean(poolRunState, RuntimeMessage.class);
runtimeMessage.setGroupKey(getThreadPoolIdentify(each, properties));
runtimeMessages.add(runtimeMessage);
}
message.setMessageType(MessageTypeEnum.RUNTIME);
message.setMessages(runtimeMessages);
return message; collectors.forEach((beanName, collector) -> {
Message message = collector.collectMessage();
boolean offer = messageCollectVessel.offer(message);
if (!offer) {
log.warn("Buffer data starts stacking data...");
} }
});
@Override
protected PoolRunStateInfo supplement(PoolRunStateInfo poolRunStateInfo) {
return poolRunStateInfo;
} }
} }

@ -1,4 +1,4 @@
package cn.hippo4j.starter.monitor; package cn.hippo4j.starter.monitor.collect;
import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.Message;
@ -8,7 +8,7 @@ import cn.hippo4j.common.monitor.Message;
* @author chen.ma * @author chen.ma
* @date 2021/12/7 20:11 * @date 2021/12/7 20:11
*/ */
public interface Collect { public interface Collector {
/** /**
* Collect message. * Collect message.

@ -0,0 +1,54 @@
package cn.hippo4j.starter.monitor.collect;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.common.monitor.AbstractMessage;
import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageTypeEnum;
import cn.hippo4j.common.monitor.RuntimeMessage;
import cn.hippo4j.starter.config.BootstrapProperties;
import cn.hippo4j.starter.core.GlobalThreadPoolManage;
import cn.hippo4j.starter.handler.AbstractThreadPoolRuntime;
import cn.hutool.core.bean.BeanUtil;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import java.util.List;
import static cn.hippo4j.starter.toolkit.IdentifyUtil.getThreadPoolIdentify;
/**
* Thread pool runtime data collection.
*
* @author chen.ma
* @date 2021/12/16 19:46
*/
@AllArgsConstructor
public class RunTimeInfoCollector extends AbstractThreadPoolRuntime implements Collector {
private final BootstrapProperties properties;
@Override
public Message collectMessage() {
AbstractMessage message = new RuntimeMessage();
List<Message> runtimeMessages = Lists.newArrayList();
List<String> listThreadPoolId = GlobalThreadPoolManage.listThreadPoolId();
for (String each : listThreadPoolId) {
PoolRunStateInfo poolRunState = getPoolRunState(each);
RuntimeMessage runtimeMessage = BeanUtil.toBean(poolRunState, RuntimeMessage.class);
runtimeMessage.setGroupKey(getThreadPoolIdentify(each, properties));
runtimeMessages.add(runtimeMessage);
}
message.setMessageType(MessageTypeEnum.RUNTIME);
message.setMessages(runtimeMessages);
return message;
}
@Override
protected PoolRunStateInfo supplement(PoolRunStateInfo basePoolRunStateInfo) {
return basePoolRunStateInfo;
}
}

@ -1,4 +1,4 @@
package cn.hippo4j.starter.monitor; package cn.hippo4j.starter.monitor.send;
import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.Message;
import cn.hippo4j.common.monitor.MessageWrapper; import cn.hippo4j.common.monitor.MessageWrapper;

@ -1,4 +1,4 @@
package cn.hippo4j.starter.monitor; package cn.hippo4j.starter.monitor.send;
import cn.hippo4j.common.monitor.Message; import cn.hippo4j.common.monitor.Message;
Loading…
Cancel
Save