When defining a dynamic thread pool, abstract the default configuration (#572)

pull/602/head
chen.ma 2 years ago
parent 29a47565b2
commit 7459495757

@ -107,43 +107,23 @@ public class BootstrapConfigProperties implements BootstrapPropertiesInterface {
*/ */
private List<NotifyPlatformProperties> notifyPlatforms; private List<NotifyPlatformProperties> notifyPlatforms;
/**
* Whether to enable thread pool running alarm.
*/
private Boolean alarm = Boolean.TRUE;
/** /**
* Check thread pool running status interval. * Check thread pool running status interval.
*/ */
private Integer checkStateInterval; private Integer checkStateInterval;
/** /**
* Active alarm. * Default dynamic thread pool configuration.
*/
private Integer activeAlarm;
/**
* Capacity alarm.
*/
private Integer capacityAlarm;
/**
* Thread pool run alarm interval. unit: s
*/
private Integer alarmInterval;
/**
* Receives.
*/ */
private String receives; private ExecutorProperties defaultExecutor;
/** /**
* Executors. * Dynamic thread pool configuration collection.
*/ */
private List<ExecutorProperties> executors; private List<ExecutorProperties> executors;
/** /**
* Adapter executors * Tripartite framework thread pool adaptation set.
*/ */
private List<AdapterExecutorProperties> adapterExecutors; private List<AdapterExecutorProperties> adapterExecutors;
} }

