Refactoring web thread pool code

pull/160/head
chen.ma 2 years ago
parent 24bb4b5633
commit af435bfebb

@ -1,6 +1,5 @@
package cn.hippo4j.core.executor.state;
import cn.hippo4j.common.model.PoolBaseInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
@ -21,16 +20,6 @@ import java.util.concurrent.ThreadPoolExecutor;
*/
public abstract class AbstractThreadPoolRuntime {
/**
* Simple info.
*
* @param executor
* @return
*/
protected PoolBaseInfo simpleInfo(Executor executor) {
return null;
}
/**
* Supplement.
*

@ -1,13 +1,16 @@
package cn.hippo4j.core.executor.web;
import cn.hippo4j.common.model.PoolBaseInfo;
import cn.hippo4j.common.model.PoolParameter;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.common.toolkit.ReflectUtil;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.springframework.boot.web.embedded.jetty.JettyWebServer;
import org.springframework.boot.web.server.WebServer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
/**
@ -24,6 +27,23 @@ public class JettyWebThreadPoolHandler extends AbstractWebThreadPoolService {
return jettyWebServer.getServer().getThreadPool();
}
@Override
public PoolBaseInfo simpleInfo() {
PoolBaseInfo poolBaseInfo = new PoolBaseInfo();
QueuedThreadPool queuedThreadPool = (QueuedThreadPool) executor;
poolBaseInfo.setCoreSize(queuedThreadPool.getMinThreads());
poolBaseInfo.setMaximumSize(queuedThreadPool.getMaxThreads());
BlockingQueue jobs = (BlockingQueue) ReflectUtil.getFieldValue(queuedThreadPool, "_jobs");
int queueCapacity = jobs.remainingCapacity() + jobs.size();
poolBaseInfo.setQueueCapacity(queueCapacity);
poolBaseInfo.setQueueType(jobs.getClass().getSimpleName());
poolBaseInfo.setKeepAliveTime((long) queuedThreadPool.getIdleTimeout());
poolBaseInfo.setRejectedName("RejectedExecutionException");
return poolBaseInfo;
}
@Override
public PoolParameter getWebThreadPoolParameter() {
PoolParameterInfo parameterInfo = null;

@ -1,5 +1,6 @@
package cn.hippo4j.core.executor.web;
import cn.hippo4j.common.model.PoolBaseInfo;
import cn.hippo4j.common.model.PoolParameter;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
@ -10,7 +11,9 @@ import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.springframework.boot.web.embedded.tomcat.TomcatWebServer;
import org.springframework.boot.web.server.WebServer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -49,6 +52,30 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService {
return tomcatExecutor;
}
@Override
public PoolBaseInfo simpleInfo() {
PoolBaseInfo poolBaseInfo = new PoolBaseInfo();
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
int corePoolSize = threadPoolExecutor.getCorePoolSize();
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
long keepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
int queueSize = queue.size();
int remainingCapacity = queue.remainingCapacity();
int queueCapacity = queueSize + remainingCapacity;
poolBaseInfo.setCoreSize(corePoolSize);
poolBaseInfo.setMaximumSize(maximumPoolSize);
poolBaseInfo.setKeepAliveTime(keepAliveTime);
poolBaseInfo.setQueueType(queue.getClass().getSimpleName());
poolBaseInfo.setQueueCapacity(queueCapacity);
poolBaseInfo.setRejectedName(rejectedExecutionHandler.getClass().getSimpleName());
return poolBaseInfo;
}
@Override
public PoolParameter getWebThreadPoolParameter() {
PoolParameterInfo parameterInfo = null;

@ -1,5 +1,6 @@
package cn.hippo4j.core.executor.web;
import cn.hippo4j.common.model.PoolBaseInfo;
import cn.hippo4j.common.model.PoolParameter;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
@ -42,6 +43,27 @@ public class UndertowWebThreadPoolHandler extends AbstractWebThreadPoolService {
return Objects.isNull(undertow) ? null : undertow.getWorker();
}
@Override
public PoolBaseInfo simpleInfo() {
PoolBaseInfo poolBaseInfo = new PoolBaseInfo();
XnioWorker xnioWorker = (XnioWorker) executor;
try {
int coreSize = xnioWorker.getOption(Options.WORKER_TASK_CORE_THREADS);
int maximumPoolSize = xnioWorker.getOption(Options.WORKER_TASK_MAX_THREADS);
int keepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE);
poolBaseInfo.setCoreSize(coreSize);
poolBaseInfo.setMaximumSize(maximumPoolSize);
poolBaseInfo.setKeepAliveTime((long) keepAliveTime);
poolBaseInfo.setRejectedName("-");
poolBaseInfo.setQueueType("-");
} catch (Exception ex) {
log.error("The undertow container failed to get thread pool parameters.", ex);
}
return poolBaseInfo;
}
@Override
public PoolParameter getWebThreadPoolParameter() {
PoolParameterInfo parameterInfo = null;

@ -1,19 +1,11 @@
package cn.hippo4j.core.executor.web;
import cn.hippo4j.common.model.PoolBaseInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.common.toolkit.ByteConvertUtil;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.core.executor.state.AbstractThreadPoolRuntime;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.RuntimeInfo;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.xnio.Options;
import org.xnio.XnioWorker;
import java.util.Objects;
import java.util.concurrent.*;
/**
* Web thread pool run state handler.
@ -24,59 +16,6 @@ import java.util.concurrent.*;
@Slf4j
public class WebThreadPoolRunStateHandler extends AbstractThreadPoolRuntime {
@Override
public PoolBaseInfo simpleInfo(Executor executor) {
PoolBaseInfo poolBaseInfo = new PoolBaseInfo();
if (executor != null && executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
int corePoolSize = threadPoolExecutor.getCorePoolSize();
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
long keepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
int queueSize = queue.size();
int remainingCapacity = queue.remainingCapacity();
int queueCapacity = queueSize + remainingCapacity;
poolBaseInfo.setCoreSize(corePoolSize);
poolBaseInfo.setMaximumSize(maximumPoolSize);
poolBaseInfo.setKeepAliveTime(keepAliveTime);
poolBaseInfo.setQueueType(queue.getClass().getSimpleName());
poolBaseInfo.setQueueCapacity(queueCapacity);
poolBaseInfo.setRejectedName(rejectedExecutionHandler.getClass().getSimpleName());
} else if (Objects.equals("QueuedThreadPool", executor.getClass().getSimpleName())) {
QueuedThreadPool queuedThreadPool = (QueuedThreadPool) executor;
poolBaseInfo.setCoreSize(queuedThreadPool.getMinThreads());
poolBaseInfo.setMaximumSize(queuedThreadPool.getMaxThreads());
BlockingQueue jobs = (BlockingQueue) ReflectUtil.getFieldValue(queuedThreadPool, "_jobs");
int queueCapacity = jobs.remainingCapacity() + jobs.size();
poolBaseInfo.setQueueCapacity(queueCapacity);
poolBaseInfo.setQueueType(jobs.getClass().getSimpleName());
poolBaseInfo.setKeepAliveTime((long) queuedThreadPool.getIdleTimeout());
poolBaseInfo.setRejectedName("RejectedExecutionException");
} else if (Objects.equals("NioXnioWorker", executor.getClass().getSimpleName())) {
XnioWorker xnioWorker = (XnioWorker) executor;
try {
int coreSize = xnioWorker.getOption(Options.WORKER_TASK_CORE_THREADS);
int maximumPoolSize = xnioWorker.getOption(Options.WORKER_TASK_MAX_THREADS);
int keepAliveTime = xnioWorker.getOption(Options.WORKER_TASK_KEEPALIVE);
poolBaseInfo.setCoreSize(coreSize);
poolBaseInfo.setMaximumSize(maximumPoolSize);
poolBaseInfo.setKeepAliveTime((long) keepAliveTime);
poolBaseInfo.setRejectedName("-");
poolBaseInfo.setQueueType("-");
} catch (Exception ex) {
log.error("The undertow container failed to get thread pool parameters.", ex);
}
}
return poolBaseInfo;
}
@Override
protected PoolRunStateInfo supplement(PoolRunStateInfo poolRunStateInfo) {
// 内存占比: 使用内存 / 最大内存

@ -1,5 +1,6 @@
package cn.hippo4j.core.executor.web;
import cn.hippo4j.common.model.PoolBaseInfo;
import cn.hippo4j.common.model.PoolParameter;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
@ -21,6 +22,13 @@ public interface WebThreadPoolService {
*/
Executor getWebThreadPool();
/**
* Simple info.
*
* @return
*/
PoolBaseInfo simpleInfo();
/**
* Get web thread pool parameter.
*

@ -2,10 +2,11 @@ package cn.hippo4j.starter.config;
import cn.hippo4j.common.api.ThreadDetailState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.core.config.UtilAutoConfiguration;
import cn.hippo4j.core.config.WebThreadPoolConfiguration;
import cn.hippo4j.core.enable.MarkerConfiguration;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.executor.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.core.handler.DynamicThreadPoolBannerHandler;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils;
@ -13,9 +14,6 @@ import cn.hippo4j.starter.controller.PoolRunStateController;
import cn.hippo4j.starter.controller.WebThreadPoolController;
import cn.hippo4j.starter.core.*;
import cn.hippo4j.starter.event.ApplicationContentPostProcessor;
import cn.hippo4j.starter.core.BaseThreadDetailStateHandler;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.executor.web.WebThreadPoolRunStateHandler;
import cn.hippo4j.starter.monitor.ReportingEventExecutor;
import cn.hippo4j.starter.monitor.collect.RunTimeInfoCollector;
import cn.hippo4j.starter.monitor.send.HttpConnectSender;
@ -126,9 +124,8 @@ public class DynamicThreadPoolAutoConfiguration {
}
@Bean
public WebThreadPoolController webThreadPoolController(WebThreadPoolHandlerChoose webThreadPoolServiceChoose,
WebThreadPoolRunStateHandler webThreadPoolRunStateHandler) {
return new WebThreadPoolController(webThreadPoolServiceChoose, webThreadPoolRunStateHandler);
public WebThreadPoolController webThreadPoolController(WebThreadPoolHandlerChoose webThreadPoolServiceChoose) {
return new WebThreadPoolController(webThreadPoolServiceChoose);
}
}

@ -6,12 +6,9 @@ import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.core.executor.web.WebThreadPoolHandlerChoose;
import cn.hippo4j.core.executor.web.WebThreadPoolRunStateHandler;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.*;
import java.util.concurrent.Executor;
/**
* Web thread pool controller.
*
@ -27,12 +24,9 @@ public class WebThreadPoolController {
private final WebThreadPoolHandlerChoose webThreadPoolServiceChoose;
private final WebThreadPoolRunStateHandler webThreadPoolRunStateHandler;
@GetMapping("/web/base/info")
public Result<PoolBaseInfo> getPoolBaseState() {
Executor webThreadPool = webThreadPoolServiceChoose.choose().getWebThreadPool();
PoolBaseInfo poolBaseInfo = webThreadPoolRunStateHandler.simpleInfo(webThreadPool);
PoolBaseInfo poolBaseInfo = webThreadPoolServiceChoose.choose().simpleInfo();
return Results.success(poolBaseInfo);
}

Loading…
Cancel
Save