diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/AlarmControlHandler.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/AlarmControlHandler.java index eae63a4d..cd3cd628 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/AlarmControlHandler.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/AlarmControlHandler.java @@ -1,11 +1,15 @@ package cn.hippo4j.starter.alarm; +import cn.hippo4j.common.constant.Constants; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.StrUtil; import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.collect.Maps; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; /** * 报警控制组件. @@ -15,7 +19,9 @@ import java.util.Map; */ public class AlarmControlHandler { - public static Map> THREAD_POOL_ALARM_CACHE = Maps.newConcurrentMap(); + private final Map threadPoolLock = Maps.newHashMap(); + + private final Map> threadPoolAlarmCache = Maps.newConcurrentMap(); /** * 控制消息推送报警频率. @@ -24,17 +30,48 @@ public class AlarmControlHandler { * @return */ public boolean isSendAlarm(AlarmControlDTO alarmControl) { - Cache cache = THREAD_POOL_ALARM_CACHE.get(alarmControl.buildPk()); - if (cache != null) { - String pkId = cache.getIfPresent(alarmControl.getTypeEnum().name()); - if (StrUtil.isBlank(pkId)) { - // val 无意义 - cache.put(alarmControl.getTypeEnum().name(), IdUtil.simpleUUID()); - return true; + String threadPoolKey = alarmControl.buildPk(); + Cache cache = threadPoolAlarmCache.get(threadPoolKey); + if (cache == null) { + return false; + } + + String pkId = cache.getIfPresent(alarmControl.getTypeEnum().name()); + if (StrUtil.isBlank(pkId)) { + ReentrantLock lock = threadPoolLock.get(threadPoolKey); + lock.lock(); + try { + pkId = cache.getIfPresent(alarmControl.getTypeEnum().name()); + if (StrUtil.isBlank(pkId)) { + // val 无意义 + cache.put(alarmControl.getTypeEnum().name(), IdUtil.simpleUUID()); + return true; + } + } finally { + lock.unlock(); } } return false; } + /** + * Init cache and lock. + * + * @param threadPoolId + * @param platform + * @param interval + */ + public void initCacheAndLock(String threadPoolId, String platform, Integer interval) { + String threadPoolKey = StrUtil.builder(threadPoolId, Constants.GROUP_KEY_DELIMITER, platform).toString(); + Cache cache = CacheBuilder.newBuilder() + .expireAfterWrite(interval, TimeUnit.MINUTES) + .build(); + threadPoolAlarmCache.put(threadPoolKey, cache); + + // Set the lock to prevent false sending of alarm information. + ReentrantLock reentrantLock = new ReentrantLock(); + threadPoolLock.put(threadPoolKey, reentrantLock); + } + } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/BaseSendMessageService.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/BaseSendMessageService.java index 5384f532..f6688686 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/BaseSendMessageService.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/BaseSendMessageService.java @@ -1,7 +1,6 @@ package cn.hippo4j.starter.alarm; import cn.hippo4j.common.config.ApplicationContextHolder; -import cn.hippo4j.common.constant.Constants; import cn.hippo4j.common.model.PoolParameterInfo; import cn.hippo4j.common.toolkit.GroupKey; import cn.hippo4j.common.toolkit.JSONUtil; @@ -12,8 +11,6 @@ import cn.hippo4j.starter.core.GlobalThreadPoolManage; import cn.hippo4j.starter.remote.HttpAgent; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import lombok.AllArgsConstructor; @@ -25,7 +22,6 @@ import org.springframework.beans.factory.InitializingBean; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; import static cn.hippo4j.common.constant.Constants.BASE_PATH; @@ -136,12 +132,7 @@ public class BaseSendMessageService implements InitializingBean, SendMessageServ ALARM_NOTIFY_CONFIG.forEach((key, val) -> val.stream().filter(each -> StrUtil.equals("ALARM", each.getType())) - .forEach(each -> { - Cache cache = CacheBuilder.newBuilder() - .expireAfterWrite(each.getInterval(), TimeUnit.MINUTES) - .build(); - AlarmControlHandler.THREAD_POOL_ALARM_CACHE.put(StrUtil.builder(each.getTpId(), Constants.GROUP_KEY_DELIMITER, each.getPlatform()).toString(), cache); - }) + .forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval())) ); } }