refactor: refactor DynamicThreadPoolRefreshListener function code

pull/1555/head
Song 2 weeks ago
parent a5ec18d7fc
commit ba7538de8e

@ -38,7 +38,6 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.annotation.Order;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -46,6 +45,7 @@ import java.util.Optional;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
@ -164,48 +164,63 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
* @param executorProperties
*/
private void checkNotifyConsistencyAndReplace(ExecutorProperties executorProperties) {
boolean checkNotifyConfig = false;
boolean checkNotifyAlarm = false;
Map<String, List<NotifyConfigDTO>> newDynamicThreadPoolNotifyMap =
configModeNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties);
Map<String, List<NotifyConfigDTO>> notifyConfigs = threadPoolBaseSendMessageService.getNotifyConfigs();
if (CollectionUtil.isNotEmpty(notifyConfigs)) {
for (Map.Entry<String, List<NotifyConfigDTO>> each : newDynamicThreadPoolNotifyMap.entrySet()) {
if (checkNotifyConfig) {
break;
}
List<NotifyConfigDTO> notifyConfigDTOS = notifyConfigs.get(each.getKey());
for (NotifyConfigDTO notifyConfig : each.getValue()) {
if (!notifyConfigDTOS.contains(notifyConfig)) {
checkNotifyConfig = true;
break;
}
}
}
boolean checkNotifyConfig = checkAndReplaceNotifyConfig(newDynamicThreadPoolNotifyMap, notifyConfigs);
boolean checkNotifyAlarm = checkAndReplaceNotifyAlarm(executorProperties);
if (checkNotifyConfig || checkNotifyAlarm) {
log.info("[{}] Dynamic thread pool notification property changes.", executorProperties.getThreadPoolId());
}
if (checkNotifyConfig) {
configModeNotifyConfigBuilder.initCacheAndLock(newDynamicThreadPoolNotifyMap);
threadPoolBaseSendMessageService.putPlatform(newDynamicThreadPoolNotifyMap);
}
private boolean checkAndReplaceNotifyConfig(Map<String, List<NotifyConfigDTO>> newConfigs,
Map<String, List<NotifyConfigDTO>> currentConfigs) {
if (CollectionUtil.isEmpty(currentConfigs)) {
return false;
}
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId());
if (threadPoolNotifyAlarm != null) {
Boolean isAlarm = executorProperties.getAlarm();
Integer activeAlarm = executorProperties.getActiveAlarm();
Integer capacityAlarm = executorProperties.getCapacityAlarm();
if ((isAlarm != null && !Objects.equals(isAlarm, threadPoolNotifyAlarm.getAlarm()))
|| (activeAlarm != null && !Objects.equals(activeAlarm, threadPoolNotifyAlarm.getActiveAlarm()))
|| (capacityAlarm != null && !Objects.equals(capacityAlarm, threadPoolNotifyAlarm.getCapacityAlarm()))) {
checkNotifyAlarm = true;
threadPoolNotifyAlarm.setAlarm(Optional.ofNullable(isAlarm).orElse(threadPoolNotifyAlarm.getAlarm()));
threadPoolNotifyAlarm.setActiveAlarm(Optional.ofNullable(activeAlarm).orElse(threadPoolNotifyAlarm.getActiveAlarm()));
threadPoolNotifyAlarm.setCapacityAlarm(Optional.ofNullable(capacityAlarm).orElse(threadPoolNotifyAlarm.getCapacityAlarm()));
for (Map.Entry<String, List<NotifyConfigDTO>> entry : newConfigs.entrySet()) {
String key = entry.getKey();
List<NotifyConfigDTO> newNotifyConfigList = entry.getValue();
List<NotifyConfigDTO> currentNotifyConfigList = currentConfigs.get(key);
if (currentNotifyConfigList == null || !currentNotifyConfigList.containsAll(newNotifyConfigList)) {
configModeNotifyConfigBuilder.initCacheAndLock(newConfigs);
threadPoolBaseSendMessageService.putPlatform(newConfigs);
return true;
}
}
if (checkNotifyConfig || checkNotifyAlarm) {
log.info("[{}] Dynamic thread pool notification property changes.", executorProperties.getThreadPoolId());
return false;
}
private boolean checkAndReplaceNotifyAlarm(ExecutorProperties executorProperties) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId());
if (threadPoolNotifyAlarm == null) {
return false;
}
boolean checkNotifyAlarm = false;
Boolean isAlarm = executorProperties.getAlarm();
Integer activeAlarm = executorProperties.getActiveAlarm();
Integer capacityAlarm = executorProperties.getCapacityAlarm();
if ((isAlarm != null && !Objects.equals(isAlarm, threadPoolNotifyAlarm.getAlarm()))
|| (activeAlarm != null && !Objects.equals(activeAlarm, threadPoolNotifyAlarm.getActiveAlarm()))
|| (capacityAlarm != null && !Objects.equals(capacityAlarm, threadPoolNotifyAlarm.getCapacityAlarm()))) {
checkNotifyAlarm = true;
threadPoolNotifyAlarm.setAlarm(isAlarm != null ? isAlarm : threadPoolNotifyAlarm.getAlarm());
threadPoolNotifyAlarm.setActiveAlarm(activeAlarm != null ? activeAlarm : threadPoolNotifyAlarm.getActiveAlarm());
threadPoolNotifyAlarm.setCapacityAlarm(capacityAlarm != null ? capacityAlarm : threadPoolNotifyAlarm.getCapacityAlarm());
}
return checkNotifyAlarm;
}
/**
* Check consistency.
*
@ -216,19 +231,26 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
ThreadPoolExecutorHolder executorHolder = ThreadPoolExecutorRegistry.getHolder(threadPoolId);
ExecutorProperties beforeProperties = executorHolder.getExecutorProperties();
ThreadPoolExecutor executor = executorHolder.getExecutor();
if (executor == null) {
return false;
}
boolean result = (properties.getCorePoolSize() != null && !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize()))
|| (properties.getMaximumPoolSize() != null && !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()))
|| (properties.getAllowCoreThreadTimeOut() != null && !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()))
|| (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()))
|| (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()))
|| (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler()))
||
((properties.getQueueCapacity() != null && !Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())));
return result;
return Stream.of(
hasPropertyChanged(beforeProperties.getCorePoolSize(), properties.getCorePoolSize()),
hasPropertyChanged(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()),
hasPropertyChanged(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()),
hasPropertyChanged(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()),
hasPropertyChanged(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()),
hasPropertyChanged(beforeProperties.getRejectedHandler(), properties.getRejectedHandler()),
isQueueCapacityChanged(beforeProperties, properties, executor)
).anyMatch(Boolean::booleanValue);
}
private boolean isQueueCapacityChanged(ExecutorProperties beforeProperties, ExecutorProperties properties, ThreadPoolExecutor executor) {
return properties.getQueueCapacity() != null &&
!Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) &&
Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName());
}
/**
@ -238,41 +260,62 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
* @param properties
*/
private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properties) {
ExecutorProperties beforeProperties = ThreadPoolExecutorRegistry.getHolder(threadPoolId).getExecutorProperties();
ThreadPoolExecutor executor = ThreadPoolExecutorRegistry.getHolder(threadPoolId).getExecutor();
if (properties.getMaximumPoolSize() != null && properties.getCorePoolSize() != null) {
ThreadPoolExecutorUtil.safeSetPoolSize(executor, properties.getCorePoolSize(), properties.getMaximumPoolSize());
ThreadPoolExecutorHolder holder = ThreadPoolExecutorRegistry.getHolder(threadPoolId);
ExecutorProperties beforeProperties = holder.getExecutorProperties();
ThreadPoolExecutor executor = holder.getExecutor();
if (executor == null) {
log.warn("Executor is null for threadPoolId: {}", threadPoolId);
return;
}
setPoolSizes(executor, properties);
updateExecutorProperties(executor, beforeProperties, properties);
updateQueueCapacity(executor, beforeProperties, properties);
}
private void setPoolSizes(ThreadPoolExecutor executor, ExecutorProperties properties) {
Integer corePoolSize = properties.getCorePoolSize();
Integer maximumPoolSize = properties.getMaximumPoolSize();
if (corePoolSize != null && maximumPoolSize != null) {
ThreadPoolExecutorUtil.safeSetPoolSize(executor, corePoolSize, maximumPoolSize);
} else {
if (properties.getMaximumPoolSize() != null) {
executor.setMaximumPoolSize(properties.getMaximumPoolSize());
if (maximumPoolSize != null) {
executor.setMaximumPoolSize(maximumPoolSize);
}
if (properties.getCorePoolSize() != null) {
executor.setCorePoolSize(properties.getCorePoolSize());
if (corePoolSize != null) {
executor.setCorePoolSize(corePoolSize);
}
}
if (properties.getAllowCoreThreadTimeOut() != null && !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) {
}
private void updateExecutorProperties(ThreadPoolExecutor executor, ExecutorProperties beforeProperties, ExecutorProperties properties) {
if (hasPropertyChanged(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) {
executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
}
if (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) {
if (executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut());
}
if (hasPropertyChanged(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()) && executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut());
}
if (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
if (hasPropertyChanged(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(properties.getRejectedHandler());
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
}
if (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) {
if (hasPropertyChanged(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) {
executor.setKeepAliveTime(properties.getKeepAliveTime(), TimeUnit.SECONDS);
}
if (properties.getQueueCapacity() != null && !Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())) {
if (executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) {
ResizableCapacityLinkedBlockingQueue<?> queue = (ResizableCapacityLinkedBlockingQueue<?>) executor.getQueue();
queue.setCapacity(properties.getQueueCapacity());
} else {
log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName());
}
}
private void updateQueueCapacity(ThreadPoolExecutor executor, ExecutorProperties beforeProperties, ExecutorProperties properties) {
if (hasPropertyChanged(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& executor.getQueue() instanceof ResizableCapacityLinkedBlockingQueue) {
((ResizableCapacityLinkedBlockingQueue<?>) executor.getQueue()).setCapacity(properties.getQueueCapacity());
} else {
log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName());
}
}
private <T> boolean hasPropertyChanged(T before, T after) {
return after != null && !Objects.equals(before, after);
}
}

Loading…
Cancel
Save