修改动态线程池包装器中线程池类型及名称.

pull/10/head
chen.ma 3 years ago
parent 4b49a9efaf
commit 1951baacca

@ -28,7 +28,7 @@ public class AlarmSendMessageTest {
ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
scheduledThreadPool.scheduleWithFixedDelay(() -> { scheduledThreadPool.scheduleWithFixedDelay(() -> {
DynamicThreadPoolWrapper poolWrapper = GlobalThreadPoolManage.getExecutorService(GlobalTestConstant.MESSAGE_PRODUCE); DynamicThreadPoolWrapper poolWrapper = GlobalThreadPoolManage.getExecutorService(GlobalTestConstant.MESSAGE_PRODUCE);
ThreadPoolExecutor poolExecutor = poolWrapper.getPool(); ThreadPoolExecutor poolExecutor = poolWrapper.getExecutor();
try { try {
poolExecutor.execute(() -> ThreadUtil.sleep(10240124)); poolExecutor.execute(() -> ThreadUtil.sleep(10240124));
} catch (Exception ex) { } catch (Exception ex) {

@ -29,7 +29,7 @@ public class RunStateHandlerTest {
ThreadUtil.sleep(5000); ThreadUtil.sleep(5000);
for (int i = 0; i < Integer.MAX_VALUE; i++) { for (int i = 0; i < Integer.MAX_VALUE; i++) {
DynamicThreadPoolWrapper poolWrapper = GlobalThreadPoolManage.getExecutorService(GlobalTestConstant.MESSAGE_PRODUCE); DynamicThreadPoolWrapper poolWrapper = GlobalThreadPoolManage.getExecutorService(GlobalTestConstant.MESSAGE_PRODUCE);
ThreadPoolExecutor pool = poolWrapper.getPool(); ThreadPoolExecutor pool = poolWrapper.getExecutor();
try { try {
pool.execute(() -> { pool.execute(() -> {
log.info("Thread pool name :: {}, Executing incoming blocking...", Thread.currentThread().getName()); log.info("Thread pool name :: {}, Executing incoming blocking...", Thread.currentThread().getName());

@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -138,7 +139,7 @@ public class DingSendMessageHandler implements SendMessageHandler {
List<String> receives = StrUtil.split(notifyConfig.getReceives(), ','); List<String> receives = StrUtil.split(notifyConfig.getReceives(), ',');
String afterReceives = Joiner.on(", @").join(receives); String afterReceives = Joiner.on(", @").join(receives);
DynamicThreadPoolExecutor customPool = poolWrap.getPool(); ThreadPoolExecutor customPool = poolWrap.getExecutor();
/** /**
* hesitant e.g. * hesitant e.g.
*/ */

@ -89,7 +89,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
* *
* @param dynamicThreadPoolWrap * @param dynamicThreadPoolWrap
*/ */
protected DynamicThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrap) { protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
String tpId = dynamicThreadPoolWrap.getTpId(); String tpId = dynamicThreadPoolWrap.getTpId();
Map<String, String> queryStrMap = new HashMap(3); Map<String, String> queryStrMap = new HashMap(3);
queryStrMap.put(TP_ID, tpId); queryStrMap.put(TP_ID, tpId);
@ -98,7 +98,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
Result result; Result result;
boolean isSubscribe = false; boolean isSubscribe = false;
DynamicThreadPoolExecutor poolExecutor = null; ThreadPoolExecutor poolExecutor = null;
PoolParameterInfo ppi = new PoolParameterInfo(); PoolParameterInfo ppi = new PoolParameterInfo();
try { try {
@ -116,17 +116,17 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.alarmConfig(ppi.getIsAlarm(), ppi.getCapacityAlarm(), ppi.getLivenessAlarm()) .alarmConfig(ppi.getIsAlarm(), ppi.getCapacityAlarm(), ppi.getLivenessAlarm())
.build(); .build();
dynamicThreadPoolWrap.setPool(poolExecutor); dynamicThreadPoolWrap.setExecutor(poolExecutor);
isSubscribe = true; isSubscribe = true;
} }
} catch (Exception ex) { } catch (Exception ex) {
poolExecutor = dynamicThreadPoolWrap.getPool() != null ? dynamicThreadPoolWrap.getPool() : CommonDynamicThreadPool.getInstance(tpId); poolExecutor = dynamicThreadPoolWrap.getExecutor() != null ? dynamicThreadPoolWrap.getExecutor() : CommonDynamicThreadPool.getInstance(tpId);
dynamicThreadPoolWrap.setPool(poolExecutor); dynamicThreadPoolWrap.setExecutor(poolExecutor);
log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage()); log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage());
} finally { } finally {
if (Objects.isNull(dynamicThreadPoolWrap.getPool())) { if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) {
dynamicThreadPoolWrap.setPool(CommonDynamicThreadPool.getInstance(tpId)); dynamicThreadPoolWrap.setExecutor(CommonDynamicThreadPool.getInstance(tpId));
} }
// 设置是否订阅远端线程池配置 // 设置是否订阅远端线程池配置

@ -29,7 +29,7 @@ public class ThreadPoolDynamicRefresh {
public static void refreshDynamicPool(PoolParameterInfo parameter) { public static void refreshDynamicPool(PoolParameterInfo parameter) {
String threadPoolId = parameter.getTpId(); String threadPoolId = parameter.getTpId();
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getPool(); ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
int originalCoreSize = executor.getCorePoolSize(); int originalCoreSize = executor.getCorePoolSize();
int originalMaximumPoolSize = executor.getMaximumPoolSize(); int originalMaximumPoolSize = executor.getMaximumPoolSize();
@ -39,7 +39,7 @@ public class ThreadPoolDynamicRefresh {
String originalRejected = executor.getRejectedExecutionHandler().getClass().getSimpleName(); String originalRejected = executor.getRejectedExecutionHandler().getClass().getSimpleName();
changePoolInfo(executor, parameter); changePoolInfo(executor, parameter);
ThreadPoolExecutor afterExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getPool(); ThreadPoolExecutor afterExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
log.info("[🔥 {}] Changed thread pool. \ncoreSize :: [{}], maxSize :: [{}], queueType :: [{}], capacity :: [{}], keepAliveTime :: [{}], rejectedType :: [{}]", log.info("[🔥 {}] Changed thread pool. \ncoreSize :: [{}], maxSize :: [{}], queueType :: [{}], capacity :: [{}], keepAliveTime :: [{}], rejectedType :: [{}]",
threadPoolId.toUpperCase(), threadPoolId.toUpperCase(),

@ -36,19 +36,19 @@ public class ThreadPoolRunStateHandler {
public static PoolRunStateInfo getPoolRunState(String tpId) { public static PoolRunStateInfo getPoolRunState(String tpId) {
DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(tpId); DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(tpId);
ThreadPoolExecutor pool = executorService.getPool(); ThreadPoolExecutor pool = executorService.getExecutor();
// 核心线程数 // 核心线程数
int corePoolSize = pool.getCorePoolSize(); int corePoolSize = pool.getCorePoolSize();
// 最大线程数 // 最大线程数
int maximumPoolSize = pool.getMaximumPoolSize(); int maximumPoolSize = pool.getMaximumPoolSize();
// 线程池当前线程数 // 线程池当前线程数 (有锁)
int poolSize = pool.getPoolSize(); int poolSize = pool.getPoolSize();
// 活跃线程数 // 活跃线程数 (有锁)
int activeCount = pool.getActiveCount(); int activeCount = pool.getActiveCount();
// 同时进入池中的最大线程数 // 同时进入池中的最大线程数 (有锁)
int largestPoolSize = pool.getLargestPoolSize(); int largestPoolSize = pool.getLargestPoolSize();
// 线程池中执行任务总数量 // 线程池中执行任务总数量 (有锁)
long completedTaskCount = pool.getCompletedTaskCount(); long completedTaskCount = pool.getCompletedTaskCount();
// 当前负载 // 当前负载
String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "%"; String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "%";

@ -1,11 +1,11 @@
package cn.hippo4j.starter.wrapper; package cn.hippo4j.starter.wrapper;
import cn.hippo4j.starter.common.CommonDynamicThreadPool; import cn.hippo4j.starter.common.CommonDynamicThreadPool;
import cn.hippo4j.starter.core.DynamicThreadPoolExecutor;
import lombok.Data; import lombok.Data;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
/** /**
* Dynamic threadPool wrapper. * Dynamic threadPool wrapper.
@ -24,7 +24,7 @@ public class DynamicThreadPoolWrapper {
private boolean subscribeFlag; private boolean subscribeFlag;
private DynamicThreadPoolExecutor pool; private ThreadPoolExecutor executor;
/** /**
* 线, 使线 {@link CommonDynamicThreadPool#getInstance(String)} * 线, 使线 {@link CommonDynamicThreadPool#getInstance(String)}
@ -41,9 +41,9 @@ public class DynamicThreadPoolWrapper {
* @param threadPoolId * @param threadPoolId
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
public DynamicThreadPoolWrapper(String threadPoolId, DynamicThreadPoolExecutor threadPoolExecutor) { public DynamicThreadPoolWrapper(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
this.tpId = threadPoolId; this.tpId = threadPoolId;
this.pool = threadPoolExecutor; this.executor = threadPoolExecutor;
} }
/** /**
@ -52,7 +52,7 @@ public class DynamicThreadPoolWrapper {
* @param command * @param command
*/ */
public void execute(Runnable command) { public void execute(Runnable command) {
pool.execute(command); executor.execute(command);
} }
/** /**
@ -62,7 +62,7 @@ public class DynamicThreadPoolWrapper {
* @return * @return
*/ */
public Future<?> submit(Runnable task) { public Future<?> submit(Runnable task) {
return pool.submit(task); return executor.submit(task);
} }
/** /**
@ -73,7 +73,7 @@ public class DynamicThreadPoolWrapper {
* @return * @return
*/ */
public <T> Future<T> submit(Callable<T> task) { public <T> Future<T> submit(Callable<T> task) {
return pool.submit(task); return executor.submit(task);
} }
} }

Loading…
Cancel
Save