@ -30,7 +30,7 @@ import lombok.NoArgsConstructor;
public class DynamicThreadPoolNotifyProperties { public class DynamicThreadPoolNotifyProperties {
/** /**
* Interval * Thread pool run alarm interval. unit: s
*/ */
private Integer interval; private Integer interval;

@ -45,13 +45,13 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
private final AlarmControlHandler alarmControlHandler; private final AlarmControlHandler alarmControlHandler;
private final BootstrapConfigProperties bootstrapConfigProperties; private final BootstrapConfigProperties configProperties;
@Override @Override
public Map<String, List<NotifyConfigDTO>> buildNotify() { public Map<String, List<NotifyConfigDTO>> buildNotify() {
Map<String, List<NotifyConfigDTO>> resultMap = Maps.newHashMap(); Map<String, List<NotifyConfigDTO>> resultMap = Maps.newHashMap();
boolean globalAlarm = bootstrapConfigProperties.getAlarm(); boolean globalAlarm = Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getAlarm()).orElse(true);
List<ExecutorProperties> executors = bootstrapConfigProperties.getExecutors(); List<ExecutorProperties> executors = configProperties.getExecutors();
if (CollectionUtil.isEmpty(executors)) { if (CollectionUtil.isEmpty(executors)) {
log.warn("Failed to build notify, executors configuration is empty."); log.warn("Failed to build notify, executors configuration is empty.");
return resultMap; return resultMap;
@ -60,8 +60,8 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
if (!globalAlarm && CollectionUtil.isEmpty(actual)) { if (!globalAlarm && CollectionUtil.isEmpty(actual)) {
return resultMap; return resultMap;
} }
for (ExecutorProperties executor : executors) { for (ExecutorProperties executorProperties : executors) {
Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig = buildSingleNotifyConfig(executor); Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig = buildSingleNotifyConfig(executorProperties);
initCacheAndLock(buildSingleNotifyConfig); initCacheAndLock(buildSingleNotifyConfig);
resultMap.putAll(buildSingleNotifyConfig); resultMap.putAll(buildSingleNotifyConfig);
} }
@ -71,15 +71,15 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
/** /**
* Build single notify config. * Build single notify config.
* *
* @param executor * @param executorProperties
* @return * @return
*/ */
public Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig(ExecutorProperties executor) { public Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig(ExecutorProperties executorProperties) {
Map<String, List<NotifyConfigDTO>> resultMap = Maps.newHashMap(); Map<String, List<NotifyConfigDTO>> resultMap = Maps.newHashMap();
String threadPoolId = executor.getThreadPoolId(); String threadPoolId = executorProperties.getThreadPoolId();
String alarmBuildKey = threadPoolId + "+ALARM"; String alarmBuildKey = threadPoolId + "+ALARM";
List<NotifyConfigDTO> alarmNotifyConfigs = Lists.newArrayList(); List<NotifyConfigDTO> alarmNotifyConfigs = Lists.newArrayList();
List<NotifyPlatformProperties> notifyPlatforms = bootstrapConfigProperties.getNotifyPlatforms(); List<NotifyPlatformProperties> notifyPlatforms = configProperties.getNotifyPlatforms();
for (NotifyPlatformProperties platformProperties : notifyPlatforms) { for (NotifyPlatformProperties platformProperties : notifyPlatforms) {
NotifyConfigDTO notifyConfig = new NotifyConfigDTO(); NotifyConfigDTO notifyConfig = new NotifyConfigDTO();
notifyConfig.setPlatform(platformProperties.getPlatform()); notifyConfig.setPlatform(platformProperties.getPlatform());
@ -87,11 +87,11 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
notifyConfig.setType("ALARM"); notifyConfig.setType("ALARM");
notifyConfig.setSecret(platformProperties.getSecret()); notifyConfig.setSecret(platformProperties.getSecret());
notifyConfig.setSecretKey(getToken(platformProperties)); notifyConfig.setSecretKey(getToken(platformProperties));
int interval = Optional.ofNullable(executor.getNotify()) int interval = Optional.ofNullable(executorProperties.getNotify())
.map(each -> each.getInterval()) .map(each -> each.getInterval())
.orElseGet(() -> bootstrapConfigProperties.getAlarmInterval() != null ? bootstrapConfigProperties.getAlarmInterval() : 5); .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getInterval()).orElse(5));
notifyConfig.setInterval(interval); notifyConfig.setInterval(interval);
notifyConfig.setReceives(buildReceive(executor, platformProperties)); notifyConfig.setReceives(buildReceive(executorProperties));
alarmNotifyConfigs.add(notifyConfig); alarmNotifyConfigs.add(notifyConfig);
} }
resultMap.put(alarmBuildKey, alarmNotifyConfigs); resultMap.put(alarmBuildKey, alarmNotifyConfigs);
@ -104,7 +104,7 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
notifyConfig.setType("CONFIG"); notifyConfig.setType("CONFIG");
notifyConfig.setSecretKey(getToken(platformProperties)); notifyConfig.setSecretKey(getToken(platformProperties));
notifyConfig.setSecret(platformProperties.getSecret()); notifyConfig.setSecret(platformProperties.getSecret());
notifyConfig.setReceives(buildReceive(executor, platformProperties)); notifyConfig.setReceives(buildReceive(executorProperties));
changeNotifyConfigs.add(notifyConfig); changeNotifyConfigs.add(notifyConfig);
} }
resultMap.put(changeBuildKey, changeNotifyConfigs); resultMap.put(changeBuildKey, changeNotifyConfigs);
@ -118,17 +118,12 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
.forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval()))); .forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval())));
} }
private String buildReceive(ExecutorProperties executor, NotifyPlatformProperties platformProperties) { private String buildReceive(ExecutorProperties executorProperties) {
String receive; String receives = Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getReceives()).orElse("");
if (executor.getNotify() != null) { if (executorProperties.getNotify() != null && StringUtil.isNotEmpty(executorProperties.getNotify().getReceives())) {
receive = executor.getNotify().getReceives(); receives = executorProperties.getNotify().getReceives();
if (StrUtil.isBlank(receive)) {
receive = bootstrapConfigProperties.getReceives();
} }
} else { return receives;
receive = bootstrapConfigProperties.getReceives();
}
return receive;
} }
private String getToken(NotifyPlatformProperties platformProperties) { private String getToken(NotifyPlatformProperties platformProperties) {

@ -17,15 +17,15 @@
package cn.hippo4j.core.springboot.starter.refresher.event; package cn.hippo4j.core.springboot.starter.refresher.event;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.toolkit.CollectionUtil; import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.core.proxy.RejectedProxyUtil; import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.core.springboot.starter.config.BootstrapConfigProperties; import cn.hippo4j.core.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;
@ -44,6 +44,7 @@ import org.springframework.core.annotation.Order;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -68,40 +69,52 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener<Hip
private final Hippo4jBaseSendMessageService hippo4jBaseSendMessageService; private final Hippo4jBaseSendMessageService hippo4jBaseSendMessageService;
@Override @Override
public void onApplicationEvent(Hippo4jConfigDynamicRefreshEvent threadPoolDynamicRefreshEvent) { public void onApplicationEvent(Hippo4jConfigDynamicRefreshEvent event) {
BootstrapConfigProperties bindableConfigProperties = threadPoolDynamicRefreshEvent.getBootstrapConfigProperties(); BootstrapConfigProperties bindableConfigProperties = event.getBootstrapConfigProperties();
List<ExecutorProperties> executors = bindableConfigProperties.getExecutors(); List<ExecutorProperties> executors = bindableConfigProperties.getExecutors();
for (ExecutorProperties properties : executors) { for (ExecutorProperties properties : executors) {
String threadPoolId = properties.getThreadPoolId(); String threadPoolId = properties.getThreadPoolId();
// Check whether the notification configuration is consistent. /**
// this operation will not trigger the notification. * Check whether the notification configuration is consistent, this operation will not trigger the notification.
*/
checkNotifyConsistencyAndReplace(properties); checkNotifyConsistencyAndReplace(properties);
if (!checkConsistency(threadPoolId, properties)) { if (!checkConsistency(threadPoolId, properties)) {
continue; continue;
} }
// refresh executor pool.
dynamicRefreshPool(threadPoolId, properties); dynamicRefreshPool(threadPoolId, properties);
// old properties.
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
// refresh executor properties. GlobalCoreThreadPoolManage.refresh(threadPoolId, failDefaultExecutorProperties(beforeProperties, properties));
GlobalCoreThreadPoolManage.refresh(threadPoolId, properties); ChangeParameterNotifyRequest changeRequest = buildChangeRequest(beforeProperties, properties);
log.info(CHANGE_THREAD_POOL_TEXT, log.info(CHANGE_THREAD_POOL_TEXT,
threadPoolId, threadPoolId,
String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), properties.getCorePoolSize()), String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), changeRequest.getNowCorePoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()), String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), changeRequest.getNowMaximumPoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), properties.getQueueCapacity()), String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), changeRequest.getNowQueueCapacity()),
String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()), String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), changeRequest.getNowKeepAliveTime()),
String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()), String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), changeRequest.getNowExecuteTimeOut()),
String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), properties.getRejectedHandler()), String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), changeRequest.getNowRejectedName()),
String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())); String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), changeRequest.getNowAllowsCoreThreadTimeOut()));
try { try {
threadPoolNotifyAlarmHandler.sendPoolConfigChange(newChangeRequest(beforeProperties, properties)); threadPoolNotifyAlarmHandler.sendPoolConfigChange(changeRequest);
} catch (Throwable ex) { } catch (Throwable ex) {
log.error("Failed to send Chang smart application listener notice. Message: {}", ex.getMessage()); log.error("Failed to send Chang smart application listener notice. Message: {}", ex.getMessage());
} }
} }
} }
private ExecutorProperties failDefaultExecutorProperties(ExecutorProperties beforeProperties, ExecutorProperties properties) {
return ExecutorProperties.builder()
.corePoolSize(Optional.ofNullable(properties.getCorePoolSize()).orElse(beforeProperties.getCorePoolSize()))
.maximumPoolSize(Optional.ofNullable(properties.getMaximumPoolSize()).orElse(beforeProperties.getMaximumPoolSize()))
.queueCapacity(Optional.ofNullable(properties.getQueueCapacity()).orElse(beforeProperties.getQueueCapacity()))
.keepAliveTime(Optional.ofNullable(properties.getKeepAliveTime()).orElse(beforeProperties.getKeepAliveTime()))
.executeTimeOut(Optional.ofNullable(properties.getExecuteTimeOut()).orElse(beforeProperties.getExecuteTimeOut()))
.rejectedHandler(Optional.ofNullable(properties.getRejectedHandler()).orElse(beforeProperties.getRejectedHandler()))
.allowCoreThreadTimeOut(Optional.ofNullable(properties.getAllowCoreThreadTimeOut()).orElse(beforeProperties.getAllowCoreThreadTimeOut()))
.threadPoolId(beforeProperties.getThreadPoolId())
.build();
}
/** /**
* Construct change parameter notify request instance. * Construct change parameter notify request instance.
* *
@ -109,23 +122,23 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener<Hip
* @param properties new properties * @param properties new properties
* @return instance * @return instance
*/ */
private ChangeParameterNotifyRequest newChangeRequest(ExecutorProperties beforeProperties, ExecutorProperties properties) { private ChangeParameterNotifyRequest buildChangeRequest(ExecutorProperties beforeProperties, ExecutorProperties properties) {
ChangeParameterNotifyRequest changeParameterNotifyRequest = ChangeParameterNotifyRequest.builder() ChangeParameterNotifyRequest changeParameterNotifyRequest = ChangeParameterNotifyRequest.builder()
.blockingQueueName(beforeProperties.getBlockingQueue())
.beforeCorePoolSize(beforeProperties.getCorePoolSize()) .beforeCorePoolSize(beforeProperties.getCorePoolSize())
.beforeMaximumPoolSize(beforeProperties.getMaximumPoolSize()) .beforeMaximumPoolSize(beforeProperties.getMaximumPoolSize())
.beforeAllowsCoreThreadTimeOut(beforeProperties.getAllowCoreThreadTimeOut()) .beforeAllowsCoreThreadTimeOut(beforeProperties.getAllowCoreThreadTimeOut())
.beforeKeepAliveTime(beforeProperties.getKeepAliveTime()) .beforeKeepAliveTime(beforeProperties.getKeepAliveTime())
.blockingQueueName(beforeProperties.getBlockingQueue())
.beforeQueueCapacity(beforeProperties.getQueueCapacity()) .beforeQueueCapacity(beforeProperties.getQueueCapacity())
.beforeRejectedName(beforeProperties.getRejectedHandler()) .beforeRejectedName(beforeProperties.getRejectedHandler())
.beforeExecuteTimeOut(beforeProperties.getExecuteTimeOut()) .beforeExecuteTimeOut(beforeProperties.getExecuteTimeOut())
.nowCorePoolSize(properties.getCorePoolSize()) .nowCorePoolSize(Optional.ofNullable(properties.getCorePoolSize()).orElse(beforeProperties.getCorePoolSize()))
.nowMaximumPoolSize(properties.getMaximumPoolSize()) .nowMaximumPoolSize(Optional.ofNullable(properties.getMaximumPoolSize()).orElse(beforeProperties.getMaximumPoolSize()))
.nowAllowsCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut()) .nowAllowsCoreThreadTimeOut(Optional.ofNullable(properties.getAllowCoreThreadTimeOut()).orElse(beforeProperties.getAllowCoreThreadTimeOut()))
.nowKeepAliveTime(properties.getKeepAliveTime()) .nowKeepAliveTime(Optional.ofNullable(properties.getKeepAliveTime()).orElse(beforeProperties.getKeepAliveTime()))
.nowQueueCapacity(properties.getQueueCapacity()) .nowQueueCapacity(Optional.ofNullable(properties.getQueueCapacity()).orElse(beforeProperties.getQueueCapacity()))
.nowRejectedName(properties.getRejectedHandler()) .nowRejectedName(Optional.ofNullable(properties.getRejectedHandler()).orElse(beforeProperties.getRejectedHandler()))
.nowExecuteTimeOut(properties.getExecuteTimeOut()) .nowExecuteTimeOut(Optional.ofNullable(properties.getExecuteTimeOut()).orElse(beforeProperties.getExecuteTimeOut()))
.build(); .build();
changeParameterNotifyRequest.setThreadPoolId(beforeProperties.getThreadPoolId()); changeParameterNotifyRequest.setThreadPoolId(beforeProperties.getThreadPoolId());
return changeParameterNotifyRequest; return changeParameterNotifyRequest;
@ -134,13 +147,13 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener<Hip
/** /**
* Check notify consistency and replace. * Check notify consistency and replace.
* *
* @param properties * @param executorProperties
*/ */
private void checkNotifyConsistencyAndReplace(ExecutorProperties properties) { private void checkNotifyConsistencyAndReplace(ExecutorProperties executorProperties) {
boolean checkNotifyConfig = false; boolean checkNotifyConfig = false;
boolean checkNotifyAlarm = false; boolean checkNotifyAlarm = false;
List<String> changeKeys = Lists.newArrayList(); List<String> changeKeys = Lists.newArrayList();
Map<String, List<NotifyConfigDTO>> newDynamicThreadPoolNotifyMap = coreNotifyConfigBuilder.buildSingleNotifyConfig(properties); Map<String, List<NotifyConfigDTO>> newDynamicThreadPoolNotifyMap = coreNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties);
Map<String, List<NotifyConfigDTO>> notifyConfigs = hippo4jBaseSendMessageService.getNotifyConfigs(); Map<String, List<NotifyConfigDTO>> notifyConfigs = hippo4jBaseSendMessageService.getNotifyConfigs();
if (CollectionUtil.isNotEmpty(notifyConfigs)) { if (CollectionUtil.isNotEmpty(notifyConfigs)) {
for (Map.Entry<String, List<NotifyConfigDTO>> each : newDynamicThreadPoolNotifyMap.entrySet()) { for (Map.Entry<String, List<NotifyConfigDTO>> each : newDynamicThreadPoolNotifyMap.entrySet()) {
@ -161,22 +174,22 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener<Hip
coreNotifyConfigBuilder.initCacheAndLock(newDynamicThreadPoolNotifyMap); coreNotifyConfigBuilder.initCacheAndLock(newDynamicThreadPoolNotifyMap);
hippo4jBaseSendMessageService.putPlatform(newDynamicThreadPoolNotifyMap); hippo4jBaseSendMessageService.putPlatform(newDynamicThreadPoolNotifyMap);
} }
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(properties.getThreadPoolId()); ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId());
if (threadPoolNotifyAlarm != null) { if (threadPoolNotifyAlarm != null) {
boolean isAlarm = properties.getAlarm(); Boolean isAlarm = executorProperties.getAlarm();
Integer activeAlarm = properties.getActiveAlarm(); Integer activeAlarm = executorProperties.getActiveAlarm();
Integer capacityAlarm = properties.getCapacityAlarm(); Integer capacityAlarm = executorProperties.getCapacityAlarm();
if (threadPoolNotifyAlarm.getAlarm() != isAlarm if ((isAlarm != null && isAlarm != threadPoolNotifyAlarm.getAlarm())
|| threadPoolNotifyAlarm.getActiveAlarm() != activeAlarm || (activeAlarm != null && activeAlarm != threadPoolNotifyAlarm.getActiveAlarm())
|| threadPoolNotifyAlarm.getCapacityAlarm() != capacityAlarm) { || (capacityAlarm != null && capacityAlarm != threadPoolNotifyAlarm.getCapacityAlarm())) {
checkNotifyAlarm = true; checkNotifyAlarm = true;
threadPoolNotifyAlarm.setAlarm(isAlarm); threadPoolNotifyAlarm.setAlarm(Optional.ofNullable(isAlarm).orElse(threadPoolNotifyAlarm.getAlarm()));
threadPoolNotifyAlarm.setActiveAlarm(activeAlarm); threadPoolNotifyAlarm.setActiveAlarm(Optional.ofNullable(activeAlarm).orElse(threadPoolNotifyAlarm.getActiveAlarm()));
threadPoolNotifyAlarm.setCapacityAlarm(capacityAlarm); threadPoolNotifyAlarm.setCapacityAlarm(Optional.ofNullable(capacityAlarm).orElse(threadPoolNotifyAlarm.getCapacityAlarm()));
} }
} }
if (checkNotifyConfig || checkNotifyAlarm) { if (checkNotifyConfig || checkNotifyAlarm) {
log.info("[{}] Dynamic thread pool notification property changes.", properties.getThreadPoolId()); log.info("[{}] Dynamic thread pool notification property changes.", executorProperties.getThreadPoolId());
} }
} }
@ -192,15 +205,15 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener<Hip
if (executor == null) { if (executor == null) {
return false; return false;
} }
boolean result = !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize()) boolean result = (properties.getCorePoolSize() != null && !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize()))
|| !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()) || (properties.getMaximumPoolSize() != null && !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()))
|| !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()) || (properties.getAllowCoreThreadTimeOut() != null && !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()))
|| !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()) || (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()))
|| !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()) || (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()))
|| !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler()) || (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler()))
|| ||
(!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) ((properties.getQueueCapacity() != null && !Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())); && Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())));
return result; return result;
} }
@ -229,15 +242,15 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener<Hip
executor.setCorePoolSize(properties.getCorePoolSize()); executor.setCorePoolSize(properties.getCorePoolSize());
} }
} }
if (!Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) { if (properties.getAllowCoreThreadTimeOut() != null && !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) {
executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut()); executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
} }
if (!Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) { if (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) {
if (executor instanceof AbstractDynamicExecutorSupport) { if (executor instanceof AbstractDynamicExecutorSupport) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut()); ((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut());
} }
} }
if (!Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) { if (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(properties.getRejectedHandler()); RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(properties.getRejectedHandler());
if (executor instanceof AbstractDynamicExecutorSupport) { if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor; DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
@ -247,10 +260,10 @@ public class DynamicThreadPoolRefreshListener implements ApplicationListener<Hip
} }
executor.setRejectedExecutionHandler(rejectedExecutionHandler); executor.setRejectedExecutionHandler(rejectedExecutionHandler);
} }
if (!Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) { if (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) {
executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS); executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS);
} }
if (!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) if (properties.getQueueCapacity() != null && !Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())) { && Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())) {
if (executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) { if (executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) {
ResizableCapacityLinkedBlockingQueue<?> queue = (ResizableCapacityLinkedBlockingQueue<?>) executor.getQueue(); ResizableCapacityLinkedBlockingQueue<?> queue = (ResizableCapacityLinkedBlockingQueue<?>) executor.getQueue();

@ -26,7 +26,8 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*; import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose; import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.springboot.starter.config.BootstrapConfigProperties; import cn.hippo4j.core.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.core.springboot.starter.config.DynamicThreadPoolNotifyProperties; import cn.hippo4j.core.springboot.starter.config.DynamicThreadPoolNotifyProperties;
@ -52,7 +53,7 @@ import java.util.concurrent.TimeUnit;
@AllArgsConstructor @AllArgsConstructor
public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
private final BootstrapConfigProperties bootstrapConfigProperties; private final BootstrapConfigProperties configProperties;
@Override @Override
public Object postProcessBeforeInitialization(Object bean, String beanName) { public Object postProcessBeforeInitialization(Object bean, String beanName) {
@ -87,20 +88,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
} }
if (bean instanceof DynamicThreadPoolWrapper) { if (bean instanceof DynamicThreadPoolWrapper) {
DynamicThreadPoolWrapper wrap = (DynamicThreadPoolWrapper) bean; DynamicThreadPoolWrapper wrap = (DynamicThreadPoolWrapper) bean;
registerAndSubscribe(wrap); fillPoolAndRegister(wrap);
} }
return bean; return bean;
} }
/**
* Register and subscribe.
*
* @param dynamicThreadPoolWrap
*/
protected void registerAndSubscribe(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
fillPoolAndRegister(dynamicThreadPoolWrap);
}
/** /**
* Fill the thread pool and register. * Fill the thread pool and register.
* *
@ -110,26 +102,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId(); String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId();
ThreadPoolExecutor newDynamicPoolExecutor = dynamicThreadPoolWrapper.getExecutor(); ThreadPoolExecutor newDynamicPoolExecutor = dynamicThreadPoolWrapper.getExecutor();
ExecutorProperties executorProperties = null; ExecutorProperties executorProperties = null;
if (null != bootstrapConfigProperties.getExecutors()) { if (configProperties.getExecutors() != null) {
executorProperties = bootstrapConfigProperties.getExecutors() executorProperties = configProperties.getExecutors()
.stream() .stream()
.filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId())) .filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId()))
.findFirst() .findFirst()
.orElse(null); .orElseThrow(() -> new RuntimeException("The thread pool id does not exist in the configuration."));
if (executorProperties != null) {
try { try {
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity()); newDynamicPoolExecutor = buildNewDynamicThreadPool(executorProperties);
String threadNamePrefix = executorProperties.getThreadNamePrefix();
newDynamicPoolExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.workQueue(workQueue)
.threadFactory(StringUtil.isNotBlank(threadNamePrefix) ? threadNamePrefix : threadPoolId)
.executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut()).orElse(0L))
.poolThreadSize(executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize())
.keepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS)
.rejected(RejectedPolicyTypeEnum.createPolicy(executorProperties.getRejectedHandler()))
.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut())
.build();
} catch (Exception ex) { } catch (Exception ex) {
log.error("Failed to initialize thread pool configuration. error: {}", ex); log.error("Failed to initialize thread pool configuration. error: {}", ex);
} finally { } finally {
@ -138,29 +118,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
} }
dynamicThreadPoolWrapper.setInitFlag(Boolean.TRUE); dynamicThreadPoolWrapper.setInitFlag(Boolean.TRUE);
} }
} ThreadPoolNotifyAlarm threadPoolNotifyAlarm = buildThreadPoolNotifyAlarm(executorProperties);
if (dynamicThreadPoolWrapper.getExecutor() instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null);
boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm())
.orElseGet(() -> bootstrapConfigProperties.getAlarm() != null ? bootstrapConfigProperties.getAlarm() : true);
int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm())
.orElseGet(() -> bootstrapConfigProperties.getActiveAlarm() != null ? bootstrapConfigProperties.getActiveAlarm() : 80);
int capacityAlarm = Optional.ofNullable(executorProperties.getCapacityAlarm())
.orElseGet(() -> bootstrapConfigProperties.getCapacityAlarm() != null ? bootstrapConfigProperties.getCapacityAlarm() : 80);
int interval = Optional.ofNullable(notify)
.map(each -> each.getInterval()).orElseGet(() -> bootstrapConfigProperties.getAlarmInterval() != null ? bootstrapConfigProperties.getAlarmInterval() : 5);
String receive = Optional.ofNullable(notify)
.map(each -> each.getReceives()).orElseGet(() -> StringUtil.isNotEmpty(bootstrapConfigProperties.getReceives()) ? bootstrapConfigProperties.getReceives() : "");
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm);
threadPoolNotifyAlarm.setInterval(interval);
threadPoolNotifyAlarm.setReceives(receive);
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm); GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).getTaskDecorator(); DynamicThreadPoolExecutor actualDynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor();
TaskDecorator taskDecorator = actualDynamicThreadPoolExecutor.getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).awaitTerminationMillis; long awaitTerminationMillis = actualDynamicThreadPoolExecutor.awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor()).waitForTasksToCompleteOnShutdown; boolean waitForTasksToCompleteOnShutdown = actualDynamicThreadPoolExecutor.waitForTasksToCompleteOnShutdown;
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
}
dynamicThreadPoolWrapper.setExecutor(newDynamicPoolExecutor); dynamicThreadPoolWrapper.setExecutor(newDynamicPoolExecutor);
} }
GlobalThreadPoolManage.registerPool(dynamicThreadPoolWrapper.getThreadPoolId(), dynamicThreadPoolWrapper); GlobalThreadPoolManage.registerPool(dynamicThreadPoolWrapper.getThreadPoolId(), dynamicThreadPoolWrapper);
@ -168,17 +133,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
threadPoolId, threadPoolId,
executorProperties == null executorProperties == null
? buildExecutorProperties(threadPoolId, newDynamicPoolExecutor) ? buildExecutorProperties(threadPoolId, newDynamicPoolExecutor)
: executorProperties); : buildActualExecutorProperties(executorProperties));
return newDynamicPoolExecutor; return newDynamicPoolExecutor;
} }
/** private ExecutorProperties buildActualExecutorProperties(ExecutorProperties executorProperties) {
* Build executor properties. return Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> buildExecutorProperties(executorProperties)).orElse(executorProperties);
* }
* @param threadPoolId
* @param executor
* @return
*/
private ExecutorProperties buildExecutorProperties(String threadPoolId, ThreadPoolExecutor executor) { private ExecutorProperties buildExecutorProperties(String threadPoolId, ThreadPoolExecutor executor) {
ExecutorProperties executorProperties = new ExecutorProperties(); ExecutorProperties executorProperties = new ExecutorProperties();
BlockingQueue<Runnable> blockingQueue = executor.getQueue(); BlockingQueue<Runnable> blockingQueue = executor.getQueue();
@ -197,4 +159,64 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.setThreadPoolId(threadPoolId); .setThreadPoolId(threadPoolId);
return executorProperties; return executorProperties;
} }
private ThreadPoolExecutor buildNewDynamicThreadPool(ExecutorProperties executorProperties) {
String threadNamePrefix = executorProperties.getThreadNamePrefix();
ExecutorProperties newExecutorProperties = buildExecutorProperties(executorProperties);
ThreadPoolExecutor newDynamicPoolExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.poolThreadSize(newExecutorProperties.getCorePoolSize(), newExecutorProperties.getMaximumPoolSize())
.workQueue(BlockingQueueTypeEnum.createBlockingQueue(newExecutorProperties.getBlockingQueue(), newExecutorProperties.getQueueCapacity()))
.threadFactory(StringUtil.isNotBlank(threadNamePrefix) ? threadNamePrefix : executorProperties.getThreadPoolId())
.executeTimeOut(newExecutorProperties.getExecuteTimeOut())
.keepAliveTime(newExecutorProperties.getKeepAliveTime(), TimeUnit.SECONDS)
.rejected(RejectedPolicyTypeEnum.createPolicy(newExecutorProperties.getRejectedHandler()))
.allowCoreThreadTimeOut(newExecutorProperties.getAllowCoreThreadTimeOut())
.build();
return newDynamicPoolExecutor;
}
private ExecutorProperties buildExecutorProperties(ExecutorProperties executorProperties) {
ExecutorProperties newExecutorProperties = ExecutorProperties.builder()
.corePoolSize(Optional.ofNullable(executorProperties.getCorePoolSize())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getCorePoolSize()).get()))
.maximumPoolSize(Optional.ofNullable(executorProperties.getMaximumPoolSize())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getMaximumPoolSize()).get()))
.allowCoreThreadTimeOut(Optional.ofNullable(executorProperties.getAllowCoreThreadTimeOut())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getAllowCoreThreadTimeOut()).get()))
.keepAliveTime(Optional.ofNullable(executorProperties.getKeepAliveTime())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getKeepAliveTime()).get()))
.blockingQueue(Optional.ofNullable(executorProperties.getBlockingQueue())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getBlockingQueue()).get()))
.executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getExecuteTimeOut()).orElse(0L)))
.queueCapacity(Optional.ofNullable(executorProperties.getQueueCapacity())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getQueueCapacity()).get()))
.rejectedHandler(Optional.ofNullable(executorProperties.getRejectedHandler())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getRejectedHandler()).get()))
.threadNamePrefix(StringUtil.isBlank(executorProperties.getThreadNamePrefix()) ? executorProperties.getThreadPoolId() : executorProperties.getThreadNamePrefix())
.threadPoolId(executorProperties.getThreadPoolId())
.build();
return newExecutorProperties;
}
private ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) {
DynamicThreadPoolNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null);
boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getAlarm()).orElse(true));
int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getActiveAlarm()).orElse(80));
int capacityAlarm = Optional.ofNullable(executorProperties.getCapacityAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getCapacityAlarm()).orElse(80));
int interval = Optional.ofNullable(notify)
.map(each -> each.getInterval())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getInterval()).orElse(5));
String receive = Optional.ofNullable(notify)
.map(each -> each.getReceives())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getNotify()).map(each -> each.getReceives()).orElse(""));
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm);
threadPoolNotifyAlarm.setInterval(interval);
threadPoolNotifyAlarm.setReceives(receive);
return threadPoolNotifyAlarm;
}
} }

Loading…
Cancel
Save