diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java index 344b809d..effeb8c9 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java @@ -51,10 +51,7 @@ import java.util.concurrent.TimeUnit; import static cn.hippo4j.common.constant.Constants.*; /** - * Dynamic threadPool post processor. - * - * @author chen.ma - * @date 2021/8/2 20:40 + * Dynamic thread-pool post processor. */ @Slf4j @AllArgsConstructor @@ -68,18 +65,6 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { private final ServerThreadPoolDynamicRefresh threadPoolDynamicRefresh; - private final ExecutorService executorService = ThreadPoolBuilder.builder() - .corePoolSize(2) - .maxPoolNum(4) - .keepAliveTime(2000) - .timeUnit(TimeUnit.MILLISECONDS) - .workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE) - .capacity(1024) - .allowCoreThreadTimeOut(true) - .threadFactory("client.dynamic.threadPool.change.config") - .rejected(new ThreadPoolExecutor.AbortPolicy()) - .build(); - @Override public Object postProcessBeforeInitialization(Object bean, String beanName) { return bean; @@ -102,8 +87,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { log.error("Failed to create dynamic thread pool in annotation mode.", ex); return bean; } - DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) bean; - DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicExecutor.getThreadPoolId(), dynamicExecutor); + DynamicThreadPoolExecutor dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; + DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); ThreadPoolExecutor remoteExecutor = fillPoolAndRegister(wrap); subscribeConfig(wrap); return remoteExecutor; @@ -136,49 +121,47 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { queryStrMap.put(TP_ID, tpId); queryStrMap.put(ITEM_ID, properties.getItemId()); queryStrMap.put(NAMESPACE, properties.getNamespace()); - - Result result; boolean isSubscribe = false; - ThreadPoolExecutor newDynamicPoolExecutor = null; - ThreadPoolParameterInfo ppi = new ThreadPoolParameterInfo(); + ThreadPoolExecutor newDynamicThreadPoolExecutor = null; + ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo(); try { - result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L); + Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L); if (result.isSuccess() && result.getData() != null) { String resultJsonStr = JSONUtil.toJSONString(result.getData()); - if ((ppi = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) { + if ((threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) { // Create a thread pool with relevant parameters. - BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); - newDynamicPoolExecutor = ThreadPoolBuilder.builder() + BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity()); + newDynamicThreadPoolExecutor = ThreadPoolBuilder.builder() .dynamicPool() .workQueue(workQueue) .threadFactory(tpId) - .poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize()) - .keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS) - .rejected(RejectedTypeEnum.createPolicy(ppi.getRejectedType())) - .allowCoreThreadTimeOut(EnableEnum.getBool(ppi.getAllowCoreThreadTimeOut())) + .poolThreadSize(threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt()) + .keepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS) + .rejected(RejectedTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType())) + .allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut())) .build(); // Set dynamic thread pool enhancement parameters. if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) { ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm( - BooleanUtil.toBoolean(ppi.getIsAlarm().toString()), - ppi.getCapacityAlarm(), - ppi.getLivenessAlarm()); + BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()), + threadPoolParameterInfo.getCapacityAlarm(), + threadPoolParameterInfo.getLivenessAlarm()); GlobalNotifyAlarmManage.put(tpId, threadPoolNotifyAlarm); TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator(); - ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); + ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setTaskDecorator(taskDecorator); long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis; boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown; - ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); + ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); long executeTimeOut = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getExecuteTimeOut(); - ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setExecuteTimeOut(executeTimeOut); + ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setExecuteTimeOut(executeTimeOut); } - dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor); + dynamicThreadPoolWrap.setExecutor(newDynamicThreadPoolExecutor); isSubscribe = true; } } } catch (Exception ex) { - newDynamicPoolExecutor = dynamicThreadPoolWrap.getExecutor() != null ? dynamicThreadPoolWrap.getExecutor() : CommonDynamicThreadPool.getInstance(tpId); - dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor); + newDynamicThreadPoolExecutor = dynamicThreadPoolWrap.getExecutor() != null ? dynamicThreadPoolWrap.getExecutor() : CommonDynamicThreadPool.getInstance(tpId); + dynamicThreadPoolWrap.setExecutor(newDynamicThreadPoolExecutor); log.error("Failed to initialize thread pool configuration. error message :: {}", ex.getMessage()); } finally { if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) { @@ -187,10 +170,22 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { // Set whether to subscribe to the remote thread pool configuration. dynamicThreadPoolWrap.setSubscribeFlag(isSubscribe); } - GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getThreadPoolId(), ppi, dynamicThreadPoolWrap); - return newDynamicPoolExecutor; + GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrap); + return newDynamicThreadPoolExecutor; } + private final ExecutorService configRefreshExecutorService = ThreadPoolBuilder.builder() + .corePoolSize(2) + .maxPoolNum(4) + .keepAliveTime(2000) + .timeUnit(TimeUnit.MILLISECONDS) + .workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE) + .capacity(1024) + .allowCoreThreadTimeOut(true) + .threadFactory("client.dynamic.threadPool.change.config") + .rejected(new ThreadPoolExecutor.AbortPolicy()) + .build(); + /** * Client dynamic thread pool subscription server configuration. * @@ -198,7 +193,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { */ protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrap) { if (dynamicThreadPoolWrap.isSubscribeFlag()) { - threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getThreadPoolId(), executorService, config -> threadPoolDynamicRefresh.dynamicRefresh(config)); + threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getThreadPoolId(), configRefreshExecutorService, config -> threadPoolDynamicRefresh.dynamicRefresh(config)); } } }