增强线程池动态刷新能力. (#139)

pull/140/head
chen.ma 2 years ago
parent c2da994e31
commit 3bec271ea1

@ -38,6 +38,11 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
*/
private boolean subscribeFlag;
/**
* Init flag
*/
private boolean initFlag;
/**
* executor
* {@link DynamicThreadPoolExecutor}

@ -3,7 +3,7 @@ package cn.hippo4j.core.starter.config;
import cn.hippo4j.common.api.NotifyConfigBuilder;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.notify.AlarmControlHandler;
import cn.hippo4j.common.notify.BaseSendMessageServiceImpl;
import cn.hippo4j.common.notify.HippoBaseSendMessageService;
import cn.hippo4j.common.notify.HippoSendMessageService;
import cn.hippo4j.common.notify.SendMessageHandler;
import cn.hippo4j.common.notify.platform.DingSendMessageHandler;
@ -67,7 +67,7 @@ public class DynamicThreadPoolCoreAutoConfiguration {
@Bean
public HippoSendMessageService hippoSendMessageService(NotifyConfigBuilder notifyConfigBuilder,
AlarmControlHandler alarmControlHandler) {
return new BaseSendMessageServiceImpl(notifyConfigBuilder, alarmControlHandler);
return new HippoBaseSendMessageService(notifyConfigBuilder, alarmControlHandler);
}
@Bean

@ -2,6 +2,7 @@ package cn.hippo4j.core.starter.config;
import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* Executor properties.
@ -10,6 +11,7 @@ import lombok.Data;
* @date 2022/2/25 00:40
*/
@Data
@Accessors(chain = true)
public class ExecutorProperties {
/**

@ -33,53 +33,67 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
List<ExecutorProperties> executors = bootstrapCoreProperties.getExecutors();
for (ExecutorProperties executor : executors) {
String threadPoolId = executor.getThreadPoolId();
String alarmBuildKey = threadPoolId + "+ALARM";
List<NotifyConfigDTO> alarmNotifyConfigs = Lists.newArrayList();
List<NotifyPlatformProperties> notifyPlatforms = bootstrapCoreProperties.getNotifyPlatforms();
for (NotifyPlatformProperties platformProperties : notifyPlatforms) {
NotifyConfigDTO notifyConfig = new NotifyConfigDTO();
notifyConfig.setPlatform(platformProperties.getPlatform());
notifyConfig.setThreadPoolId(threadPoolId);
notifyConfig.setType("ALARM");
notifyConfig.setSecretKey(platformProperties.getSecretKey());
notifyConfig.setInterval(executor.getNotify().getInterval());
Map<String, String> receives = executor.getNotify().getReceives();
String receive = receives.get(platformProperties.getPlatform());
if (StrUtil.isBlank(receive)) {
receive = platformProperties.getReceives();
}
notifyConfig.setReceives(receive);
alarmNotifyConfigs.add(notifyConfig);
}
resultMap.putAll(buildSingleNotifyConfig(executor));
}
return resultMap;
}
resultMap.put(alarmBuildKey, alarmNotifyConfigs);
String changeBuildKey = threadPoolId + "+CONFIG";
List<NotifyConfigDTO> changeNotifyConfigs = Lists.newArrayList();
for (NotifyPlatformProperties platformProperties : notifyPlatforms) {
NotifyConfigDTO notifyConfig = new NotifyConfigDTO();
notifyConfig.setPlatform(platformProperties.getPlatform());
notifyConfig.setThreadPoolId(threadPoolId);
notifyConfig.setType("CONFIG");
notifyConfig.setSecretKey(platformProperties.getSecretKey());
Map<String, String> receives = executor.getNotify().getReceives();
String receive = receives.get(platformProperties.getPlatform());
if (StrUtil.isBlank(receive)) {
receive = platformProperties.getReceives();
}
notifyConfig.setReceives(receive);
changeNotifyConfigs.add(notifyConfig);
/**
* Build single notify config.
*
* @param executor
* @return
*/
public Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig(ExecutorProperties executor) {
Map<String, List<NotifyConfigDTO>> resultMap = Maps.newHashMap();
String threadPoolId = executor.getThreadPoolId();
String alarmBuildKey = threadPoolId + "+ALARM";
List<NotifyConfigDTO> alarmNotifyConfigs = Lists.newArrayList();
List<NotifyPlatformProperties> notifyPlatforms = bootstrapCoreProperties.getNotifyPlatforms();
for (NotifyPlatformProperties platformProperties : notifyPlatforms) {
NotifyConfigDTO notifyConfig = new NotifyConfigDTO();
notifyConfig.setPlatform(platformProperties.getPlatform());
notifyConfig.setThreadPoolId(threadPoolId);
notifyConfig.setType("ALARM");
notifyConfig.setSecretKey(platformProperties.getSecretKey());
notifyConfig.setInterval(executor.getNotify().getInterval());
Map<String, String> receives = executor.getNotify().getReceives();
String receive = receives.get(platformProperties.getPlatform());
if (StrUtil.isBlank(receive)) {
receive = platformProperties.getReceives();
}
notifyConfig.setReceives(receive);
alarmNotifyConfigs.add(notifyConfig);
}
resultMap.put(alarmBuildKey, alarmNotifyConfigs);
String changeBuildKey = threadPoolId + "+CONFIG";
List<NotifyConfigDTO> changeNotifyConfigs = Lists.newArrayList();
resultMap.put(changeBuildKey, changeNotifyConfigs);
for (NotifyPlatformProperties platformProperties : notifyPlatforms) {
NotifyConfigDTO notifyConfig = new NotifyConfigDTO();
notifyConfig.setPlatform(platformProperties.getPlatform());
notifyConfig.setThreadPoolId(threadPoolId);
notifyConfig.setType("CONFIG");
notifyConfig.setSecretKey(platformProperties.getSecretKey());
Map<String, String> receives = executor.getNotify().getReceives();
String receive = receives.get(platformProperties.getPlatform());
if (StrUtil.isBlank(receive)) {
receive = platformProperties.getReceives();
}
notifyConfig.setReceives(receive);
changeNotifyConfigs.add(notifyConfig);
}
resultMap.forEach((key, val) ->
val.stream().filter(each -> StrUtil.equals("ALARM", each.getType()))
resultMap.put(changeBuildKey, changeNotifyConfigs);
resultMap.forEach(
(key, val) -> val.stream()
.filter(each -> StrUtil.equals("ALARM", each.getType()))
.forEach(each -> alarmControlHandler.initCacheAndLock(each.getThreadPoolId(), each.getPlatform(), each.getInterval()))
);

@ -1,14 +1,19 @@
package cn.hippo4j.core.starter.refresher;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.notify.HippoBaseSendMessageService;
import cn.hippo4j.common.notify.NotifyConfigDTO;
import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.core.starter.config.BootstrapCoreProperties;
import cn.hippo4j.core.starter.config.ExecutorProperties;
import cn.hippo4j.core.starter.notify.CoreNotifyConfigBuilder;
import cn.hippo4j.core.starter.parser.ConfigParserHandler;
import cn.hippo4j.core.starter.support.GlobalCoreThreadPoolManage;
import lombok.AllArgsConstructor;
@ -56,6 +61,40 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
}
BootstrapCoreProperties bindableCoreProperties = BootstrapCorePropertiesBinderAdapt.bootstrapCorePropertiesBinder(configInfo, bootstrapCoreProperties);
// platforms
refreshPlatforms(bindableCoreProperties);
// executors
refreshExecutors(bindableCoreProperties);
}
/**
* Refresh platform.
*
* @param bindableCoreProperties
*/
private void refreshPlatforms(BootstrapCoreProperties bindableCoreProperties) {
List<ExecutorProperties> executors = bindableCoreProperties.getExecutors();
for (ExecutorProperties executor : executors) {
String threadPoolId = executor.getThreadPoolId();
DynamicThreadPoolWrapper wrapper = GlobalThreadPoolManage.getExecutorService(threadPoolId);
if (!wrapper.isInitFlag()) {
HippoBaseSendMessageService sendMessageService = ApplicationContextHolder.getBean(HippoBaseSendMessageService.class);
CoreNotifyConfigBuilder configBuilder = ApplicationContextHolder.getBean(CoreNotifyConfigBuilder.class);
Map<String, List<NotifyConfigDTO>> notifyConfig = configBuilder.buildSingleNotifyConfig(executor);
sendMessageService.putPlatform(notifyConfig);
wrapper.setInitFlag(Boolean.TRUE);
}
}
}
/**
* Refresh executors.
*
* @param bindableCoreProperties
*/
private void refreshExecutors(BootstrapCoreProperties bindableCoreProperties) {
List<ExecutorProperties> executors = bindableCoreProperties.getExecutors();
for (ExecutorProperties properties : executors) {
String threadPoolId = properties.getThreadPoolId();
@ -74,6 +113,7 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
changeRequest.setBlockingQueueName(beforeProperties.getBlockingQueue());
changeRequest.setBeforeQueueCapacity(beforeProperties.getQueueCapacity());
changeRequest.setBeforeRejectedName(beforeProperties.getRejectedHandler());
changeRequest.setBeforeExecuteTimeOut(beforeProperties.getExecuteTimeOut());
changeRequest.setThreadPoolId(beforeProperties.getThreadPoolId());
changeRequest.setNowCorePoolSize(properties.getCorePoolSize());
@ -82,6 +122,7 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
changeRequest.setNowKeepAliveTime(properties.getKeepAliveTime());
changeRequest.setNowQueueCapacity(properties.getQueueCapacity());
changeRequest.setNowRejectedName(properties.getRejectedHandler());
changeRequest.setNowExecuteTimeOut(properties.getExecuteTimeOut());
GlobalCoreThreadPoolManage.refresh(threadPoolId, properties);
log.info(
@ -91,6 +132,7 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
"\n queueType :: [{}]" +
"\n capacity :: [{}]" +
"\n keepAliveTime :: [{}]" +
"\n executeTimeOut :: [{}]" +
"\n rejectedType :: [{}]" +
"\n allowCoreThreadTimeOut :: [{}]",
threadPoolId.toUpperCase(),
@ -99,6 +141,7 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
String.format("%s => %s", beforeProperties.getBlockingQueue(), properties.getBlockingQueue()),
String.format("%s => %s", beforeProperties.getQueueCapacity(), properties.getQueueCapacity()),
String.format("%s => %s", beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()),
String.format("%s => %s", beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()),
String.format("%s => %s", beforeProperties.getRejectedHandler(), properties.getRejectedHandler()),
String.format("%s => %s", beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())
);
@ -124,6 +167,7 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
boolean result = !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize())
|| !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize())
|| !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())
|| !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())
|| !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())
|| !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())
||
@ -157,6 +201,12 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
}
if (!Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) {
if (executor instanceof AbstractDynamicExecutorSupport) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut());
}
}
if (!Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedTypeEnum.createPolicy(properties.getRejectedHandler());
if (executor instanceof AbstractDynamicExecutorSupport) {

@ -1,17 +1,16 @@
package cn.hippo4j.core.starter.refresher;
import cn.hippo4j.common.notify.NotifyPlatformEnum;
import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.starter.config.BootstrapCoreProperties;
import cn.hippo4j.core.starter.config.ExecutorProperties;
import cn.hippo4j.core.starter.config.NotifyPlatformProperties;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
import java.util.List;
import java.util.Map;
@ -34,8 +33,9 @@ public class BootstrapCorePropertiesBinderAdapt {
* @return
*/
public static BootstrapCoreProperties bootstrapCorePropertiesBinder(Map<Object, Object> configInfo, BootstrapCoreProperties bootstrapCoreProperties) {
BootstrapCoreProperties bindableCoreProperties = null;
try {
BootstrapCoreProperties bindableCoreProperties = adapt(configInfo);
;
/*try {
ConfigurationPropertySource sources = new MapConfigurationPropertySource(configInfo);
Binder binder = new Binder(sources);
bindableCoreProperties = binder.bind(PREFIX, Bindable.ofInstance(bootstrapCoreProperties)).get();
@ -45,7 +45,7 @@ public class BootstrapCorePropertiesBinderAdapt {
} catch (ClassNotFoundException notEx) {
bindableCoreProperties = adapt(configInfo);
}
}
}*/
return bindableCoreProperties;
}
@ -62,7 +62,14 @@ public class BootstrapCorePropertiesBinderAdapt {
// filter
Map<Object, Object> targetMap = Maps.newHashMap();
configInfo.forEach((key, val) -> {
if (key != null && StringUtil.isNotBlank((String) key) && ((String) key).indexOf(PREFIX + ".executors") != -1) {
boolean containFlag = key != null
&& StringUtil.isNotBlank((String) key)
&& (
((String) key).indexOf(PREFIX + ".executors") != -1
|| ((String) key).indexOf(PREFIX + ".notify-platforms") != -1
|| ((String) key).indexOf(PREFIX + ".notifyPlatforms") != -1
);
if (containFlag) {
String targetKey = key.toString().replace(PREFIX + ".", "");
targetMap.put(targetKey, val);
}
@ -70,35 +77,87 @@ public class BootstrapCorePropertiesBinderAdapt {
// convert
List<ExecutorProperties> executorPropertiesList = Lists.newArrayList();
List<NotifyPlatformProperties> notifyPropertiesList = Lists.newArrayList();
for (int i = 0; i < Integer.MAX_VALUE; i++) {
Map<String, Object> tarterSingleMap = Maps.newHashMap();
Map<String, Object> executorSingleMap = Maps.newHashMap();
Map<String, Object> platformSingleMap = Maps.newHashMap();
Map<String, Object> notifySingleMap = Maps.newHashMap();
for (Map.Entry entry : targetMap.entrySet()) {
String key = entry.getKey().toString();
if (key.indexOf(i + "") != -1) {
key = key.replace("executors[" + i + "].", "");
if (key.indexOf("executors[" + i + "].") != -1) {
if (key.indexOf("executors[" + i + "].notify.") != -1) {
key = key.replace("executors[" + i + "].notify.", "");
String[] notifyKeySplit = key.split("-");
if (notifyKeySplit != null && notifyKeySplit.length > 0) {
key = key.replace("-", "_");
}
notifySingleMap.put(key, entry.getValue());
} else {
key = key.replace("executors[" + i + "].", "");
String[] keySplit = key.split("-");
if (keySplit != null && keySplit.length > 0) {
key = key.replace("-", "_");
}
executorSingleMap.put(key, entry.getValue());
}
}
if (key.indexOf("notify-platforms[" + i + "].") != -1 || key.indexOf("notifyPlatforms[" + i + "].") != -1) {
if (key.indexOf("notify-platforms[" + i + "].") != -1) {
key = key.replace("notify-platforms[" + i + "].", "");
} else {
key = key.replace("notifyPlatforms[" + i + "].", "");
}
String[] keySplit = key.split("-");
if (keySplit != null && keySplit.length > 0) {
key = key.replace("-", "_");
}
tarterSingleMap.put(key, entry.getValue());
platformSingleMap.put(key, entry.getValue());
}
}
if (CollectionUtil.isEmpty(tarterSingleMap)) {
if (CollectionUtil.isEmpty(executorSingleMap) && CollectionUtil.isEmpty(platformSingleMap)) {
break;
}
ExecutorProperties executorProperties = BeanUtil.mapToBean(tarterSingleMap, ExecutorProperties.class, true, CopyOptions.create());
if (executorProperties != null) {
executorPropertiesList.add(executorProperties);
if (CollectionUtil.isNotEmpty(executorSingleMap)) {
ExecutorProperties executorProperties = BeanUtil.mapToBean(executorSingleMap, ExecutorProperties.class, true, CopyOptions.create());
if (executorProperties != null) {
if (CollectionUtil.isNotEmpty(notifySingleMap)) {
ThreadPoolNotifyAlarm alarm = BeanUtil.mapToBean(notifySingleMap, ThreadPoolNotifyAlarm.class, true, CopyOptions.create());
Map<String, String> notifyReceivesMap = Maps.newHashMap();
for (NotifyPlatformEnum value : NotifyPlatformEnum.values()) {
Object receives = targetMap.get("executors[" + i + "].notify.receives." + value.name());
if (receives != null && StringUtil.isNotBlank((String) receives)) {
notifyReceivesMap.put(value.name(), (String) receives);
}
}
alarm.setReceives(notifyReceivesMap);
executorProperties.setNotify(alarm);
}
executorPropertiesList.add(executorProperties);
}
}
if (CollectionUtil.isNotEmpty(platformSingleMap)) {
NotifyPlatformProperties notifyPlatformProperties = BeanUtil.mapToBean(platformSingleMap, NotifyPlatformProperties.class, true, CopyOptions.create());
if (notifyPlatformProperties != null) {
notifyPropertiesList.add(notifyPlatformProperties);
}
}
}
bindableCoreProperties = new BootstrapCoreProperties();
bindableCoreProperties.setExecutors(executorPropertiesList);
bindableCoreProperties.setNotifyPlatforms(notifyPropertiesList);
} catch (Exception ex) {
throw ex;
}

@ -109,42 +109,83 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.rejected(RejectedTypeEnum.createPolicy(executorProperties.getRejectedHandler()))
.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut())
.build();
// 设置动态线程池增强参数
ThreadPoolNotifyAlarm notify = executorProperties.getNotify();
if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
notify.getIsAlarm(),
notify.getCapacityAlarm(),
notify.getActiveAlarm()
);
threadPoolNotifyAlarm.setInterval(notify.getInterval());
threadPoolNotifyAlarm.setReceives(notify.getReceives());
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown;
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
}
dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor);
} catch (Exception ex) {
log.error("Failed to initialize thread pool configuration. error :: {}", ex);
} finally {
if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) {
dynamicThreadPoolWrap.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId));
}
dynamicThreadPoolWrap.setInitFlag(Boolean.TRUE);
}
}
GlobalThreadPoolManage.registerPool(dynamicThreadPoolWrap.getTpId(), dynamicThreadPoolWrap);
GlobalCoreThreadPoolManage.register(threadPoolId, executorProperties);
// 设置动态线程池增强参数
ThreadPoolNotifyAlarm notify = Optional.ofNullable(executorProperties)
.map(each -> each.getNotify())
.orElseGet(() -> {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(true, 80, 80);
threadPoolNotifyAlarm.setInterval(2);
return threadPoolNotifyAlarm;
});
if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
notify.getIsAlarm(),
notify.getCapacityAlarm(),
notify.getActiveAlarm()
);
threadPoolNotifyAlarm.setInterval(notify.getInterval());
threadPoolNotifyAlarm.setReceives(notify.getReceives());
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown;
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
}
dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor);
GlobalThreadPoolManage.registerPool(dynamicThreadPoolWrap.getTpId(), dynamicThreadPoolWrap);
GlobalCoreThreadPoolManage.register(
threadPoolId,
executorProperties == null
? buildExecutorProperties(threadPoolId, newDynamicPoolExecutor)
: executorProperties
);
return newDynamicPoolExecutor;
}
/**
* Build executor properties.
*
* @param threadPoolId
* @param executor
* @return
*/
private ExecutorProperties buildExecutorProperties(String threadPoolId, ThreadPoolExecutor executor) {
ExecutorProperties executorProperties = new ExecutorProperties();
BlockingQueue<Runnable> queue = executor.getQueue();
int queueSize = queue.size();
String queueType = queue.getClass().getSimpleName();
int remainingCapacity = queue.remainingCapacity();
int queueCapacity = queueSize + remainingCapacity;
executorProperties.setCorePoolSize(executor.getCorePoolSize())
.setMaximumPoolSize(executor.getMaximumPoolSize())
.setAllowCoreThreadTimeOut(executor.allowsCoreThreadTimeOut())
.setKeepAliveTime(executor.getKeepAliveTime(TimeUnit.SECONDS))
.setBlockingQueue(queueType)
.setExecuteTimeOut(10000L)
.setQueueCapacity(queueCapacity)
.setRejectedHandler(((DynamicThreadPoolExecutor) executor).getRedundancyHandler().getClass().getSimpleName())
.setThreadPoolId(threadPoolId);
return executorProperties;
}
}

Loading…
Cancel
Save