From 135889e8d157394ee0ba9aa42b6ee1518c29cd18 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Sun, 16 Jan 2022 12:41:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9B=E5=BB=BA=E5=8A=A8=E6=80=81=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E6=B1=A0=E5=A2=9E=E5=BC=BA=E5=8F=82=E6=95=B0=E6=9C=AA?= =?UTF-8?q?=E8=AE=BE=E7=BD=AE=E9=97=AE=E9=A2=98=E4=BF=AE=E5=A4=8D.=20(#78)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DynamicExecutorConfigurationSupport.java | 15 ++++++++++-- .../core/DynamicThreadPoolPostProcessor.java | 24 +++++++++++++------ 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicExecutorConfigurationSupport.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicExecutorConfigurationSupport.java index ddc3d6a7..a80ba92c 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicExecutorConfigurationSupport.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicExecutorConfigurationSupport.java @@ -20,9 +20,9 @@ public abstract class DynamicExecutorConfigurationSupport extends ThreadPoolExec private ExecutorService executor; - private long awaitTerminationMillis = 0; + protected long awaitTerminationMillis; - private boolean waitForTasksToCompleteOnShutdown = false; + protected boolean waitForTasksToCompleteOnShutdown; public DynamicExecutorConfigurationSupport(int corePoolSize, int maximumPoolSize, @@ -81,6 +81,17 @@ public abstract class DynamicExecutorConfigurationSupport extends ThreadPoolExec this.executor = initializeExecutor(); } + /** + * Set support param. + * + * @param awaitTerminationMillis + * @param waitForTasksToCompleteOnShutdown + */ + public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) { + this.awaitTerminationMillis = awaitTerminationMillis; + this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown; + } + /** * Perform a shutdown on the underlying ExecutorService. * diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolPostProcessor.java index 98d385ff..c641e109 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/DynamicThreadPoolPostProcessor.java @@ -116,7 +116,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { Result result; boolean isSubscribe = false; - ThreadPoolExecutor poolExecutor = null; + ThreadPoolExecutor newDynamicPoolExecutor = null; PoolParameterInfo ppi = new PoolParameterInfo(); try { @@ -126,7 +126,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { if ((ppi = JSONUtil.parseObject(resultJsonStr, PoolParameterInfo.class)) != null) { // 使用相关参数创建线程池 BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); - poolExecutor = ThreadPoolBuilder.builder() + newDynamicPoolExecutor = ThreadPoolBuilder.builder() .dynamicPool() .workQueue(workQueue) .threadFactory(tpId) @@ -137,18 +137,23 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { .allowCoreThreadTimeOut(EnableEnum.getBool(ppi.getAllowCoreThreadTimeOut())) .build(); + // 设置动态线程池增强参数 if (dynamicThreadPoolWrap.getExecutor() instanceof DynamicExecutorConfigurationSupport) { TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator(); - ((DynamicThreadPoolExecutor) poolExecutor).setTaskDecorator(taskDecorator); + ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); + + long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis; + boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown; + ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); } - dynamicThreadPoolWrap.setExecutor(poolExecutor); + dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor); isSubscribe = true; } } } catch (Exception ex) { - poolExecutor = dynamicThreadPoolWrap.getExecutor() != null ? dynamicThreadPoolWrap.getExecutor() : CommonDynamicThreadPool.getInstance(tpId); - dynamicThreadPoolWrap.setExecutor(poolExecutor); + newDynamicPoolExecutor = dynamicThreadPoolWrap.getExecutor() != null ? dynamicThreadPoolWrap.getExecutor() : CommonDynamicThreadPool.getInstance(tpId); + dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor); log.error("Failed to initialize thread pool configuration. error message :: {}", ex.getMessage()); } finally { @@ -161,9 +166,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { } GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getTpId(), ppi, dynamicThreadPoolWrap); - return poolExecutor; + return newDynamicPoolExecutor; } + /** + * Client dynamic thread pool subscription server configuration. + * + * @param dynamicThreadPoolWrap + */ protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrap) { if (dynamicThreadPoolWrap.isSubscribeFlag()) { threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> ThreadPoolDynamicRefresh.refreshDynamicPool(config));