Abstract web container and develop core monitoring function.

pull/160/head
chen.ma 2 years ago
parent de946e78a6
commit 1bd367dcb7

@ -53,27 +53,6 @@
</exclusions>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
</dependencies>
<build>

@ -36,6 +36,27 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
</dependencies>
<build>

@ -1,12 +1,13 @@
package cn.hippo4j.core.config;
import cn.hippo4j.common.web.executor.JettyWebThreadPoolHandler;
import cn.hippo4j.common.web.executor.TomcatWebThreadPoolHandler;
import cn.hippo4j.common.web.executor.UndertowWebThreadPoolHandler;
import cn.hippo4j.common.web.executor.WebThreadPoolHandlerChoose;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.executor.web.*;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.ConfigurableEnvironment;
/**
* Web thread pool configuration.
@ -15,6 +16,7 @@ import org.springframework.context.annotation.Configuration;
* @date 2022/3/11 19:09
*/
@Configuration
@RequiredArgsConstructor
public class WebThreadPoolConfiguration {
private static final String TOMCAT_SERVLET_WEB_SERVER_FACTORY = "tomcatServletWebServerFactory";
@ -23,10 +25,23 @@ public class WebThreadPoolConfiguration {
private static final String UNDERTOW_SERVLET_WEB_SERVER_FACTORY = "undertowServletWebServerFactory";
private final ConfigurableEnvironment environment;
@Bean
public WebThreadPoolRunStateHandler webThreadPoolRunStateHandler() {
return new WebThreadPoolRunStateHandler();
}
@Bean
@SuppressWarnings("all")
public ThreadPoolRunStateHandler threadPoolRunStateHandler(InetUtils hippo4JInetUtils) {
return new ThreadPoolRunStateHandler(hippo4JInetUtils, environment);
}
@Bean
@ConditionalOnBean(name = TOMCAT_SERVLET_WEB_SERVER_FACTORY)
public TomcatWebThreadPoolHandler tomcatWebThreadPoolHandler() {
return new TomcatWebThreadPoolHandler();
public TomcatWebThreadPoolHandler tomcatWebThreadPoolHandler(WebThreadPoolRunStateHandler webThreadPoolRunStateHandler) {
return new TomcatWebThreadPoolHandler(webThreadPoolRunStateHandler);
}
@Bean

@ -1,14 +1,4 @@
package cn.hippo4j.starter.handler;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.util.ReflectionUtils;
import org.xnio.XnioWorker;
package cn.hippo4j.core.executor.state;
import cn.hippo4j.common.model.PoolBaseInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
@ -18,6 +8,11 @@ import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.toolkit.CalculateUtil;
import cn.hutool.core.date.DateUtil;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Abstract threadPool runtime info.
*
@ -65,62 +60,7 @@ public abstract class AbstractThreadPoolRuntime {
*/
public PoolRunStateInfo getPoolRunState(String threadPoolId, Executor executor) {
PoolRunStateInfo stateInfo = new PoolRunStateInfo();
if (executor != null && executor instanceof ThreadPoolExecutor) {
createJucThreadPoolStateInfo(threadPoolId, executor, stateInfo);
} else if (executor != null && executor instanceof XnioWorker) {
createXnioThreadPoolStateInfo(threadPoolId, executor, stateInfo);
}
return supplement(stateInfo);
}
private void createXnioThreadPoolStateInfo(String threadPoolId, Executor executor, PoolRunStateInfo stateInfo) {
XnioWorker xnioWorker = (XnioWorker)executor;
// private final TaskPool taskPool;
Field field = ReflectionUtils.findField(XnioWorker.class, "taskPool");
ReflectionUtils.makeAccessible(field);
Object fieldObject = ReflectionUtils.getField(field, xnioWorker);
// 核心线程数
Method getCorePoolSize = ReflectionUtils.findMethod(fieldObject.getClass(), "getCorePoolSize");
ReflectionUtils.makeAccessible(getCorePoolSize);
int corePoolSize = (int)ReflectionUtils.invokeMethod(getCorePoolSize, fieldObject);
// 最大线程数
Method getMaximumPoolSize = ReflectionUtils.findMethod(fieldObject.getClass(), "getMaximumPoolSize");
ReflectionUtils.makeAccessible(getMaximumPoolSize);
int maximumPoolSize = (int)ReflectionUtils.invokeMethod(getMaximumPoolSize, fieldObject);
// 线程池当前线程数 (有锁)
Method getPoolSize = ReflectionUtils.findMethod(fieldObject.getClass(), "getPoolSize");
ReflectionUtils.makeAccessible(getPoolSize);
int poolSize = (int)ReflectionUtils.invokeMethod(getPoolSize, fieldObject);
// 活跃线程数 (有锁)
Method getActiveCount = ReflectionUtils.findMethod(fieldObject.getClass(), "getActiveCount");
ReflectionUtils.makeAccessible(getActiveCount);
int activeCount = (int)ReflectionUtils.invokeMethod(getActiveCount, fieldObject);
activeCount = (activeCount <= 0) ? 0 : activeCount;
// 当前负载
String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "";
// 峰值负载
// 没有峰值记录,直接使用当前数据
String peakLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "";
stateInfo.setCoreSize(corePoolSize);
stateInfo.setTpId(threadPoolId);
stateInfo.setPoolSize(poolSize);
stateInfo.setMaximumSize(maximumPoolSize);
stateInfo.setActiveSize(activeCount);
stateInfo.setCurrentLoad(currentLoad);
stateInfo.setPeakLoad(peakLoad);
long rejectCount = fieldObject instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor)fieldObject).getRejectCountNum() : -1L;
stateInfo.setRejectCount(rejectCount);
stateInfo.setClientLastRefreshTime(DateUtil.formatDateTime(new Date()));
stateInfo.setTimestamp(System.currentTimeMillis());
}
private void createJucThreadPoolStateInfo(String threadPoolId, Executor executor, PoolRunStateInfo stateInfo) {
ThreadPoolExecutor pool = (ThreadPoolExecutor)executor;
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
// 核心线程数
int corePoolSize = pool.getCorePoolSize();
// 最大线程数
@ -163,10 +103,11 @@ public abstract class AbstractThreadPoolRuntime {
stateInfo.setCompletedTaskCount(completedTaskCount);
long rejectCount =
pool instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor)pool).getRejectCountNum() : -1L;
pool instanceof DynamicThreadPoolExecutor ? ((DynamicThreadPoolExecutor) pool).getRejectCountNum() : -1L;
stateInfo.setRejectCount(rejectCount);
stateInfo.setClientLastRefreshTime(DateUtil.formatDateTime(new Date()));
stateInfo.setTimestamp(System.currentTimeMillis());
return supplement(stateInfo);
}
}

