From 7459495757d290bb4dc28e800a3ce6934061b138 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Sat, 27 Aug 2022 22:30:37 +0800 Subject: [PATCH] When defining a dynamic thread pool, abstract the default configuration (#572) --- .../config/BootstrapConfigProperties.java | 28 +--- .../DynamicThreadPoolNotifyProperties.java | 2 +- .../notify/CoreNotifyConfigBuilder.java | 41 ++--- .../DynamicThreadPoolRefreshListener.java | 123 +++++++------- .../DynamicThreadPoolPostProcessor.java | 154 ++++++++++-------- 5 files changed, 179 insertions(+), 169 deletions(-) diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/BootstrapConfigProperties.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/BootstrapConfigProperties.java index 54159c8d..430786b1 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/BootstrapConfigProperties.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/BootstrapConfigProperties.java @@ -107,43 +107,23 @@ public class BootstrapConfigProperties implements BootstrapPropertiesInterface { */ private List notifyPlatforms; - /** - * Whether to enable thread pool running alarm. - */ - private Boolean alarm = Boolean.TRUE; - /** * Check thread pool running status interval. */ private Integer checkStateInterval; /** - * Active alarm. - */ - private Integer activeAlarm; - - /** - * Capacity alarm. - */ - private Integer capacityAlarm; - - /** - * Thread pool run alarm interval. unit: s - */ - private Integer alarmInterval; - - /** - * Receives. + * Default dynamic thread pool configuration. */ - private String receives; + private ExecutorProperties defaultExecutor; /** - * Executors. + * Dynamic thread pool configuration collection. */ private List executors; /** - * Adapter executors + * Tripartite framework thread pool adaptation set. */ private List adapterExecutors; } diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolNotifyProperties.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolNotifyProperties.java index 0f132f0a..5ca1836b 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolNotifyProperties.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/config/DynamicThreadPoolNotifyProperties.java @@ -30,7 +30,7 @@ import lombok.NoArgsConstructor; public class DynamicThreadPoolNotifyProperties { /** - * Interval + * Thread pool run alarm interval. unit: s */ private Integer interval; diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java index fd592577..ecbaeb8c 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/notify/CoreNotifyConfigBuilder.java @@ -45,13 +45,13 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { private final AlarmControlHandler alarmControlHandler; - private final BootstrapConfigProperties bootstrapConfigProperties; + private final BootstrapConfigProperties configProperties; @Override public Map> buildNotify() { Map> resultMap = Maps.newHashMap(); - boolean globalAlarm = bootstrapConfigProperties.getAlarm(); - List executors = bootstrapConfigProperties.getExecutors(); + boolean globalAlarm = Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getAlarm()).orElse(true); + List executors = configProperties.getExecutors(); if (CollectionUtil.isEmpty(executors)) { log.warn("Failed to build notify, executors configuration is empty."); return resultMap; @@ -60,8 +60,8 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { if (!globalAlarm && CollectionUtil.isEmpty(actual)) { return resultMap; } - for (ExecutorProperties executor : executors) { - Map> buildSingleNotifyConfig = buildSingleNotifyConfig(executor); + for (ExecutorProperties executorProperties : executors) { + Map> buildSingleNotifyConfig = buildSingleNotifyConfig(executorProperties); initCacheAndLock(buildSingleNotifyConfig); resultMap.putAll(buildSingleNotifyConfig); } @@ -71,15 +71,15 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { /** * Build single notify config. * - * @param executor + * @param executorProperties * @return */ - public Map> buildSingleNotifyConfig(ExecutorProperties executor) { + public Map> buildSingleNotifyConfig(ExecutorProperties executorProperties) { Map> resultMap = Maps.newHashMap(); - String threadPoolId = executor.getThreadPoolId(); + String threadPoolId = executorProperties.getThreadPoolId(); String alarmBuildKey = threadPoolId + "+ALARM"; List alarmNotifyConfigs = Lists.newArrayList(); - List notifyPlatforms = bootstrapConfigProperties.getNotifyPlatforms(); + List notifyPlatforms = configProperties.getNotifyPlatforms(); for (NotifyPlatformProperties platformProperties : notifyPlatforms) { NotifyConfigDTO notifyConfig = new NotifyConfigDTO(); notifyConfig.setPlatform(platformProperties.getPlatform()); @@ -87,11 +87,11 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { notifyConfig.setType("ALARM"); notifyConfig.setSecret(platformProperties.getSecret()); notifyConfig.setSecretKey(getToken(platformProperties)); - int interval = Optional.ofNullable(executor.getNotify()) + int interval = Optional.ofNullable(executorProperties.getNotify()) .map(each -> each.getInterval()) - .orElseGet(() -> bootstrapConfigProperties.getAlarmInterval() != null ? bootstrapConfigProperties.getAlarmInterval() : 5); + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getInterval()).orElse(5)); notifyConfig.setInterval(interval); - notifyConfig.setReceives(buildReceive(executor, platformProperties)); + notifyConfig.setReceives(buildReceive(executorProperties)); alarmNotifyConfigs.add(notifyConfig); } resultMap.put(alarmBuildKey, alarmNotifyConfigs); @@ -104,7 +104,7 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { notifyConfig.setType("CONFIG"); notifyConfig.setSecretKey(getToken(platformProperties)); notifyConfig.setSecret(platformProperties.getSecret()); - notifyConfig.setReceives(buildReceive(executor, platformProperties)); + notifyConfig.setReceives(buildReceive(executorProperties)); changeNotifyConfigs.add(notifyConfig); } resultMap.put(changeBuildKey, changeNotifyConfigs); @@ -118,17 +118,12 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder { .forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval()))); } - private String buildReceive(ExecutorProperties executor, NotifyPlatformProperties platformProperties) { - String receive; - if (executor.getNotify() != null) { - receive = executor.getNotify().getReceives(); - if (StrUtil.isBlank(receive)) { - receive = bootstrapConfigProperties.getReceives(); - } - } else { - receive = bootstrapConfigProperties.getReceives(); + private String buildReceive(ExecutorProperties executorProperties) { + String receives = Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getReceives()).orElse(""); + if (executorProperties.getNotify() != null && StringUtil.isNotEmpty(executorProperties.getNotify().getReceives())) { + receives = executorProperties.getNotify().getReceives(); } - return receive; + return receives; } private String getToken(NotifyPlatformProperties platformProperties) { diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java index 09442e8b..6f5f0d1b 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/refresher/event/DynamicThreadPoolRefreshListener.java @@ -17,15 +17,15 @@ package cn.hippo4j.core.springboot.starter.refresher.event; +import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum; +import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum; +import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue; import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; -import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum; -import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum; -import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue; import cn.hippo4j.core.proxy.RejectedProxyUtil; import cn.hippo4j.core.springboot.starter.config.BootstrapConfigProperties; import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; @@ -44,6 +44,7 @@ import org.springframework.core.annotation.Order; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -68,40 +69,52 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener executors = bindableConfigProperties.getExecutors(); for (ExecutorProperties properties : executors) { String threadPoolId = properties.getThreadPoolId(); - // Check whether the notification configuration is consistent. - // this operation will not trigger the notification. + /** + * Check whether the notification configuration is consistent, this operation will not trigger the notification. + */ checkNotifyConsistencyAndReplace(properties); if (!checkConsistency(threadPoolId, properties)) { continue; } - // refresh executor pool. dynamicRefreshPool(threadPoolId, properties); - // old properties. ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); - // refresh executor properties. - GlobalCoreThreadPoolManage.refresh(threadPoolId, properties); + GlobalCoreThreadPoolManage.refresh(threadPoolId, failDefaultExecutorProperties(beforeProperties, properties)); + ChangeParameterNotifyRequest changeRequest = buildChangeRequest(beforeProperties, properties); log.info(CHANGE_THREAD_POOL_TEXT, threadPoolId, - String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), properties.getCorePoolSize()), - String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()), - String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), properties.getQueueCapacity()), - String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()), - String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()), - String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), properties.getRejectedHandler()), - String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())); + String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), changeRequest.getNowCorePoolSize()), + String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), changeRequest.getNowMaximumPoolSize()), + String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), changeRequest.getNowQueueCapacity()), + String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), changeRequest.getNowKeepAliveTime()), + String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), changeRequest.getNowExecuteTimeOut()), + String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), changeRequest.getNowRejectedName()), + String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), changeRequest.getNowAllowsCoreThreadTimeOut())); try { - threadPoolNotifyAlarmHandler.sendPoolConfigChange(newChangeRequest(beforeProperties, properties)); + threadPoolNotifyAlarmHandler.sendPoolConfigChange(changeRequest); } catch (Throwable ex) { log.error("Failed to send Chang smart application listener notice. Message: {}", ex.getMessage()); } } } + private ExecutorProperties failDefaultExecutorProperties(ExecutorProperties beforeProperties, ExecutorProperties properties) { + return ExecutorProperties.builder() + .corePoolSize(Optional.ofNullable(properties.getCorePoolSize()).orElse(beforeProperties.getCorePoolSize())) + .maximumPoolSize(Optional.ofNullable(properties.getMaximumPoolSize()).orElse(beforeProperties.getMaximumPoolSize())) + .queueCapacity(Optional.ofNullable(properties.getQueueCapacity()).orElse(beforeProperties.getQueueCapacity())) + .keepAliveTime(Optional.ofNullable(properties.getKeepAliveTime()).orElse(beforeProperties.getKeepAliveTime())) + .executeTimeOut(Optional.ofNullable(properties.getExecuteTimeOut()).orElse(beforeProperties.getExecuteTimeOut())) + .rejectedHandler(Optional.ofNullable(properties.getRejectedHandler()).orElse(beforeProperties.getRejectedHandler())) + .allowCoreThreadTimeOut(Optional.ofNullable(properties.getAllowCoreThreadTimeOut()).orElse(beforeProperties.getAllowCoreThreadTimeOut())) + .threadPoolId(beforeProperties.getThreadPoolId()) + .build(); + } + /** * Construct change parameter notify request instance. * @@ -109,23 +122,23 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener changeKeys = Lists.newArrayList(); - Map> newDynamicThreadPoolNotifyMap = coreNotifyConfigBuilder.buildSingleNotifyConfig(properties); + Map> newDynamicThreadPoolNotifyMap = coreNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties); Map> notifyConfigs = hippo4jBaseSendMessageService.getNotifyConfigs(); if (CollectionUtil.isNotEmpty(notifyConfigs)) { for (Map.Entry> each : newDynamicThreadPoolNotifyMap.entrySet()) { @@ -161,22 +174,22 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener queue = (ResizableCapacityLinkedBlockingQueue) executor.getQueue(); diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java index f395200e..29443dcc 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -26,7 +26,8 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; -import cn.hippo4j.core.executor.support.*; +import cn.hippo4j.core.executor.support.CommonDynamicThreadPool; +import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose; import cn.hippo4j.core.springboot.starter.config.BootstrapConfigProperties; import cn.hippo4j.core.springboot.starter.config.DynamicThreadPoolNotifyProperties; @@ -52,7 +53,7 @@ import java.util.concurrent.TimeUnit; @AllArgsConstructor public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { - private final BootstrapConfigProperties bootstrapConfigProperties; + private final BootstrapConfigProperties configProperties; @Override public Object postProcessBeforeInitialization(Object bean, String beanName) { @@ -87,20 +88,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { } if (bean instanceof DynamicThreadPoolWrapper) { DynamicThreadPoolWrapper wrap = (DynamicThreadPoolWrapper) bean; - registerAndSubscribe(wrap); + fillPoolAndRegister(wrap); } return bean; } - /** - * Register and subscribe. - * - * @param dynamicThreadPoolWrap - */ - protected void registerAndSubscribe(DynamicThreadPoolWrapper dynamicThreadPoolWrap) { - fillPoolAndRegister(dynamicThreadPoolWrap); - } - /** * Fill the thread pool and register. * @@ -110,57 +102,30 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId(); ThreadPoolExecutor newDynamicPoolExecutor = dynamicThreadPoolWrapper.getExecutor(); ExecutorProperties executorProperties = null; - if (null != bootstrapConfigProperties.getExecutors()) { - executorProperties = bootstrapConfigProperties.getExecutors() + if (configProperties.getExecutors() != null) { + executorProperties = configProperties.getExecutors() .stream() .filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId())) .findFirst() - .orElse(null); - if (executorProperties != null) { - try { - BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity()); - String threadNamePrefix = executorProperties.getThreadNamePrefix(); - newDynamicPoolExecutor = ThreadPoolBuilder.builder() - .dynamicPool() - .workQueue(workQueue) - .threadFactory(StringUtil.isNotBlank(threadNamePrefix) ? threadNamePrefix : threadPoolId) - .executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut()).orElse(0L)) - .poolThreadSize(executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize()) - .keepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS) - .rejected(RejectedPolicyTypeEnum.createPolicy(executorProperties.getRejectedHandler())) - .allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut()) - .build(); - } catch (Exception ex) { - log.error("Failed to initialize thread pool configuration. error: {}", ex); - } finally { - if (Objects.isNull(dynamicThreadPoolWrapper.getExecutor())) { - dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId)); - } - dynamicThreadPoolWrapper.setInitFlag(Boolean.TRUE); + .orElseThrow(() -> new RuntimeException("The thread pool id does not exist in the configuration.")); + try { + newDynamicPoolExecutor = buildNewDynamicThreadPool(executorProperties); + } catch (Exception ex) { + log.error("Failed to initialize thread pool configuration. error: {}", ex); + } finally { + if (Objects.isNull(dynamicThreadPoolWrapper.getExecutor())) { + dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId)); } + dynamicThreadPoolWrapper.setInitFlag(Boolean.TRUE); } - if (dynamicThreadPoolWrapper.getExecutor() instanceof AbstractDynamicExecutorSupport) { - DynamicThreadPoolNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null); - boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm()) - .orElseGet(() -> bootstrapConfigProperties.getAlarm() != null ? bootstrapConfigProperties.getAlarm() : true); - int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm()) - .orElseGet(() -> bootstrapConfigProperties.getActiveAlarm() != null ? bootstrapConfigProperties.getActiveAlarm() : 80); - int capacityAlarm = Optional.ofNullable(executorProperties.getCapacityAlarm()) - .orElseGet(() -> bootstrapConfigProperties.getCapacityAlarm() != null ? bootstrapConfigProperties.getCapacityAlarm() : 80); - int interval = Optional.ofNullable(notify) - .map(each -> each.getInterval()).orElseGet(() -> bootstrapConfigProperties.getAlarmInterval() != null ? bootstrapConfigProperties.getAlarmInterval() : 5); - String receive = Optional.ofNullable(notify) - .map(each -> each.getReceives()).orElseGet(() -> StringUtil.isNotEmpty(bootstrapConfigProperties.getReceives()) ? bootstrapConfigProperties.getReceives() : ""); - ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm); - threadPoolNotifyAlarm.setInterval(interval); - threadPoolNotifyAlarm.setReceives(receive); - GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm); - TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).getTaskDecorator(); - ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); - long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).awaitTerminationMillis; - boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).waitForTasksToCompleteOnShutdown; - ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); - } + ThreadPoolNotifyAlarm threadPoolNotifyAlarm = buildThreadPoolNotifyAlarm(executorProperties); + GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm); + DynamicThreadPoolExecutor actualDynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor(); + TaskDecorator taskDecorator = actualDynamicThreadPoolExecutor.getTaskDecorator(); + ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); + long awaitTerminationMillis = actualDynamicThreadPoolExecutor.awaitTerminationMillis; + boolean waitForTasksToCompleteOnShutdown = actualDynamicThreadPoolExecutor.waitForTasksToCompleteOnShutdown; + ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); dynamicThreadPoolWrapper.setExecutor(newDynamicPoolExecutor); } GlobalThreadPoolManage.registerPool(dynamicThreadPoolWrapper.getThreadPoolId(), dynamicThreadPoolWrapper); @@ -168,17 +133,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { threadPoolId, executorProperties == null ? buildExecutorProperties(threadPoolId, newDynamicPoolExecutor) - : executorProperties); + : buildActualExecutorProperties(executorProperties)); return newDynamicPoolExecutor; } - /** - * Build executor properties. - * - * @param threadPoolId - * @param executor - * @return - */ + private ExecutorProperties buildActualExecutorProperties(ExecutorProperties executorProperties) { + return Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> buildExecutorProperties(executorProperties)).orElse(executorProperties); + } + private ExecutorProperties buildExecutorProperties(String threadPoolId, ThreadPoolExecutor executor) { ExecutorProperties executorProperties = new ExecutorProperties(); BlockingQueue blockingQueue = executor.getQueue(); @@ -197,4 +159,64 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { .setThreadPoolId(threadPoolId); return executorProperties; } + + private ThreadPoolExecutor buildNewDynamicThreadPool(ExecutorProperties executorProperties) { + String threadNamePrefix = executorProperties.getThreadNamePrefix(); + ExecutorProperties newExecutorProperties = buildExecutorProperties(executorProperties); + ThreadPoolExecutor newDynamicPoolExecutor = ThreadPoolBuilder.builder() + .dynamicPool() + .poolThreadSize(newExecutorProperties.getCorePoolSize(), newExecutorProperties.getMaximumPoolSize()) + .workQueue(BlockingQueueTypeEnum.createBlockingQueue(newExecutorProperties.getBlockingQueue(), newExecutorProperties.getQueueCapacity())) + .threadFactory(StringUtil.isNotBlank(threadNamePrefix) ? threadNamePrefix : executorProperties.getThreadPoolId()) + .executeTimeOut(newExecutorProperties.getExecuteTimeOut()) + .keepAliveTime(newExecutorProperties.getKeepAliveTime(), TimeUnit.SECONDS) + .rejected(RejectedPolicyTypeEnum.createPolicy(newExecutorProperties.getRejectedHandler())) + .allowCoreThreadTimeOut(newExecutorProperties.getAllowCoreThreadTimeOut()) + .build(); + return newDynamicPoolExecutor; + } + + private ExecutorProperties buildExecutorProperties(ExecutorProperties executorProperties) { + ExecutorProperties newExecutorProperties = ExecutorProperties.builder() + .corePoolSize(Optional.ofNullable(executorProperties.getCorePoolSize()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getCorePoolSize()).get())) + .maximumPoolSize(Optional.ofNullable(executorProperties.getMaximumPoolSize()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getMaximumPoolSize()).get())) + .allowCoreThreadTimeOut(Optional.ofNullable(executorProperties.getAllowCoreThreadTimeOut()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getAllowCoreThreadTimeOut()).get())) + .keepAliveTime(Optional.ofNullable(executorProperties.getKeepAliveTime()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getKeepAliveTime()).get())) + .blockingQueue(Optional.ofNullable(executorProperties.getBlockingQueue()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getBlockingQueue()).get())) + .executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getExecuteTimeOut()).orElse(0L))) + .queueCapacity(Optional.ofNullable(executorProperties.getQueueCapacity()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getQueueCapacity()).get())) + .rejectedHandler(Optional.ofNullable(executorProperties.getRejectedHandler()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getRejectedHandler()).get())) + .threadNamePrefix(StringUtil.isBlank(executorProperties.getThreadNamePrefix()) ? executorProperties.getThreadPoolId() : executorProperties.getThreadNamePrefix()) + .threadPoolId(executorProperties.getThreadPoolId()) + .build(); + return newExecutorProperties; + } + + private ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) { + DynamicThreadPoolNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null); + boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getAlarm()).orElse(true)); + int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getActiveAlarm()).orElse(80)); + int capacityAlarm = Optional.ofNullable(executorProperties.getCapacityAlarm()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getCapacityAlarm()).orElse(80)); + int interval = Optional.ofNullable(notify) + .map(each -> each.getInterval()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getInterval()).orElse(5)); + String receive = Optional.ofNullable(notify) + .map(each -> each.getReceives()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getReceives()).orElse("")); + ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm); + threadPoolNotifyAlarm.setInterval(interval); + threadPoolNotifyAlarm.setReceives(receive); + return threadPoolNotifyAlarm; + } }