扩展 Web Server 容器线程池动态调参、监控. (#68)

pull/84/head
chen.ma 3 years ago
parent 42a13cd3a5
commit 1e912e2223

@ -3,6 +3,7 @@ package cn.hippo4j.common.api;
import cn.hippo4j.common.model.ThreadDetailStateInfo;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
/**
* Get thread status in thread pool.
@ -20,4 +21,12 @@ public interface ThreadDetailState {
*/
List<ThreadDetailStateInfo> getThreadDetailStateInfo(String threadPoolId);
/**
* Get thread status in thread pool.
*
* @param threadPoolExecutor
* @return
*/
List<ThreadDetailStateInfo> getThreadDetailStateInfo(ThreadPoolExecutor threadPoolExecutor);
}

@ -3,6 +3,7 @@ package cn.hippo4j.starter.config;
import cn.hippo4j.common.api.ThreadDetailState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.starter.controller.PoolRunStateController;
import cn.hippo4j.starter.controller.WebThreadPoolController;
import cn.hippo4j.starter.core.ConfigService;
import cn.hippo4j.starter.core.DynamicThreadPoolPostProcessor;
import cn.hippo4j.starter.core.ThreadPoolConfigService;
@ -12,6 +13,9 @@ import cn.hippo4j.starter.event.ApplicationContentPostProcessor;
import cn.hippo4j.starter.handler.BaseThreadDetailStateHandler;
import cn.hippo4j.starter.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.starter.handler.ThreadPoolRunStateHandler;
import cn.hippo4j.starter.handler.web.TomcatWebThreadPoolHandler;
import cn.hippo4j.starter.handler.web.WebThreadPoolRunStateHandler;
import cn.hippo4j.starter.handler.web.WebThreadPoolService;
import cn.hippo4j.starter.monitor.ReportingEventExecutor;
import cn.hippo4j.starter.monitor.collect.RunTimeInfoCollector;
import cn.hippo4j.starter.monitor.send.HttpConnectSender;
@ -28,6 +32,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
@ -52,6 +57,8 @@ public class DynamicThreadPoolAutoConfiguration {
private final ConfigurableEnvironment environment;
private final ServletWebServerApplicationContext applicationContext;
public static final String CLIENT_IDENTIFICATION_VALUE = IdUtil.simpleUUID();
@Bean
@ -130,6 +137,23 @@ public class DynamicThreadPoolAutoConfiguration {
return new ApplicationContentPostProcessor();
}
@Bean
public WebThreadPoolRunStateHandler webThreadPoolRunStateHandler() {
return new WebThreadPoolRunStateHandler();
}
@Bean
public TomcatWebThreadPoolHandler tomcatWebThreadPoolHandler() {
return new TomcatWebThreadPoolHandler(applicationContext);
}
@Bean
public WebThreadPoolController webThreadPoolController(WebThreadPoolService webThreadPoolService,
ThreadDetailState threadDetailState,
WebThreadPoolRunStateHandler webThreadPoolRunStateHandler) {
return new WebThreadPoolController(webThreadPoolService, threadDetailState, webThreadPoolRunStateHandler);
}
}

@ -0,0 +1,45 @@
package cn.hippo4j.starter.controller;
import cn.hippo4j.common.api.ThreadDetailState;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.starter.handler.web.WebThreadPoolRunStateHandler;
import cn.hippo4j.starter.handler.web.WebThreadPoolService;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.*;
import java.util.concurrent.Executor;
/**
* Web thread pool controller.
*
* @author chen.ma
* @date 2022/1/19 20:54
*/
@CrossOrigin
@RestController
@AllArgsConstructor
public class WebThreadPoolController {
private final WebThreadPoolService webThreadPoolService;
private final ThreadDetailState threadDetailState;
private final WebThreadPoolRunStateHandler webThreadPoolRunStateHandler;
@GetMapping("/web/run/state")
public Result<PoolRunStateInfo> getPoolRunState() {
Executor webThreadPool = webThreadPoolService.getWebThreadPool();
PoolRunStateInfo poolRunState = webThreadPoolRunStateHandler.getPoolRunState(null, webThreadPool);
return Results.success(poolRunState);
}
@PostMapping("/web/update/pool")
public Result<Void> updateWebThreadPool(@RequestBody PoolParameterInfo poolParameterInfo) {
webThreadPoolService.updateWebThreadPool(poolParameterInfo);
return Results.success();
}
}