@ -1,13 +1,13 @@
package cn.hippo4j.starter.handler;
package cn.hippo4j.core.executor.state;
import cn.hippo4j.common.model.ManyPoolRunStateInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.common.toolkit.ByteConvertUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.starter.toolkit.ByteConvertUtil;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.RuntimeInfo;

@ -1,4 +1,4 @@
package cn.hippo4j.starter.handler;
package cn.hippo4j.core.executor.state;
import cn.hutool.core.util.ReflectUtil;
import lombok.extern.slf4j.Slf4j;

@ -1,7 +1,8 @@
package cn.hippo4j.common.web.executor;
package cn.hippo4j.core.executor.web;
import cn.hippo4j.common.model.PoolParameter;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.springframework.boot.web.embedded.jetty.JettyWebServer;
@ -42,6 +43,11 @@ public class JettyWebThreadPoolHandler extends AbstractWebThreadPoolService {
return parameterInfo;
}
@Override
public PoolRunStateInfo getWebRunStateInfo() {
return null;
}
@Override
public void updateWebThreadPool(PoolParameterInfo poolParameterInfo) {
try {

@ -1,8 +1,10 @@
package cn.hippo4j.common.web.executor;
package cn.hippo4j.core.executor.web;
import cn.hippo4j.common.model.PoolParameter;
import cn.hippo4j.common.model.PoolParameterInfo;
import lombok.AllArgsConstructor;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.core.executor.state.AbstractThreadPoolRuntime;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.springframework.boot.web.embedded.tomcat.TomcatWebServer;
@ -19,13 +21,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
* @date 2022/1/19 20:57
*/
@Slf4j
@AllArgsConstructor
@RequiredArgsConstructor
public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService {
private final AtomicBoolean cacheFlag = new AtomicBoolean(Boolean.FALSE);
private static String EXCEPTION_MESSAGE;
private final AbstractThreadPoolRuntime webThreadPoolRunStateHandler;
@Override
protected Executor getWebThreadPoolByServer(WebServer webServer) {
if (cacheFlag.get()) {
@ -65,6 +69,11 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService {
return parameterInfo;
}
@Override
public PoolRunStateInfo getWebRunStateInfo() {
return webThreadPoolRunStateHandler.getPoolRunState(null, executor);
}
@Override
public void updateWebThreadPool(PoolParameterInfo poolParameterInfo) {
try {

@ -1,7 +1,11 @@
package cn.hippo4j.common.web.executor;
package cn.hippo4j.core.executor.web;
import cn.hippo4j.common.model.PoolParameter;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.toolkit.CalculateUtil;
import cn.hutool.core.date.DateUtil;
import io.undertow.Undertow;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.embedded.undertow.UndertowWebServer;
@ -11,6 +15,8 @@ import org.xnio.Options;
import org.xnio.XnioWorker;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.Executor;
@ -56,6 +62,53 @@ public class UndertowWebThreadPoolHandler extends AbstractWebThreadPoolService {
return parameterInfo;
}
@Override
public PoolRunStateInfo getWebRunStateInfo() {
PoolRunStateInfo stateInfo = new PoolRunStateInfo();
XnioWorker xnioWorker = (XnioWorker) executor;
// private final TaskPool taskPool;
Field field = ReflectionUtils.findField(XnioWorker.class, "taskPool");
ReflectionUtils.makeAccessible(field);
Object fieldObject = ReflectionUtils.getField(field, xnioWorker);
// 核心线程数
Method getCorePoolSize = ReflectionUtils.findMethod(fieldObject.getClass(), "getCorePoolSize");
ReflectionUtils.makeAccessible(getCorePoolSize);
int corePoolSize = (int) ReflectionUtils.invokeMethod(getCorePoolSize, fieldObject);
// 最大线程数
Method getMaximumPoolSize = ReflectionUtils.findMethod(fieldObject.getClass(), "getMaximumPoolSize");
ReflectionUtils.makeAccessible(getMaximumPoolSize);
int maximumPoolSize = (int) ReflectionUtils.invokeMethod(getMaximumPoolSize, fieldObject);
// 线程池当前线程数 (有锁)
Method getPoolSize = ReflectionUtils.findMethod(fieldObject.getClass(), "getPoolSize");
ReflectionUtils.makeAccessible(getPoolSize);
int poolSize = (int) ReflectionUtils.invokeMethod(getPoolSize, fieldObject);
// 活跃线程数 (有锁)
Method getActiveCount = ReflectionUtils.findMethod(fieldObject.getClass(), "getActiveCount");
ReflectionUtils.makeAccessible(getActiveCount);
int activeCount = (int) ReflectionUtils.invokeMethod(getActiveCount, fieldObject);
activeCount = (activeCount <= 0) ? 0 : activeCount;
// 当前负载
String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "";
// 峰值负载
// 没有峰值记录,直接使用当前数据
String peakLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "";
stateInfo.setCoreSize(corePoolSize);
stateInfo.setPoolSize(poolSize);
stateInfo.setMaximumSize(maximumPoolSize);
stateInfo.setActiveSize(activeCount);
stateInfo.setCurrentLoad(currentLoad);
stateInfo.setPeakLoad(peakLoad);
long rejectCount = fieldObject instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) fieldObject).getRejectCountNum() : -1L;
stateInfo.setRejectCount(rejectCount);
stateInfo.setClientLastRefreshTime(DateUtil.formatDateTime(new Date()));
stateInfo.setTimestamp(System.currentTimeMillis());
return stateInfo;
}
@Override
public void updateWebThreadPool(PoolParameterInfo poolParameterInfo) {
try {

@ -1,4 +1,4 @@
package cn.hippo4j.common.web.executor;
package cn.hippo4j.core.executor.web;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.web.exception.ServiceException;

@ -1,10 +1,10 @@
package cn.hippo4j.starter.handler.web;
package cn.hippo4j.core.executor.web;
import cn.hippo4j.common.model.PoolBaseInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.common.toolkit.ByteConvertUtil;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.starter.handler.AbstractThreadPoolRuntime;
import cn.hippo4j.starter.toolkit.ByteConvertUtil;
import cn.hippo4j.core.executor.state.AbstractThreadPoolRuntime;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.RuntimeInfo;
import lombok.extern.slf4j.Slf4j;

@ -1,7 +1,8 @@
package cn.hippo4j.common.web.executor;
package cn.hippo4j.core.executor.web;
import cn.hippo4j.common.model.PoolParameter;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
import java.util.concurrent.Executor;
@ -27,6 +28,13 @@ public interface WebThreadPoolService {
*/
PoolParameter getWebThreadPoolParameter();
/**
* Get web run state info.
*
* @return
*/
PoolRunStateInfo getWebRunStateInfo();
/**
* Update web thread pool.
*

@ -1,6 +1,7 @@
package cn.hippo4j.core.starter.config;
import cn.hippo4j.core.config.BootstrapPropertiesInterface;
import cn.hippo4j.core.starter.monitor.DynamicThreadPoolMonitor;
import cn.hippo4j.core.starter.parser.ConfigFileTypeEnum;
import lombok.Getter;
import lombok.Setter;
@ -33,14 +34,25 @@ public class BootstrapCoreProperties implements BootstrapPropertiesInterface {
private Boolean banner = Boolean.TRUE;
/***
* Enabled collect.
* Collect thread pool runtime indicators.
*/
private Boolean collect = Boolean.TRUE;
/**
* Check state interval.
* Type of collection thread pool running data. eg: log,metric. Multiple can be used at the same time.
* Custom SPI support {@link DynamicThreadPoolMonitor}.
*/
private Integer checkStateInterval;
private String collectType;
/**
* Delay starting data acquisition task. unit: ms
*/
private Long initialDelay = 10000L;
/**
* Collect interval. unit: ms
*/
private Long collectInterval = 5000L;
/**
* Config file type.
@ -84,9 +96,14 @@ public class BootstrapCoreProperties implements BootstrapPropertiesInterface {
private List<NotifyPlatformProperties> notifyPlatforms;
/**
* Is alarm.
* Whether to enable thread pool running alarm.
*/
private Boolean isAlarm;
private Boolean alarm = Boolean.TRUE;
/**
* Check thread pool running status interval.
*/
private Integer checkStateInterval;
/**
* Active alarm.
@ -99,9 +116,9 @@ public class BootstrapCoreProperties implements BootstrapPropertiesInterface {
private Integer capacityAlarm;
/**
* Interval.
* Thread pool run alarm interval. unit: s
*/
private Integer interval;
private Integer alarmInterval;
/**
* Receive.

@ -13,6 +13,10 @@ import cn.hippo4j.core.config.UtilAutoConfiguration;
import cn.hippo4j.core.config.WebThreadPoolConfiguration;
import cn.hippo4j.core.enable.MarkerConfiguration;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.starter.monitor.DynamicThreadPoolMonitorExecutor;
import cn.hippo4j.core.starter.monitor.LogMonitorHandler;
import cn.hippo4j.core.starter.monitor.MetricMonitorHandler;
import cn.hippo4j.core.starter.notify.CoreNotifyConfigBuilder;
import cn.hippo4j.core.starter.refresher.ApolloRefresherHandler;
import cn.hippo4j.core.starter.refresher.NacosCloudRefresherHandler;
@ -105,29 +109,41 @@ public class DynamicThreadPoolCoreAutoConfiguration {
@Bean
@ConditionalOnClass(name = NACOS_CONFIG_KEY)
@ConditionalOnMissingClass(NACOS_CONFIG_MANAGER_KEY)
public NacosRefresherHandler nacosRefresherHandler(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler,
BootstrapCoreProperties bootstrapCoreProperties) {
public NacosRefresherHandler nacosRefresherHandler(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) {
return new NacosRefresherHandler(threadPoolNotifyAlarmHandler, bootstrapCoreProperties);
}
@Bean
@ConditionalOnClass(name = NACOS_CONFIG_MANAGER_KEY)
public NacosCloudRefresherHandler nacosCloudRefresherHandler(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler,
BootstrapCoreProperties bootstrapCoreProperties) {
public NacosCloudRefresherHandler nacosCloudRefresherHandler(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) {
return new NacosCloudRefresherHandler(threadPoolNotifyAlarmHandler, bootstrapCoreProperties);
}
@Bean
@ConditionalOnClass(name = APOLLO_CONFIG_KEY)
public ApolloRefresherHandler apolloRefresher(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler,
BootstrapCoreProperties bootstrapCoreProperties) {
public ApolloRefresherHandler apolloRefresher(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) {
return new ApolloRefresherHandler(threadPoolNotifyAlarmHandler, bootstrapCoreProperties);
}
@Bean
@ConditionalOnClass(name = ZK_CONFIG_KEY)
public ZookeeperRefresherHandler zookeeperRefresher(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler,
BootstrapCoreProperties bootstrapCoreProperties) {
public ZookeeperRefresherHandler zookeeperRefresher(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) {
return new ZookeeperRefresherHandler(threadPoolNotifyAlarmHandler, bootstrapCoreProperties);
}
@Bean
public DynamicThreadPoolMonitorExecutor hippo4jDynamicThreadPoolMonitorExecutor() {
return new DynamicThreadPoolMonitorExecutor(bootstrapCoreProperties);
}
@Bean
public LogMonitorHandler hippo4jLogMonitorHandler(ThreadPoolRunStateHandler threadPoolRunStateHandler) {
return new LogMonitorHandler(threadPoolRunStateHandler);
}
@Bean
public MetricMonitorHandler hippo4jMetricMonitorHandler(ThreadPoolRunStateHandler threadPoolRunStateHandler) {
return new MetricMonitorHandler(threadPoolRunStateHandler);
}
}

@ -0,0 +1,37 @@
package cn.hippo4j.core.starter.monitor;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import lombok.RequiredArgsConstructor;
import java.util.List;
/**
* Abstract dynamic thread-pool monitor.
*
* @author chen.ma
* @date 2022/3/25 12:07
*/
@RequiredArgsConstructor
public abstract class AbstractDynamicThreadPoolMonitor implements DynamicThreadPoolMonitor {
private final ThreadPoolRunStateHandler threadPoolRunStateHandler;
/**
* Execute.
*
* @param poolRunStateInfo
*/
protected abstract void execute(PoolRunStateInfo poolRunStateInfo);
@Override
public void collect() {
List<String> listDynamicThreadPoolId = GlobalThreadPoolManage.listThreadPoolId();
for (String each : listDynamicThreadPoolId) {
PoolRunStateInfo poolRunState = threadPoolRunStateHandler.getPoolRunState(each);
execute(poolRunState);
}
}
}

@ -0,0 +1,11 @@
package cn.hippo4j.core.starter.monitor;
/**
* Dynamic thread-pool monitor.
*
* @author chen.ma
* @date 2022/3/25 19:03
*/
public interface DynamicThreadPoolMonitor extends ThreadPoolMonitor {
}

@ -0,0 +1,83 @@
package cn.hippo4j.core.starter.monitor;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.support.ThreadFactoryBuilder;
import cn.hippo4j.core.spi.DynamicThreadPoolServiceLoader;
import cn.hippo4j.core.starter.config.BootstrapCoreProperties;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Dynamic thread-pool monitor executor.
*
* @author chen.ma
* @date 2022/3/25 19:29
*/
@Slf4j
@RequiredArgsConstructor
public class DynamicThreadPoolMonitorExecutor implements ApplicationRunner {
private final BootstrapCoreProperties properties;
private ScheduledThreadPoolExecutor collectExecutor;
private List<ThreadPoolMonitor> threadPoolMonitors;
@Override
public void run(ApplicationArguments args) throws Exception {
String collectType = properties.getCollectType();
if (!properties.getCollect() || StringUtil.isBlank(collectType)) {
return;
}
log.info("Start monitoring the running status of dynamic thread pool.");
threadPoolMonitors = Lists.newArrayList();
String collectTaskName = "client.scheduled.collect.data";
collectExecutor = new ScheduledThreadPoolExecutor(
new Integer(1),
ThreadFactoryBuilder.builder().daemon(true).prefix(collectTaskName).build()
);
// Get dynamic thread pool monitoring component.
List<String> collectTypes = Arrays.asList(collectType.split(","));
ApplicationContextHolder.getBeansOfType(ThreadPoolMonitor.class)
.forEach((key, val) -> {
if (collectTypes.contains(val.getType())) {
threadPoolMonitors.add(val);
}
});
Collection<DynamicThreadPoolMonitor> dynamicThreadPoolMonitors =
DynamicThreadPoolServiceLoader.getSingletonServiceInstances(DynamicThreadPoolMonitor.class);
dynamicThreadPoolMonitors.stream().filter(each -> collectTypes.contains(each.getType())).forEach(each -> threadPoolMonitors.add(each));
// Execute dynamic thread pool monitoring component.
collectExecutor.scheduleWithFixedDelay(
() -> scheduleRunnable(),
properties.getInitialDelay(),
properties.getCollectInterval(),
TimeUnit.MILLISECONDS
);
}
private void scheduleRunnable() {
for (ThreadPoolMonitor each : threadPoolMonitors) {
try {
each.collect();
} catch (Exception ex) {
log.error("Error monitoring the running status of dynamic thread pool. Type :: {}", each.getType(), ex);
}
}
}
}

@ -0,0 +1,31 @@
package cn.hippo4j.core.starter.monitor;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import lombok.extern.slf4j.Slf4j;
/**
* Log monitor handler.
*
* @author chen.ma
* @date 2022/3/25 19:22
*/
@Slf4j
public class LogMonitorHandler extends AbstractDynamicThreadPoolMonitor {
public LogMonitorHandler(ThreadPoolRunStateHandler threadPoolRunStateHandler) {
super(threadPoolRunStateHandler);
}
@Override
protected void execute(PoolRunStateInfo poolRunStateInfo) {
log.info("{}", JSONUtil.toJSONString(poolRunStateInfo));
}
@Override
public String getType() {
return "log";
}
}

@ -0,0 +1,28 @@
package cn.hippo4j.core.starter.monitor;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
/**
* Metric monitor handler.
*
* @author chen.ma
* @date 2022/3/25 20:37
*/
public class MetricMonitorHandler extends AbstractDynamicThreadPoolMonitor {
public MetricMonitorHandler(ThreadPoolRunStateHandler threadPoolRunStateHandler) {
super(threadPoolRunStateHandler);
}
@Override
protected void execute(PoolRunStateInfo poolRunStateInfo) {
}
@Override
public String getType() {
return "metric";
}
}

@ -0,0 +1,23 @@
package cn.hippo4j.core.starter.monitor;
/**
* Thread-pool monitor.
*
* @author chen.ma
* @date 2022/3/25 19:03
*/
public interface ThreadPoolMonitor {
/**
* Get type.
*
* @return
*/
String getType();
/**
* Collect data.
*/
void collect();
}

@ -3,7 +3,6 @@ package cn.hippo4j.core.starter.notify;
import cn.hippo4j.common.api.NotifyConfigBuilder;
import cn.hippo4j.common.notify.AlarmControlHandler;
import cn.hippo4j.common.notify.NotifyConfigDTO;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.starter.config.BootstrapCoreProperties;
import cn.hippo4j.core.starter.config.ExecutorProperties;
import cn.hippo4j.core.starter.config.NotifyPlatformProperties;
@ -64,7 +63,7 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
notifyConfig.setSecretKey(platformProperties.getSecretKey());
int interval = Optional.ofNullable(executor.getNotify())
.map(each -> each.getInterval())
.orElseGet(() -> bootstrapCoreProperties.getInterval() != null ? bootstrapCoreProperties.getInterval() : 5);
.orElseGet(() -> bootstrapCoreProperties.getAlarmInterval() != null ? bootstrapCoreProperties.getAlarmInterval() : 5);
notifyConfig.setInterval(interval);
notifyConfig.setReceives(buildReceive(executor, platformProperties));
alarmNotifyConfigs.add(notifyConfig);

@ -8,8 +8,8 @@ import cn.hippo4j.common.notify.HippoBaseSendMessageService;
import cn.hippo4j.common.notify.NotifyConfigDTO;
import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm;
import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.web.executor.WebThreadPoolHandlerChoose;
import cn.hippo4j.common.web.executor.WebThreadPoolService;
import cn.hippo4j.core.executor.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.core.executor.web.WebThreadPoolService;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;

@ -127,7 +127,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
ThreadPoolNotifyAlarm notify = executorProperties.getNotify();
boolean isAlarm = Optional.ofNullable(notify)
.map(each -> each.getIsAlarm())
.orElseGet(() -> bootstrapCoreProperties.getIsAlarm() != null ? bootstrapCoreProperties.getIsAlarm() : true);
.orElseGet(() -> bootstrapCoreProperties.getAlarm() != null ? bootstrapCoreProperties.getAlarm() : true);
int activeAlarm = Optional.ofNullable(notify)
.map(each -> each.getActiveAlarm())
.orElseGet(() -> bootstrapCoreProperties.getActiveAlarm() != null ? bootstrapCoreProperties.getActiveAlarm() : 80);
@ -136,7 +136,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.orElseGet(() -> bootstrapCoreProperties.getCapacityAlarm() != null ? bootstrapCoreProperties.getCapacityAlarm() : 80);
int interval = Optional.ofNullable(notify)
.map(each -> each.getInterval())
.orElseGet(() -> bootstrapCoreProperties.getInterval() != null ? bootstrapCoreProperties.getInterval() : 5);
.orElseGet(() -> bootstrapCoreProperties.getAlarmInterval() != null ? bootstrapCoreProperties.getAlarmInterval() : 5);
String receive = Optional.ofNullable(notify)
.map(each -> each.getReceive())
.orElseGet(() -> bootstrapCoreProperties.getReceive() != null ? bootstrapCoreProperties.getReceive() : null);

@ -2,7 +2,7 @@ package cn.hippo4j.starter.config;
import cn.hippo4j.common.api.ThreadDetailState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.web.executor.WebThreadPoolHandlerChoose;
import cn.hippo4j.core.executor.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.core.config.UtilAutoConfiguration;
import cn.hippo4j.core.config.WebThreadPoolConfiguration;
import cn.hippo4j.core.enable.MarkerConfiguration;
@ -13,9 +13,9 @@ import cn.hippo4j.starter.controller.PoolRunStateController;
import cn.hippo4j.starter.controller.WebThreadPoolController;
import cn.hippo4j.starter.core.*;
import cn.hippo4j.starter.event.ApplicationContentPostProcessor;
import cn.hippo4j.starter.handler.BaseThreadDetailStateHandler;
import cn.hippo4j.starter.handler.ThreadPoolRunStateHandler;
import cn.hippo4j.starter.handler.web.WebThreadPoolRunStateHandler;
import cn.hippo4j.starter.core.BaseThreadDetailStateHandler;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.executor.web.WebThreadPoolRunStateHandler;
import cn.hippo4j.starter.monitor.ReportingEventExecutor;
import cn.hippo4j.starter.monitor.collect.RunTimeInfoCollector;
import cn.hippo4j.starter.monitor.send.HttpConnectSender;
@ -85,12 +85,6 @@ public class DynamicThreadPoolAutoConfiguration {
return new DynamicThreadPoolPostProcessor(properties, httpAgent, threadPoolOperation, threadPoolDynamicRefresh);
}
@Bean
@SuppressWarnings("all")
public ThreadPoolRunStateHandler threadPoolRunStateHandler(InetUtils hippo4JInetUtils) {
return new ThreadPoolRunStateHandler(hippo4JInetUtils, environment);
}
@Bean
@ConditionalOnMissingBean(value = ThreadDetailState.class)
public ThreadDetailState baseThreadDetailStateHandler() {
@ -131,11 +125,6 @@ public class DynamicThreadPoolAutoConfiguration {
return new ApplicationContentPostProcessor();
}
@Bean
public WebThreadPoolRunStateHandler webThreadPoolRunStateHandler() {
return new WebThreadPoolRunStateHandler();
}
@Bean
public WebThreadPoolController webThreadPoolController(WebThreadPoolHandlerChoose webThreadPoolServiceChoose,
WebThreadPoolRunStateHandler webThreadPoolRunStateHandler) {

@ -5,7 +5,7 @@ import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.common.model.ThreadDetailStateInfo;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.starter.handler.ThreadPoolRunStateHandler;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;

@ -5,9 +5,8 @@ import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.common.web.executor.WebThreadPoolHandlerChoose;
import cn.hippo4j.starter.handler.web.WebThreadPoolRunStateHandler;
import cn.hippo4j.common.web.executor.WebThreadPoolService;
import cn.hippo4j.core.executor.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.core.executor.web.WebThreadPoolRunStateHandler;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.*;
@ -32,24 +31,20 @@ public class WebThreadPoolController {
@GetMapping("/web/base/info")
public Result<PoolBaseInfo> getPoolBaseState() {
WebThreadPoolService webThreadPoolService = webThreadPoolServiceChoose.choose();
Executor webThreadPool = webThreadPoolService.getWebThreadPool();
Executor webThreadPool = webThreadPoolServiceChoose.choose().getWebThreadPool();
PoolBaseInfo poolBaseInfo = webThreadPoolRunStateHandler.simpleInfo(webThreadPool);
return Results.success(poolBaseInfo);
}
@GetMapping("/web/run/state")
public Result<PoolRunStateInfo> getPoolRunState() {
WebThreadPoolService webThreadPoolService = webThreadPoolServiceChoose.choose();
Executor webThreadPool = webThreadPoolService.getWebThreadPool();
PoolRunStateInfo poolRunState = webThreadPoolRunStateHandler.getPoolRunState(null, webThreadPool);
PoolRunStateInfo poolRunState = webThreadPoolServiceChoose.choose().getWebRunStateInfo();
return Results.success(poolRunState);
}
@PostMapping("/web/update/pool")
public Result<Void> updateWebThreadPool(@RequestBody PoolParameterInfo poolParameterInfo) {
WebThreadPoolService webThreadPoolService = webThreadPoolServiceChoose.choose();
webThreadPoolService.updateWebThreadPool(poolParameterInfo);
webThreadPoolServiceChoose.choose().updateWebThreadPool(poolParameterInfo);
return Results.success();
}

@ -1,4 +1,4 @@
package cn.hippo4j.starter.handler;
package cn.hippo4j.starter.core;
import cn.hippo4j.common.api.ThreadDetailState;
import cn.hippo4j.common.model.ThreadDetailStateInfo;

@ -7,7 +7,7 @@ import cn.hippo4j.common.monitor.MessageTypeEnum;
import cn.hippo4j.common.monitor.RuntimeMessage;
import cn.hippo4j.starter.config.BootstrapProperties;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.starter.handler.AbstractThreadPoolRuntime;
import cn.hippo4j.core.executor.state.AbstractThreadPoolRuntime;
import cn.hutool.core.bean.BeanUtil;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;

Loading…
Cancel
Save