修复控制消息推送报警频率方法并发安全问题. (#82)

pull/84/head
chen.ma 3 years ago
parent 1e912e2223
commit 417b3ba156

@ -1,11 +1,15 @@
package cn.hippo4j.starter.alarm;
import cn.hippo4j.common.constant.Constants;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* .
@ -15,7 +19,9 @@ import java.util.Map;
*/
public class AlarmControlHandler {
public static Map<String, Cache<String, String>> THREAD_POOL_ALARM_CACHE = Maps.newConcurrentMap();
private final Map<String, ReentrantLock> threadPoolLock = Maps.newHashMap();
private final Map<String, Cache<String, String>> threadPoolAlarmCache = Maps.newConcurrentMap();
/**
* .
@ -24,17 +30,48 @@ public class AlarmControlHandler {
* @return
*/
public boolean isSendAlarm(AlarmControlDTO alarmControl) {
Cache<String, String> cache = THREAD_POOL_ALARM_CACHE.get(alarmControl.buildPk());
if (cache != null) {
String pkId = cache.getIfPresent(alarmControl.getTypeEnum().name());
if (StrUtil.isBlank(pkId)) {
// val 无意义
cache.put(alarmControl.getTypeEnum().name(), IdUtil.simpleUUID());
return true;
String threadPoolKey = alarmControl.buildPk();
Cache<String, String> cache = threadPoolAlarmCache.get(threadPoolKey);
if (cache == null) {
return false;
}
String pkId = cache.getIfPresent(alarmControl.getTypeEnum().name());
if (StrUtil.isBlank(pkId)) {
ReentrantLock lock = threadPoolLock.get(threadPoolKey);
lock.lock();
try {
pkId = cache.getIfPresent(alarmControl.getTypeEnum().name());
if (StrUtil.isBlank(pkId)) {
// val 无意义
cache.put(alarmControl.getTypeEnum().name(), IdUtil.simpleUUID());
return true;
}
} finally {
lock.unlock();
}
}
return false;
}
/**
* Init cache and lock.
*
* @param threadPoolId
* @param platform
* @param interval
*/
public void initCacheAndLock(String threadPoolId, String platform, Integer interval) {
String threadPoolKey = StrUtil.builder(threadPoolId, Constants.GROUP_KEY_DELIMITER, platform).toString();
Cache<String, String> cache = CacheBuilder.newBuilder()
.expireAfterWrite(interval, TimeUnit.MINUTES)
.build();
threadPoolAlarmCache.put(threadPoolKey, cache);
// Set the lock to prevent false sending of alarm information.
ReentrantLock reentrantLock = new ReentrantLock();
threadPoolLock.put(threadPoolKey, reentrantLock);
}
}

@ -1,7 +1,6 @@
package cn.hippo4j.starter.alarm;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.toolkit.GroupKey;
import cn.hippo4j.common.toolkit.JSONUtil;
@ -12,8 +11,6 @@ import cn.hippo4j.starter.core.GlobalThreadPoolManage;
import cn.hippo4j.starter.remote.HttpAgent;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
@ -25,7 +22,6 @@ import org.springframework.beans.factory.InitializingBean;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.BASE_PATH;
@ -136,12 +132,7 @@ public class BaseSendMessageService implements InitializingBean, SendMessageServ
ALARM_NOTIFY_CONFIG.forEach((key, val) ->
val.stream().filter(each -> StrUtil.equals("ALARM", each.getType()))
.forEach(each -> {
Cache<String, String> cache = CacheBuilder.newBuilder()
.expireAfterWrite(each.getInterval(), TimeUnit.MINUTES)
.build();
AlarmControlHandler.THREAD_POOL_ALARM_CACHE.put(StrUtil.builder(each.getTpId(), Constants.GROUP_KEY_DELIMITER, each.getPlatform()).toString(), cache);
})
.forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval()))
);
}
}

Loading…
Cancel
Save