@ -9,6 +9,7 @@ import cn.hutool.core.date.DateUtil;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
@ -36,55 +37,68 @@ public abstract class AbstractThreadPoolRuntime {
public PoolRunStateInfo getPoolRunState(String threadPoolId) {
DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor pool = executorService.getExecutor();
return getPoolRunState(threadPoolId, pool);
}
// 核心线程数
int corePoolSize = pool.getCorePoolSize();
// 最大线程数
int maximumPoolSize = pool.getMaximumPoolSize();
// 线程池当前线程数 (有锁)
int poolSize = pool.getPoolSize();
// 活跃线程数 (有锁)
int activeCount = pool.getActiveCount();
// 同时进入池中的最大线程数 (有锁)
int largestPoolSize = pool.getLargestPoolSize();
// 线程池中执行任务总数量 (有锁)
long completedTaskCount = pool.getCompletedTaskCount();
// 当前负载
String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "";
// 峰值负载
String peakLoad = CalculateUtil.divide(largestPoolSize, maximumPoolSize) + "";
/**
* Get pool run state.
*
* @param threadPoolId
* @param executor
* @return
*/
public PoolRunStateInfo getPoolRunState(String threadPoolId, Executor executor) {
PoolRunStateInfo stateInfo = new PoolRunStateInfo();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
// 核心线程数
int corePoolSize = pool.getCorePoolSize();
// 最大线程数
int maximumPoolSize = pool.getMaximumPoolSize();
// 线程池当前线程数 (有锁)
int poolSize = pool.getPoolSize();
// 活跃线程数 (有锁)
int activeCount = pool.getActiveCount();
// 同时进入池中的最大线程数 (有锁)
int largestPoolSize = pool.getLargestPoolSize();
// 线程池中执行任务总数量 (有锁)
long completedTaskCount = pool.getCompletedTaskCount();
// 当前负载
String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "";
// 峰值负载
String peakLoad = CalculateUtil.divide(largestPoolSize, maximumPoolSize) + "";
BlockingQueue<Runnable> queue = pool.getQueue();
// 队列元素个数
int queueSize = queue.size();
// 队列类型
String queueType = queue.getClass().getSimpleName();
// 队列剩余容量
int remainingCapacity = queue.remainingCapacity();
// 队列容量
int queueCapacity = queueSize + remainingCapacity;
BlockingQueue<Runnable> queue = pool.getQueue();
// 队列元素个数
int queueSize = queue.size();
// 队列类型
String queueType = queue.getClass().getSimpleName();
// 队列剩余容量
int remainingCapacity = queue.remainingCapacity();
// 队列容量
int queueCapacity = queueSize + remainingCapacity;
PoolRunStateInfo stateInfo = new PoolRunStateInfo();
stateInfo.setCoreSize(corePoolSize);
stateInfo.setTpId(threadPoolId);
stateInfo.setPoolSize(poolSize);
stateInfo.setMaximumSize(maximumPoolSize);
stateInfo.setActiveSize(activeCount);
stateInfo.setCurrentLoad(currentLoad);
stateInfo.setPeakLoad(peakLoad);
stateInfo.setQueueType(queueType);
stateInfo.setQueueSize(queueSize);
stateInfo.setQueueCapacity(queueCapacity);
stateInfo.setQueueRemainingCapacity(remainingCapacity);
stateInfo.setLargestPoolSize(largestPoolSize);
stateInfo.setCompletedTaskCount(completedTaskCount);
stateInfo.setCoreSize(corePoolSize);
stateInfo.setTpId(threadPoolId);
stateInfo.setPoolSize(poolSize);
stateInfo.setMaximumSize(maximumPoolSize);
stateInfo.setActiveSize(activeCount);
stateInfo.setCurrentLoad(currentLoad);
stateInfo.setPeakLoad(peakLoad);
stateInfo.setQueueType(queueType);
stateInfo.setQueueSize(queueSize);
stateInfo.setQueueCapacity(queueCapacity);
stateInfo.setQueueRemainingCapacity(remainingCapacity);
stateInfo.setLargestPoolSize(largestPoolSize);
stateInfo.setCompletedTaskCount(completedTaskCount);
int rejectCount = pool instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) pool).getRejectCount()
: -1;
stateInfo.setRejectCount(rejectCount);
stateInfo.setClientLastRefreshTime(DateUtil.formatDateTime(new Date()));
stateInfo.setTimestamp(System.currentTimeMillis());
int rejectCount = pool instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) pool).getRejectCount()
: -1;
stateInfo.setRejectCount(rejectCount);
stateInfo.setClientLastRefreshTime(DateUtil.formatDateTime(new Date()));
stateInfo.setTimestamp(System.currentTimeMillis());
}
return supplement(stateInfo);
}

