Merge pull request #411 from mabaiwan/develop

Notify related parameters to add dynamic change function (#407)
pull/413/head
小马哥 2 years ago committed by GitHub
commit 2260365eae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -19,62 +19,64 @@ package cn.hippo4j.message.dto;
import cn.hippo4j.message.enums.NotifyTypeEnum; import cn.hippo4j.message.enums.NotifyTypeEnum;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
/** /**
* Notify config dto. * Notify config dto.
*/ */
@Data @Data
@EqualsAndHashCode
@Accessors(chain = true) @Accessors(chain = true)
public class NotifyConfigDTO { public class NotifyConfigDTO {
/** /**
* id * Tenant id
*/ */
private String tenantId; private String tenantId;
/** /**
* id * Item id
*/ */
private String itemId; private String itemId;
/** /**
* 线id * Thread-pool id
*/ */
private String tpId; private String tpId;
/** /**
* * Platform
*/ */
private String platform; private String platform;
/** /**
* * Type
*/ */
private String type; private String type;
/** /**
* * Secret key
*/ */
private String secretKey; private String secretKey;
/** /**
* * Secret
*/ */
private String secret; private String secret;
/** /**
* * Interval
*/ */
private Integer interval; private Integer interval;
/** /**
* * Receives
*/ */
private String receives; private String receives;
/** /**
* * Type enum
*/ */
private NotifyTypeEnum typeEnum; private NotifyTypeEnum typeEnum;
} }

@ -27,6 +27,7 @@ import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
@ -45,7 +46,8 @@ public class HippoBaseSendMessageService implements HippoSendMessageService, Com
private final AlarmControlHandler alarmControlHandler; private final AlarmControlHandler alarmControlHandler;
private final Map<String, List<NotifyConfigDTO>> notifyConfigs = Maps.newHashMap(); @Getter
public final Map<String, List<NotifyConfigDTO>> notifyConfigs = Maps.newHashMap();
private final Map<String, SendMessageHandler> sendMessageHandlers = Maps.newHashMap(); private final Map<String, SendMessageHandler> sendMessageHandlers = Maps.newHashMap();

@ -33,36 +33,39 @@ import java.util.Map;
public class ThreadPoolNotifyAlarm { public class ThreadPoolNotifyAlarm {
/** /**
* isAlarm * Is alarm
*/ */
@NonNull @NonNull
private Boolean isAlarm; private Boolean isAlarm;
/** /**
* activeAlarm * Active alarm
*/ */
@NonNull @NonNull
private Integer activeAlarm; private Integer activeAlarm;
/** /**
* capacityAlarm * Capacity alarm
*/ */
@NonNull @NonNull
private Integer capacityAlarm; private Integer capacityAlarm;
/** /**
* interval * Interval
*/ */
private Integer interval; private Integer interval;
/** /**
* receive * Receive
*/ */
private String receive; private String receive;
/** /**
* receives * Receives
* ps *
* <p>
* Do not enable this configuration for the time being, it may be useful if you develop mailboxes in the future.
* </p>
*/ */
@Deprecated @Deprecated
private Map<String, String> receives; private Map<String, String> receives;

@ -36,6 +36,7 @@ import cn.hippo4j.core.springboot.starter.support.ThreadPoolAdapterRegister;
import cn.hippo4j.message.api.NotifyConfigBuilder; import cn.hippo4j.message.api.NotifyConfigBuilder;
import cn.hippo4j.message.config.MessageConfiguration; import cn.hippo4j.message.config.MessageConfiguration;
import cn.hippo4j.message.service.AlarmControlHandler; import cn.hippo4j.message.service.AlarmControlHandler;
import cn.hippo4j.message.service.HippoBaseSendMessageService;
import cn.hippo4j.message.service.HippoSendMessageService; import cn.hippo4j.message.service.HippoSendMessageService;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration; import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
@ -107,8 +108,11 @@ public class DynamicThreadPoolCoreAutoConfiguration {
} }
@Bean @Bean
public ExecutorsListener hippo4jExecutorsListener(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) { @SuppressWarnings("all")
return new ExecutorsListener(threadPoolNotifyAlarmHandler); public ExecutorsListener hippo4jExecutorsListener(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler,
CoreNotifyConfigBuilder coreNotifyConfigBuilder,
HippoBaseSendMessageService hippoBaseSendMessageService) {
return new ExecutorsListener(threadPoolNotifyAlarmHandler, coreNotifyConfigBuilder, hippoBaseSendMessageService);
} }
@Bean @Bean

@ -64,7 +64,9 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
return resultMap; return resultMap;
} }
for (ExecutorProperties executor : executors) { for (ExecutorProperties executor : executors) {
resultMap.putAll(buildSingleNotifyConfig(executor)); Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig = buildSingleNotifyConfig(executor);
initCacheAndLock(buildSingleNotifyConfig);
resultMap.putAll(buildSingleNotifyConfig);
} }
return resultMap; return resultMap;
} }
@ -80,7 +82,6 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
String threadPoolId = executor.getThreadPoolId(); String threadPoolId = executor.getThreadPoolId();
String alarmBuildKey = threadPoolId + "+ALARM"; String alarmBuildKey = threadPoolId + "+ALARM";
List<NotifyConfigDTO> alarmNotifyConfigs = Lists.newArrayList(); List<NotifyConfigDTO> alarmNotifyConfigs = Lists.newArrayList();
List<NotifyPlatformProperties> notifyPlatforms = bootstrapCoreProperties.getNotifyPlatforms(); List<NotifyPlatformProperties> notifyPlatforms = bootstrapCoreProperties.getNotifyPlatforms();
for (NotifyPlatformProperties platformProperties : notifyPlatforms) { for (NotifyPlatformProperties platformProperties : notifyPlatforms) {
NotifyConfigDTO notifyConfig = new NotifyConfigDTO(); NotifyConfigDTO notifyConfig = new NotifyConfigDTO();
@ -97,10 +98,8 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
alarmNotifyConfigs.add(notifyConfig); alarmNotifyConfigs.add(notifyConfig);
} }
resultMap.put(alarmBuildKey, alarmNotifyConfigs); resultMap.put(alarmBuildKey, alarmNotifyConfigs);
String changeBuildKey = threadPoolId + "+CONFIG"; String changeBuildKey = threadPoolId + "+CONFIG";
List<NotifyConfigDTO> changeNotifyConfigs = Lists.newArrayList(); List<NotifyConfigDTO> changeNotifyConfigs = Lists.newArrayList();
for (NotifyPlatformProperties platformProperties : notifyPlatforms) { for (NotifyPlatformProperties platformProperties : notifyPlatforms) {
NotifyConfigDTO notifyConfig = new NotifyConfigDTO(); NotifyConfigDTO notifyConfig = new NotifyConfigDTO();
notifyConfig.setPlatform(platformProperties.getPlatform()); notifyConfig.setPlatform(platformProperties.getPlatform());
@ -112,13 +111,14 @@ public class CoreNotifyConfigBuilder implements NotifyConfigBuilder {
changeNotifyConfigs.add(notifyConfig); changeNotifyConfigs.add(notifyConfig);
} }
resultMap.put(changeBuildKey, changeNotifyConfigs); resultMap.put(changeBuildKey, changeNotifyConfigs);
return resultMap;
}
resultMap.forEach( public void initCacheAndLock(Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig) {
buildSingleNotifyConfig.forEach(
(key, val) -> val.stream() (key, val) -> val.stream()
.filter(each -> StrUtil.equals("ALARM", each.getType())) .filter(each -> StrUtil.equals("ALARM", each.getType()))
.forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval()))); .forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval())));
return resultMap;
} }
private String buildReceive(ExecutorProperties executor, NotifyPlatformProperties platformProperties) { private String buildReceive(ExecutorProperties executor, NotifyPlatformProperties platformProperties) {

@ -17,9 +17,10 @@
package cn.hippo4j.core.springboot.starter.refresher.event; package cn.hippo4j.core.springboot.starter.refresher.event;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest; import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.executor.support.QueueTypeEnum; import cn.hippo4j.core.executor.support.QueueTypeEnum;
@ -28,13 +29,20 @@ import cn.hippo4j.core.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.core.proxy.RejectedProxyUtil; import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties; import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties;
import cn.hippo4j.core.springboot.starter.config.ExecutorProperties; import cn.hippo4j.core.springboot.starter.config.ExecutorProperties;
import cn.hippo4j.core.springboot.starter.notify.CoreNotifyConfigBuilder;
import cn.hippo4j.core.springboot.starter.support.GlobalCoreThreadPoolManage; import cn.hippo4j.core.springboot.starter.support.GlobalCoreThreadPoolManage;
import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.HippoBaseSendMessageService;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import com.google.common.collect.Lists;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.core.annotation.Order; import org.springframework.core.annotation.Order;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -55,20 +63,27 @@ public class ExecutorsListener implements ApplicationListener<Hippo4jCoreDynamic
private final ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler; private final ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler;
private final CoreNotifyConfigBuilder coreNotifyConfigBuilder;
private final HippoBaseSendMessageService hippoBaseSendMessageService;
@Override @Override
public void onApplicationEvent(Hippo4jCoreDynamicRefreshEvent threadPoolDynamicRefreshEvent) { public void onApplicationEvent(Hippo4jCoreDynamicRefreshEvent threadPoolDynamicRefreshEvent) {
BootstrapCoreProperties bindableCoreProperties = threadPoolDynamicRefreshEvent.getBootstrapCoreProperties(); BootstrapCoreProperties bindableCoreProperties = threadPoolDynamicRefreshEvent.getBootstrapCoreProperties();
List<ExecutorProperties> executors = bindableCoreProperties.getExecutors(); List<ExecutorProperties> executors = bindableCoreProperties.getExecutors();
for (ExecutorProperties properties : executors) { for (ExecutorProperties properties : executors) {
String threadPoolId = properties.getThreadPoolId(); String threadPoolId = properties.getThreadPoolId();
// Check whether the notification configuration is consistent.
// this operation will not trigger the notification.
checkNotifyConsistencyAndReplace(properties);
if (!checkConsistency(threadPoolId, properties)) { if (!checkConsistency(threadPoolId, properties)) {
continue; continue;
} }
// refresh executor pool // refresh executor pool.
dynamicRefreshPool(threadPoolId, properties); dynamicRefreshPool(threadPoolId, properties);
// old properties // old properties.
ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId()); ExecutorProperties beforeProperties = GlobalCoreThreadPoolManage.getProperties(properties.getThreadPoolId());
// refresh executor properties // refresh executor properties.
GlobalCoreThreadPoolManage.refresh(threadPoolId, properties); GlobalCoreThreadPoolManage.refresh(threadPoolId, properties);
log.info(CHANGE_THREAD_POOL_TEXT, log.info(CHANGE_THREAD_POOL_TEXT,
threadPoolId.toUpperCase(), threadPoolId.toUpperCase(),
@ -116,6 +131,56 @@ public class ExecutorsListener implements ApplicationListener<Hippo4jCoreDynamic
return changeRequest; return changeRequest;
} }
/**
* Check notify consistency and replace.
*
* @param properties
*/
private void checkNotifyConsistencyAndReplace(ExecutorProperties properties) {
boolean checkNotifyConfig = false;
boolean checkNotifyAlarm = false;
List<String> changeKeys = Lists.newArrayList();
Map<String, List<NotifyConfigDTO>> newDynamicThreadPoolNotifyMap = coreNotifyConfigBuilder.buildSingleNotifyConfig(properties);
Map<String, List<NotifyConfigDTO>> notifyConfigs = hippoBaseSendMessageService.getNotifyConfigs();
if (CollectionUtil.isNotEmpty(notifyConfigs)) {
for (Map.Entry<String, List<NotifyConfigDTO>> each : newDynamicThreadPoolNotifyMap.entrySet()) {
if (checkNotifyConfig) {
break;
}
List<NotifyConfigDTO> notifyConfigDTOS = notifyConfigs.get(each.getKey());
for (NotifyConfigDTO notifyConfig : each.getValue()) {
if (!notifyConfigDTOS.contains(notifyConfig)) {
checkNotifyConfig = true;
changeKeys.add(each.getKey());
break;
}
}
}
}
if (checkNotifyConfig) {
coreNotifyConfigBuilder.initCacheAndLock(newDynamicThreadPoolNotifyMap);
hippoBaseSendMessageService.putPlatform(newDynamicThreadPoolNotifyMap);
}
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(properties.getThreadPoolId());
if (threadPoolNotifyAlarm != null && properties.getNotify() != null) {
ThreadPoolNotifyAlarm notify = properties.getNotify();
boolean isAlarm = notify.getIsAlarm();
Integer activeAlarm = notify.getActiveAlarm();
Integer capacityAlarm = notify.getCapacityAlarm();
if (threadPoolNotifyAlarm.getIsAlarm() != isAlarm
|| threadPoolNotifyAlarm.getActiveAlarm() != activeAlarm
|| threadPoolNotifyAlarm.getCapacityAlarm() != capacityAlarm) {
checkNotifyAlarm = true;
threadPoolNotifyAlarm.setIsAlarm(isAlarm);
threadPoolNotifyAlarm.setActiveAlarm(activeAlarm);
threadPoolNotifyAlarm.setCapacityAlarm(capacityAlarm);
}
}
if (checkNotifyConfig || checkNotifyAlarm) {
log.info("[{}] Dynamic thread pool notification property changes.", properties.getThreadPoolId());
}
}
/** /**
* Check consistency. * Check consistency.
* *

Loading…
Cancel
Save