创建动态线程池增强参数未设置问题修复. (#78)

pull/84/head
chen.ma 3 years ago
parent a31878d755
commit 135889e8d1

@ -20,9 +20,9 @@ public abstract class DynamicExecutorConfigurationSupport extends ThreadPoolExec
private ExecutorService executor; private ExecutorService executor;
private long awaitTerminationMillis = 0; protected long awaitTerminationMillis;
private boolean waitForTasksToCompleteOnShutdown = false; protected boolean waitForTasksToCompleteOnShutdown;
public DynamicExecutorConfigurationSupport(int corePoolSize, public DynamicExecutorConfigurationSupport(int corePoolSize,
int maximumPoolSize, int maximumPoolSize,
@ -81,6 +81,17 @@ public abstract class DynamicExecutorConfigurationSupport extends ThreadPoolExec
this.executor = initializeExecutor(); this.executor = initializeExecutor();
} }
/**
* Set support param.
*
* @param awaitTerminationMillis
* @param waitForTasksToCompleteOnShutdown
*/
public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) {
this.awaitTerminationMillis = awaitTerminationMillis;
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
}
/** /**
* Perform a shutdown on the underlying ExecutorService. * Perform a shutdown on the underlying ExecutorService.
* *

@ -116,7 +116,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
Result result; Result result;
boolean isSubscribe = false; boolean isSubscribe = false;
ThreadPoolExecutor poolExecutor = null; ThreadPoolExecutor newDynamicPoolExecutor = null;
PoolParameterInfo ppi = new PoolParameterInfo(); PoolParameterInfo ppi = new PoolParameterInfo();
try { try {
@ -126,7 +126,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
if ((ppi = JSONUtil.parseObject(resultJsonStr, PoolParameterInfo.class)) != null) { if ((ppi = JSONUtil.parseObject(resultJsonStr, PoolParameterInfo.class)) != null) {
// 使用相关参数创建线程池 // 使用相关参数创建线程池
BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());
poolExecutor = ThreadPoolBuilder.builder() newDynamicPoolExecutor = ThreadPoolBuilder.builder()
.dynamicPool() .dynamicPool()
.workQueue(workQueue) .workQueue(workQueue)
.threadFactory(tpId) .threadFactory(tpId)
@ -137,18 +137,23 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.allowCoreThreadTimeOut(EnableEnum.getBool(ppi.getAllowCoreThreadTimeOut())) .allowCoreThreadTimeOut(EnableEnum.getBool(ppi.getAllowCoreThreadTimeOut()))
.build(); .build();
// 设置动态线程池增强参数
if (dynamicThreadPoolWrap.getExecutor() instanceof DynamicExecutorConfigurationSupport) { if (dynamicThreadPoolWrap.getExecutor() instanceof DynamicExecutorConfigurationSupport) {
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator(); TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator();
((DynamicThreadPoolExecutor) poolExecutor).setTaskDecorator(taskDecorator); ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown;
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
} }
dynamicThreadPoolWrap.setExecutor(poolExecutor); dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor);
isSubscribe = true; isSubscribe = true;
} }
} }
} catch (Exception ex) { } catch (Exception ex) {
poolExecutor = dynamicThreadPoolWrap.getExecutor() != null ? dynamicThreadPoolWrap.getExecutor() : CommonDynamicThreadPool.getInstance(tpId); newDynamicPoolExecutor = dynamicThreadPoolWrap.getExecutor() != null ? dynamicThreadPoolWrap.getExecutor() : CommonDynamicThreadPool.getInstance(tpId);
dynamicThreadPoolWrap.setExecutor(poolExecutor); dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor);
log.error("Failed to initialize thread pool configuration. error message :: {}", ex.getMessage()); log.error("Failed to initialize thread pool configuration. error message :: {}", ex.getMessage());
} finally { } finally {
@ -161,9 +166,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
} }
GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getTpId(), ppi, dynamicThreadPoolWrap); GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getTpId(), ppi, dynamicThreadPoolWrap);
return poolExecutor; return newDynamicPoolExecutor;
} }
/**
* Client dynamic thread pool subscription server configuration.
*
* @param dynamicThreadPoolWrap
*/
protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrap) { protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
if (dynamicThreadPoolWrap.isSubscribeFlag()) { if (dynamicThreadPoolWrap.isSubscribeFlag()) {
threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> ThreadPoolDynamicRefresh.refreshDynamicPool(config)); threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> ThreadPoolDynamicRefresh.refreshDynamicPool(config));

Loading…
Cancel
Save