@ -16,7 +16,7 @@ import java.util.concurrent.ThreadPoolExecutor;
/**
* Base thread detail state handler.
*
* <p>The Java 8 implementation is temporarily provided,
* <p> The Java 8 implementation is temporarily provided,
* {@link ThreadDetailState} interface can be customized.
*
* @author chen.ma
@ -33,11 +33,15 @@ public class BaseThreadDetailStateHandler implements ThreadDetailState {
public List<ThreadDetailStateInfo> getThreadDetailStateInfo(String threadPoolId) {
DynamicThreadPoolWrapper poolWrapper = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor executor = poolWrapper.getExecutor();
return getThreadDetailStateInfo(executor);
}
@Override
public List<ThreadDetailStateInfo> getThreadDetailStateInfo(ThreadPoolExecutor threadPoolExecutor) {
List<ThreadDetailStateInfo> resultThreadState = new ArrayList();
try {
// TODO: Should the object be copied deeply to avoid the destruction of the worker
HashSet<Object> workers = (HashSet<Object>) ReflectUtil.getFieldValue(executor, WORKERS);
HashSet<Object> workers = (HashSet<Object>) ReflectUtil.getFieldValue(threadPoolExecutor, WORKERS);
if (CollectionUtil.isEmpty(workers)) {
return resultThreadState;
}

@ -0,0 +1,38 @@
package cn.hippo4j.starter.handler.web;
import java.util.concurrent.Executor;
/**
* Abstract web thread pool service.
*
* @author chen.ma
* @date 2022/1/19 21:20
*/
public abstract class AbstractWebThreadPoolService implements WebThreadPoolService {
/**
* Thread pool executor.
*/
protected volatile Executor executor;
/**
* Get web thread pool by server.
*
* @return
*/
protected abstract Executor getWebThreadPoolByServer();
@Override
public Executor getWebThreadPool() {
if (executor == null) {
synchronized (AbstractWebThreadPoolService.class) {
if (executor == null) {
executor = getWebThreadPoolByServer();
}
}
}
return executor;
}
}

@ -0,0 +1,61 @@
package cn.hippo4j.starter.handler.web;
import cn.hippo4j.common.model.PoolParameterInfo;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.embedded.tomcat.TomcatWebServer;
import org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Tomcat web thread pool handler.
*
* @author chen.ma
* @date 2022/1/19 20:57
*/
@Slf4j
@AllArgsConstructor
public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService {
private final ServletWebServerApplicationContext applicationContext;
private final AtomicBoolean cacheFlag = new AtomicBoolean(Boolean.FALSE);
private static String EXCEPTION_MESSAGE;
@Override
protected Executor getWebThreadPoolByServer() {
if (cacheFlag.get()) {
log.warn("Exception getting Tomcat thread pool. Exception message :: {}", EXCEPTION_MESSAGE);
return null;
}
Executor tomcatExecutor = null;
try {
tomcatExecutor = ((TomcatWebServer) applicationContext.getWebServer()).getTomcat().getConnector().getProtocolHandler().getExecutor();
} catch (Exception ex) {
cacheFlag.set(Boolean.TRUE);
EXCEPTION_MESSAGE = ex.getMessage();
log.error("Failed to get Tomcat thread pool. Message :: {}", EXCEPTION_MESSAGE);
}
return tomcatExecutor;
}
@Override
public void updateWebThreadPool(PoolParameterInfo poolParameterInfo) {
ThreadPoolExecutor tomcatExecutor = (ThreadPoolExecutor) executor;
try {
tomcatExecutor.setCorePoolSize(poolParameterInfo.getCoreSize());
tomcatExecutor.setMaximumPoolSize(poolParameterInfo.getMaxSize());
tomcatExecutor.setKeepAliveTime(poolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS);
} catch (Exception ex) {
log.error("Failed to modify the Tomcat thread pool parameter.", ex);
}
}
}

@ -0,0 +1,30 @@
package cn.hippo4j.starter.handler.web;
import cn.hippo4j.common.model.PoolParameterInfo;
import lombok.AllArgsConstructor;
import org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext;
import java.util.concurrent.Executor;
/**
* Undertow web thread pool handler.
*
* @author chen.ma
* @date 2022/1/19 21:19
*/
@AllArgsConstructor
public class UndertowWebThreadPoolHandler extends AbstractWebThreadPoolService {
private final ServletWebServerApplicationContext applicationContext;
@Override
protected Executor getWebThreadPoolByServer() {
return null;
}
@Override
public void updateWebThreadPool(PoolParameterInfo poolParameterInfo) {
}
}

@ -0,0 +1,19 @@
package cn.hippo4j.starter.handler.web;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.starter.handler.AbstractThreadPoolRuntime;
/**
* Web thread pool run state handler.
*
* @author chen.ma
* @date 2022/1/19 21:05
*/
public class WebThreadPoolRunStateHandler extends AbstractThreadPoolRuntime {
@Override
protected PoolRunStateInfo supplement(PoolRunStateInfo basePoolRunStateInfo) {
return basePoolRunStateInfo;
}
}

@ -0,0 +1,29 @@
package cn.hippo4j.starter.handler.web;
import cn.hippo4j.common.model.PoolParameterInfo;
import java.util.concurrent.Executor;
/**
* Web thread pool service.
*
* @author chen.ma
* @date 2022/1/19 20:51
*/
public interface WebThreadPoolService {
/**
* Get web thread pool.
*
* @return TomcatJettyUndertow ThreadPoolExecutor
*/
Executor getWebThreadPool();
/**
* Update web thread pool.
*
* @param poolParameterInfo
*/
void updateWebThreadPool(PoolParameterInfo poolParameterInfo);
}
Loading…
Cancel
Save