重构消息报警通知模块, 以报警平台、类型纬度拆分.

pull/10/head
chen.ma 3 years ago
parent e91e989447
commit de18fd509e

@ -1,15 +1,15 @@
package cn.hippo4j.config.mapper; package cn.hippo4j.config.mapper;
import cn.hippo4j.config.model.AlarmInfo; import cn.hippo4j.config.model.NotifyInfo;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Mapper;
/** /**
* Alarm info mapper. * Notify info mapper.
* *
* @author chen.ma * @author chen.ma
* @date 2021/11/17 22:04 * @date 2021/11/17 22:04
*/ */
@Mapper @Mapper
public interface AlarmInfoMapper extends BaseMapper<AlarmInfo> { public interface NotifyInfoMapper extends BaseMapper<NotifyInfo> {
} }

@ -1,5 +1,6 @@
package cn.hippo4j.config.model; package cn.hippo4j.config.model;
import com.baomidou.mybatisplus.annotation.FieldFill;
import com.baomidou.mybatisplus.annotation.TableField; import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName; import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data; import lombok.Data;
@ -7,14 +8,14 @@ import lombok.Data;
import java.util.Date; import java.util.Date;
/** /**
* . * .
* *
* @author chen.ma * @author chen.ma
* @date 2021/11/17 22:03 * @date 2021/11/17 22:03
*/ */
@Data @Data
@TableName("alarm") @TableName("notify")
public class AlarmInfo { public class NotifyInfo {
/** /**
* id * id
@ -41,6 +42,11 @@ public class AlarmInfo {
*/ */
private String platform; private String platform;
/**
*
*/
private String type;
/** /**
* *
*/ */
@ -60,16 +66,24 @@ public class AlarmInfo {
/** /**
* *
*/ */
@TableField(fill = FieldFill.INSERT)
private Date gmtCreate; private Date gmtCreate;
/** /**
* *
*/ */
@TableField(fill = FieldFill.INSERT_UPDATE)
private Date gmtModified; private Date gmtModified;
/**
*
*/
private Integer enable;
/** /**
* *
*/ */
@TableField(fill = FieldFill.INSERT)
private Integer delFlag; private Integer delFlag;
} }

@ -1,29 +0,0 @@
package cn.hippo4j.config.model.biz.alarm;
import cn.hippo4j.config.model.AlarmInfo;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.List;
/**
* Alarm list resp dto.
*
* @author chen.ma
* @date 2021/11/17 22:53
*/
@Data
@AllArgsConstructor
public class AlarmListRespDTO {
/**
* 线ID
*/
private String threadPoolId;
/**
*
*/
private List<AlarmInfo> alarmNotifyList;
}

@ -1,22 +0,0 @@
package cn.hippo4j.config.model.biz.alarm;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.Data;
import java.util.List;
/**
* Alarm query req dto.
*
* @author chen.ma
* @date 2021/11/17 22:52
*/
@Data
public class AlarmQueryReqDTO extends Page {
/**
* groupKeys
*/
private List<String> groupKeys;
}

@ -0,0 +1,29 @@
package cn.hippo4j.config.model.biz.notify;
import cn.hippo4j.config.model.NotifyInfo;
import lombok.AllArgsConstructor;
import lombok.Data;
import java.util.List;
/**
* Notify list resp dto.
*
* @author chen.ma
* @date 2021/11/17 22:53
*/
@Data
@AllArgsConstructor
public class NotifyListRespDTO {
/**
* Key
*/
private String notifyKey;
/**
*
*/
private List<NotifyInfo> notifyList;
}

@ -0,0 +1,37 @@
package cn.hippo4j.config.model.biz.notify;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.Data;
import java.util.List;
/**
* Notify query req dto.
*
* @author chen.ma
* @date 2021/11/17 22:52
*/
@Data
public class NotifyQueryReqDTO extends Page {
/**
* groupKeys
*/
private List<String> groupKeys;
/**
* id
*/
private String tenantId;
/**
* id
*/
private String itemId;
/**
* 线id
*/
private String tpId;
}

@ -0,0 +1,59 @@
package cn.hippo4j.config.model.biz.notify;
import lombok.Data;
/**
* .
*
* @author chen.ma
* @date 2021/11/18 20:15
*/
@Data
public class NotifyReqDTO {
/**
* id
*/
private String tenantId;
/**
* id
*/
private String itemId;
/**
* 线id
*/
private String tpId;
/**
*
*/
private String platform;
/**
*
*/
private String type;
/**
*
*/
private String secretKey;
/**
*
*/
private Integer interval;
/**
*
*/
private String receives;
/**
*
*/
private Integer enable;
}

@ -0,0 +1,59 @@
package cn.hippo4j.config.model.biz.notify;
import lombok.Data;
/**
* .
*
* @author chen.ma
* @date 2021/11/18 20:07
*/
@Data
public class NotifyRespDTO {
/**
* id
*/
private String tenantId;
/**
* id
*/
private String itemId;
/**
* 线id
*/
private String tpId;
/**
*
*/
private String platform;
/**
*
*/
private String type;
/**
*
*/
private String secretKey;
/**
*
*/
private Integer interval;
/**
*
*/
private String receives;
/**
*
*/
private Integer enable;
}

@ -1,24 +0,0 @@
package cn.hippo4j.config.service.biz;
import cn.hippo4j.config.model.biz.alarm.AlarmListRespDTO;
import cn.hippo4j.config.model.biz.alarm.AlarmQueryReqDTO;
import java.util.List;
/**
* .
*
* @author chen.ma
* @date 2021/11/17 22:01
*/
public interface AlarmService {
/**
* .
*
* @param reqDTO
* @return
*/
List<AlarmListRespDTO> listAlarmConfig(AlarmQueryReqDTO reqDTO);
}

@ -0,0 +1,56 @@
package cn.hippo4j.config.service.biz;
import cn.hippo4j.config.model.biz.notify.NotifyListRespDTO;
import cn.hippo4j.config.model.biz.notify.NotifyQueryReqDTO;
import cn.hippo4j.config.model.biz.notify.NotifyReqDTO;
import cn.hippo4j.config.model.biz.notify.NotifyRespDTO;
import com.baomidou.mybatisplus.core.metadata.IPage;
import java.util.List;
/**
* .
*
* @author chen.ma
* @date 2021/11/17 22:01
*/
public interface NotifyService {
/**
* .
*
* @param reqDTO
* @return
*/
List<NotifyListRespDTO> listNotifyConfig(NotifyQueryReqDTO reqDTO);
/**
* .
*
* @param reqDTO
* @return
*/
IPage<NotifyRespDTO> queryPage(NotifyQueryReqDTO reqDTO);
/**
* .
*
* @param reqDTO
*/
void save(NotifyReqDTO reqDTO);
/**
* .
*
* @param reqDTO
*/
void update(NotifyReqDTO reqDTO);
/**
* .
*
* @param reqDTO
*/
void delete(NotifyReqDTO reqDTO);
}

@ -1,48 +0,0 @@
package cn.hippo4j.config.service.biz.impl;
import cn.hippo4j.common.toolkit.GroupKey;
import cn.hippo4j.config.mapper.AlarmInfoMapper;
import cn.hippo4j.config.model.AlarmInfo;
import cn.hippo4j.config.model.biz.alarm.AlarmListRespDTO;
import cn.hippo4j.config.model.biz.alarm.AlarmQueryReqDTO;
import cn.hippo4j.config.service.biz.AlarmService;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* .
*
* @author chen.ma
* @date 2021/11/17 22:02
*/
@Service
@AllArgsConstructor
public class AlarmServiceImpl implements AlarmService {
private final AlarmInfoMapper alarmInfoMapper;
@Override
public List<AlarmListRespDTO> listAlarmConfig(AlarmQueryReqDTO reqDTO) {
List<AlarmListRespDTO> alarmListRespList = Lists.newArrayList();
reqDTO.getGroupKeys().forEach(each -> {
String[] parseKey = GroupKey.parseKey(each);
LambdaQueryWrapper<AlarmInfo> queryWrapper = Wrappers.lambdaQuery(AlarmInfo.class)
.eq(AlarmInfo::getTenantId, parseKey[2])
.eq(AlarmInfo::getItemId, parseKey[1])
.eq(AlarmInfo::getTpId, parseKey[0]);
List<AlarmInfo> alarmInfos = alarmInfoMapper.selectList(queryWrapper);
if (CollUtil.isNotEmpty(alarmInfos)) {
alarmListRespList.add(new AlarmListRespDTO(parseKey[0], alarmInfos));
}
});
return alarmListRespList;
}
}

@ -0,0 +1,104 @@
package cn.hippo4j.config.service.biz.impl;
import cn.hippo4j.common.toolkit.GroupKey;
import cn.hippo4j.config.enums.DelEnum;
import cn.hippo4j.config.mapper.NotifyInfoMapper;
import cn.hippo4j.config.model.NotifyInfo;
import cn.hippo4j.config.model.biz.notify.NotifyListRespDTO;
import cn.hippo4j.config.model.biz.notify.NotifyQueryReqDTO;
import cn.hippo4j.config.model.biz.notify.NotifyReqDTO;
import cn.hippo4j.config.model.biz.notify.NotifyRespDTO;
import cn.hippo4j.config.service.biz.NotifyService;
import cn.hippo4j.config.toolkit.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.collect.Lists;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* .
*
* @author chen.ma
* @date 2021/11/17 22:02
*/
@Service
@AllArgsConstructor
public class NotifyServiceImpl implements NotifyService {
private final NotifyInfoMapper notifyInfoMapper;
@Override
public List<NotifyListRespDTO> listNotifyConfig(NotifyQueryReqDTO reqDTO) {
List<NotifyListRespDTO> notifyListRespList = Lists.newArrayList();
reqDTO.getGroupKeys().forEach(each -> {
String[] parseKey = GroupKey.parseKey(each);
List<NotifyInfo> notifyInfos = listNotifyCommon("CONFIG", parseKey);
if (CollUtil.isNotEmpty(notifyInfos)) {
notifyListRespList.add(new NotifyListRespDTO(StrUtil.builder(parseKey[0], "+", "CONFIG").toString(), notifyInfos));
}
List<NotifyInfo> alarmInfos = listNotifyCommon("ALARM", parseKey);
if (CollUtil.isNotEmpty(alarmInfos)) {
notifyListRespList.add(new NotifyListRespDTO(StrUtil.builder(parseKey[0], "+", "ALARM").toString(), alarmInfos));
}
});
return notifyListRespList;
}
@Override
public IPage<NotifyRespDTO> queryPage(NotifyQueryReqDTO reqDTO) {
LambdaQueryWrapper<NotifyInfo> queryWrapper = Wrappers.lambdaQuery(NotifyInfo.class)
.eq(StrUtil.isNotBlank(reqDTO.getTenantId()), NotifyInfo::getTenantId, reqDTO.getTenantId())
.eq(StrUtil.isNotBlank(reqDTO.getItemId()), NotifyInfo::getItemId, reqDTO.getItemId())
.eq(StrUtil.isNotBlank(reqDTO.getTpId()), NotifyInfo::getTpId, reqDTO.getTpId());
IPage<NotifyInfo> resultPage = notifyInfoMapper.selectPage(reqDTO, queryWrapper);
return resultPage.convert(each -> BeanUtil.convert(each, NotifyRespDTO.class));
}
@Override
public void save(NotifyReqDTO reqDTO) {
notifyInfoMapper.insert(BeanUtil.convert(reqDTO, NotifyInfo.class));
}
@Override
public void update(NotifyReqDTO reqDTO) {
NotifyInfo notifyInfo = BeanUtil.convert(reqDTO, NotifyInfo.class);
LambdaUpdateWrapper<NotifyInfo> updateWrapper = Wrappers.lambdaUpdate(NotifyInfo.class)
.eq(NotifyInfo::getTenantId, reqDTO.getTenantId())
.eq(NotifyInfo::getItemId, reqDTO.getItemId())
.eq(NotifyInfo::getTpId, reqDTO.getTpId());
notifyInfoMapper.update(notifyInfo, updateWrapper);
}
@Override
public void delete(NotifyReqDTO reqDTO) {
LambdaUpdateWrapper<NotifyInfo> updateWrapper = Wrappers.lambdaUpdate(NotifyInfo.class)
.eq(NotifyInfo::getTenantId, reqDTO.getTenantId())
.eq(NotifyInfo::getItemId, reqDTO.getItemId())
.eq(NotifyInfo::getTpId, reqDTO.getTpId());
notifyInfoMapper.delete(updateWrapper);
}
private List<NotifyInfo> listNotifyCommon(String type, String[] parseKey) {
LambdaQueryWrapper<NotifyInfo> queryWrapper = Wrappers.lambdaQuery(NotifyInfo.class)
.eq(NotifyInfo::getTenantId, parseKey[2])
.eq(NotifyInfo::getItemId, parseKey[1])
.eq(NotifyInfo::getTpId, parseKey[0])
.eq(NotifyInfo::getEnable, DelEnum.NORMAL)
.eq(NotifyInfo::getType, type);
List<NotifyInfo> notifyInfos = notifyInfoMapper.selectList(queryWrapper);
return notifyInfos;
}
}

@ -1,36 +0,0 @@
package cn.hippo4j.console.controller;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.config.model.biz.alarm.AlarmListRespDTO;
import cn.hippo4j.config.model.biz.alarm.AlarmQueryReqDTO;
import cn.hippo4j.config.service.biz.AlarmService;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
* .
*
* @author chen.ma
* @date 2021/11/17 22:00
*/
@RestController
@AllArgsConstructor
@RequestMapping(Constants.BASE_PATH + "/alarm")
public class AlarmController {
private final AlarmService alarmService;
@PostMapping("/list/config")
public Result<List<AlarmListRespDTO>> listAlarmConfig(@RequestBody AlarmQueryReqDTO reqDTO) {
List<AlarmListRespDTO> resultData = alarmService.listAlarmConfig(reqDTO);
return Results.success(resultData);
}
}

@ -0,0 +1,60 @@
package cn.hippo4j.console.controller;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import cn.hippo4j.config.model.biz.notify.NotifyListRespDTO;
import cn.hippo4j.config.model.biz.notify.NotifyQueryReqDTO;
import cn.hippo4j.config.model.biz.notify.NotifyReqDTO;
import cn.hippo4j.config.model.biz.notify.NotifyRespDTO;
import cn.hippo4j.config.service.biz.NotifyService;
import com.baomidou.mybatisplus.core.metadata.IPage;
import lombok.AllArgsConstructor;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* .
*
* @author chen.ma
* @date 2021/11/17 22:00
*/
@RestController
@AllArgsConstructor
@RequestMapping(Constants.BASE_PATH + "/notify")
public class NotifyController {
private final NotifyService notifyService;
@PostMapping("/list/config")
public Result<List<NotifyListRespDTO>> listNotifyConfig(@RequestBody NotifyQueryReqDTO reqDTO) {
List<NotifyListRespDTO> resultData = notifyService.listNotifyConfig(reqDTO);
return Results.success(resultData);
}
@PostMapping("/query/page")
public Result<IPage<NotifyRespDTO>> queryPage(@RequestBody NotifyQueryReqDTO reqDTO) {
IPage<NotifyRespDTO> resultPage = notifyService.queryPage(reqDTO);
return Results.success(resultPage);
}
@PostMapping("/save")
public Result<Void> saveNotifyConfig(@RequestBody NotifyReqDTO reqDTO) {
notifyService.save(reqDTO);
return Results.success();
}
@PostMapping("/update")
public Result<Void> updateNotifyConfig(@RequestBody NotifyReqDTO reqDTO) {
notifyService.update(reqDTO);
return Results.success();
}
@DeleteMapping("/remove")
public Result<Void> removeNotifyConfig(@RequestBody NotifyReqDTO reqDTO) {
notifyService.delete(reqDTO);
return Results.success();
}
}

@ -1,5 +1,6 @@
package cn.hippo4j.starter.alarm; package cn.hippo4j.starter.alarm;
import cn.hutool.core.util.StrUtil;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
@ -18,6 +19,11 @@ public class AlarmControlDTO {
*/ */
private String threadPool; private String threadPool;
/**
*
*/
private String platform;
/** /**
* *
*/ */
@ -29,7 +35,7 @@ public class AlarmControlDTO {
* @return * @return
*/ */
public String buildPk() { public String buildPk() {
return threadPool + "_" + typeEnum; return StrUtil.builder(threadPool, "+", platform).toString();
} }
} }

@ -2,10 +2,11 @@ package cn.hippo4j.starter.alarm;
import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder; import com.google.common.collect.Maps;
import java.util.concurrent.TimeUnit; import java.util.Map;
/** /**
* . * .
@ -15,13 +16,7 @@ import java.util.concurrent.TimeUnit;
*/ */
public class AlarmControlHandler { public class AlarmControlHandler {
private final Cache<String, String> cache; public static Map<String, Cache<String, String>> THREAD_POOL_ALARM_CACHE = Maps.newConcurrentMap();
public AlarmControlHandler(long alarmInterval) {
cache = CacheBuilder.newBuilder()
.expireAfterWrite(alarmInterval, TimeUnit.MINUTES)
.build();
}
/** /**
* . * .
@ -29,13 +24,17 @@ public class AlarmControlHandler {
* @param alarmControl * @param alarmControl
* @return * @return
*/ */
public boolean isSend(AlarmControlDTO alarmControl) { public boolean isSendAlarm(AlarmControlDTO alarmControl) {
String pkId = cache.getIfPresent(alarmControl.buildPk()); Cache<String, String> cache = THREAD_POOL_ALARM_CACHE.get(alarmControl.buildPk());
if (cache != null) {
String pkId = cache.getIfPresent(alarmControl.getTypeEnum().name());
if (StrUtil.isBlank(pkId)) { if (StrUtil.isBlank(pkId)) {
// val 无意义 // val 无意义
cache.put(alarmControl.buildPk(), IdUtil.simpleUUID()); cache.put(alarmControl.getTypeEnum().name(), IdUtil.simpleUUID());
return true; return true;
} else {
System.out.println(JSON.toJSONString(alarmControl));
}
} }
return false; return false;

@ -9,7 +9,10 @@ import cn.hippo4j.starter.core.DynamicThreadPoolExecutor;
import cn.hippo4j.starter.core.GlobalThreadPoolManage; import cn.hippo4j.starter.core.GlobalThreadPoolManage;
import cn.hippo4j.starter.remote.HttpAgent; import cn.hippo4j.starter.remote.HttpAgent;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
@ -21,6 +24,7 @@ import org.springframework.beans.factory.InitializingBean;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit;
/** /**
* Base send message service. * Base send message service.
@ -38,14 +42,17 @@ public class BaseSendMessageService implements InitializingBean, SendMessageServ
@NonNull @NonNull
private final BootstrapProperties properties; private final BootstrapProperties properties;
private final static Map<String, List<AlarmNotifyDTO>> ALARM_NOTIFY_CONFIG = Maps.newHashMap(); @NonNull
private final AlarmControlHandler alarmControlHandler;
public final static Map<String, List<NotifyDTO>> ALARM_NOTIFY_CONFIG = Maps.newHashMap();
private final Map<String, SendMessageHandler> sendMessageHandlers = Maps.newHashMap(); private final Map<String, SendMessageHandler> sendMessageHandlers = Maps.newHashMap();
@Override @Override
public void sendAlarmMessage(DynamicThreadPoolExecutor threadPoolExecutor) { public void sendAlarmMessage(MessageTypeEnum typeEnum, DynamicThreadPoolExecutor executor) {
List<AlarmNotifyDTO> notifyList = ALARM_NOTIFY_CONFIG.get(threadPoolExecutor.getThreadPoolId()); String buildKey = StrUtil.builder(executor.getThreadPoolId(), "+", "ALARM").toString();
List<NotifyDTO> notifyList = ALARM_NOTIFY_CONFIG.get(buildKey);
if (CollUtil.isEmpty(notifyList)) { if (CollUtil.isEmpty(notifyList)) {
log.warn("Please configure alarm notification on the server."); log.warn("Please configure alarm notification on the server.");
return; return;
@ -59,7 +66,9 @@ public class BaseSendMessageService implements InitializingBean, SendMessageServ
return; return;
} }
messageHandler.sendAlarmMessage(each, threadPoolExecutor); if (isSendAlarm(each.getTpId(), each.setTypeEnum(typeEnum))) {
messageHandler.sendAlarmMessage(each, executor);
}
} catch (Exception ex) { } catch (Exception ex) {
log.warn("Failed to send thread pool alarm notification.", ex); log.warn("Failed to send thread pool alarm notification.", ex);
} }
@ -68,7 +77,8 @@ public class BaseSendMessageService implements InitializingBean, SendMessageServ
@Override @Override
public void sendChangeMessage(PoolParameterInfo parameter) { public void sendChangeMessage(PoolParameterInfo parameter) {
List<AlarmNotifyDTO> notifyList = ALARM_NOTIFY_CONFIG.get(parameter.getTpId()); String buildKey = StrUtil.builder(parameter.getTpId(), "+", "CONFIG").toString();
List<NotifyDTO> notifyList = ALARM_NOTIFY_CONFIG.get(buildKey);
if (CollUtil.isEmpty(notifyList)) { if (CollUtil.isEmpty(notifyList)) {
log.warn("Please configure alarm notification on the server."); log.warn("Please configure alarm notification on the server.");
return; return;
@ -109,22 +119,41 @@ public class BaseSendMessageService implements InitializingBean, SendMessageServ
Result result = null; Result result = null;
try { try {
result = httpAgent.httpPostByDiscovery("/v1/cs/alarm/list/config", new ThreadPoolAlarmReqDTO(groupKeys)); result = httpAgent.httpPostByDiscovery("/v1/cs/notify/list/config", new ThreadPoolNotifyReqDTO(groupKeys));
} catch (Throwable ex) { } catch (Throwable ex) {
log.error("Get dynamic thread pool alarm configuration error.", ex); log.error("Get dynamic thread pool notify configuration error.", ex);
throw ex;
} }
if (result.isSuccess() || result.getData() != null) { if (result != null && result.isSuccess() && result.getData() != null) {
String resultDataStr = JSON.toJSONString(result.getData()); String resultDataStr = JSON.toJSONString(result.getData());
List<ThreadPoolAlarmNotify> resultData = JSON.parseArray(resultDataStr, ThreadPoolAlarmNotify.class); List<ThreadPoolNotify> resultData = JSON.parseArray(resultDataStr, ThreadPoolNotify.class);
resultData.forEach(each -> ALARM_NOTIFY_CONFIG.put(each.getThreadPoolId(), each.getAlarmNotifyList())); resultData.forEach(each -> ALARM_NOTIFY_CONFIG.put(each.getNotifyKey(), each.getNotifyList()));
ALARM_NOTIFY_CONFIG.forEach((key, val) ->
val.stream().filter(each -> StrUtil.equals("ALARM", each.getType()))
.forEach(each -> {
Cache<String, String> cache = CacheBuilder.newBuilder()
.expireAfterWrite(each.getInterval(), TimeUnit.MINUTES)
.build();
AlarmControlHandler.THREAD_POOL_ALARM_CACHE.put(StrUtil.builder(each.getTpId(), "+", each.getPlatform()).toString(), cache);
})
);
}
} }
private boolean isSendAlarm(String threadPoolId, NotifyDTO notifyInfo) {
AlarmControlDTO alarmControl = AlarmControlDTO.builder()
.threadPool(threadPoolId)
.platform(notifyInfo.getPlatform())
.typeEnum(notifyInfo.getTypeEnum())
.build();
return alarmControlHandler.isSendAlarm(alarmControl);
} }
@Data @Data
@AllArgsConstructor @AllArgsConstructor
static class ThreadPoolAlarmReqDTO { static class ThreadPoolNotifyReqDTO {
/** /**
* groupKeys * groupKeys
@ -134,17 +163,17 @@ public class BaseSendMessageService implements InitializingBean, SendMessageServ
} }
@Data @Data
static class ThreadPoolAlarmNotify { static class ThreadPoolNotify {
/** /**
* 线ID * Key
*/ */
private String threadPoolId; private String notifyKey;
/** /**
* *
*/ */
private List<AlarmNotifyDTO> alarmNotifyList; private List<NotifyDTO> notifyList;
} }

@ -37,20 +37,20 @@ public class DingSendMessageHandler implements SendMessageHandler {
@Override @Override
public String getType() { public String getType() {
return SendMessageEnum.DING.name(); return NotifyPlatformEnum.DING.name();
} }
@Override @Override
public void sendAlarmMessage(AlarmNotifyDTO notifyConfig, DynamicThreadPoolExecutor pool) { public void sendAlarmMessage(NotifyDTO notifyConfig, DynamicThreadPoolExecutor pool) {
dingAlarmSendMessage(notifyConfig, pool); dingAlarmSendMessage(notifyConfig, pool);
} }
@Override @Override
public void sendChangeMessage(AlarmNotifyDTO notifyConfig, PoolParameterInfo parameter) { public void sendChangeMessage(NotifyDTO notifyConfig, PoolParameterInfo parameter) {
dingChangeSendMessage(notifyConfig, parameter); dingChangeSendMessage(notifyConfig, parameter);
} }
private void dingAlarmSendMessage(AlarmNotifyDTO notifyConfig, DynamicThreadPoolExecutor pool) { private void dingAlarmSendMessage(NotifyDTO notifyConfig, DynamicThreadPoolExecutor pool) {
List<String> receives = StrUtil.split(notifyConfig.getReceives(), ','); List<String> receives = StrUtil.split(notifyConfig.getReceives(), ',');
String afterReceives = Joiner.on(", @").join(receives); String afterReceives = Joiner.on(", @").join(receives);
@ -61,6 +61,7 @@ public class DingSendMessageHandler implements SendMessageHandler {
"<font color='#708090' size=2>线程池ID%s</font> \n\n " + "<font color='#708090' size=2>线程池ID%s</font> \n\n " +
"<font color='#708090' size=2>应用名称:%s</font> \n\n " + "<font color='#708090' size=2>应用名称:%s</font> \n\n " +
"<font color='#778899' size=2>应用实例:%s</font> \n\n " + "<font color='#778899' size=2>应用实例:%s</font> \n\n " +
"<font color='#778899' size=2>报警类型:%s</font> \n\n " +
" --- \n\n " + " --- \n\n " +
"<font color='#708090' size=2>核心线程数:%d</font> \n\n " + "<font color='#708090' size=2>核心线程数:%d</font> \n\n " +
"<font color='#708090' size=2>最大线程数:%d</font> \n\n " + "<font color='#708090' size=2>最大线程数:%d</font> \n\n " +
@ -89,6 +90,8 @@ public class DingSendMessageHandler implements SendMessageHandler {
instanceInfo.getAppName(), instanceInfo.getAppName(),
// 实例信息 // 实例信息
instanceInfo.getIdentify(), instanceInfo.getIdentify(),
// 报警类型
notifyConfig.getTypeEnum(),
// 核心线程数 // 核心线程数
pool.getCorePoolSize(), pool.getCorePoolSize(),
// 最大线程数 // 最大线程数
@ -124,7 +127,7 @@ public class DingSendMessageHandler implements SendMessageHandler {
execute(notifyConfig, "动态线程池告警", text, receives); execute(notifyConfig, "动态线程池告警", text, receives);
} }
private void dingChangeSendMessage(AlarmNotifyDTO notifyConfig, PoolParameterInfo parameter) { private void dingChangeSendMessage(NotifyDTO notifyConfig, PoolParameterInfo parameter) {
String threadPoolId = parameter.getTpId(); String threadPoolId = parameter.getTpId();
DynamicThreadPoolWrapper poolWrap = GlobalThreadPoolManage.getExecutorService(threadPoolId); DynamicThreadPoolWrapper poolWrap = GlobalThreadPoolManage.getExecutorService(threadPoolId);
if (poolWrap == null) { if (poolWrap == null) {
@ -190,7 +193,7 @@ public class DingSendMessageHandler implements SendMessageHandler {
execute(notifyConfig, "动态线程池通知", text, receives); execute(notifyConfig, "动态线程池通知", text, receives);
} }
private void execute(AlarmNotifyDTO notifyConfig, String title, String text, List<String> mobiles) { private void execute(NotifyDTO notifyConfig, String title, String text, List<String> mobiles) {
String url = "https://oapi.dingtalk.com/robot/send?access_token="; String url = "https://oapi.dingtalk.com/robot/send?access_token=";
String serverUrl = url + notifyConfig.getSecretKey(); String serverUrl = url + notifyConfig.getSecretKey();
DingTalkClient dingTalkClient = new DefaultDingTalkClient(serverUrl); DingTalkClient dingTalkClient = new DefaultDingTalkClient(serverUrl);

@ -1,15 +1,17 @@
package cn.hippo4j.starter.alarm; package cn.hippo4j.starter.alarm;
import lombok.Data; import lombok.Data;
import lombok.experimental.Accessors;
/** /**
* . * .
* *
* @author chen.ma * @author chen.ma
* @date 2021/11/17 22:12 * @date 2021/11/17 22:12
*/ */
@Data @Data
public class AlarmNotifyDTO { @Accessors(chain = true)
public class NotifyDTO {
/** /**
* id * id
@ -31,6 +33,11 @@ public class AlarmNotifyDTO {
*/ */
private String platform; private String platform;
/**
*
*/
private String type;
/** /**
* *
*/ */
@ -46,4 +53,9 @@ public class AlarmNotifyDTO {
*/ */
private String receives; private String receives;
/**
*
*/
private MessageTypeEnum typeEnum;
} }

@ -1,12 +1,12 @@
package cn.hippo4j.starter.alarm; package cn.hippo4j.starter.alarm;
/** /**
* Send message enum. * Notify platform enum.
* *
* @author chen.ma * @author chen.ma
* @date 2021/8/15 15:50 * @date 2021/8/15 15:50
*/ */
public enum SendMessageEnum { public enum NotifyPlatformEnum {
DING DING

@ -24,7 +24,7 @@ public interface SendMessageHandler {
* @param notifyConfig * @param notifyConfig
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
void sendAlarmMessage(AlarmNotifyDTO notifyConfig, DynamicThreadPoolExecutor threadPoolExecutor); void sendAlarmMessage(NotifyDTO notifyConfig, DynamicThreadPoolExecutor threadPoolExecutor);
/** /**
* Send change message. * Send change message.
@ -32,6 +32,6 @@ public interface SendMessageHandler {
* @param notifyConfig * @param notifyConfig
* @param parameter * @param parameter
*/ */
void sendChangeMessage(AlarmNotifyDTO notifyConfig, PoolParameterInfo parameter); void sendChangeMessage(NotifyDTO notifyConfig, PoolParameterInfo parameter);
} }

@ -14,9 +14,10 @@ public interface SendMessageService {
/** /**
* Send alarm message. * Send alarm message.
* *
* @param typeEnum
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
void sendAlarmMessage(DynamicThreadPoolExecutor threadPoolExecutor); void sendAlarmMessage(MessageTypeEnum typeEnum, DynamicThreadPoolExecutor threadPoolExecutor);
/** /**
* Send change message. * Send change message.

@ -5,10 +5,10 @@ import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.starter.config.MessageAlarmConfig; import cn.hippo4j.starter.config.MessageAlarmConfig;
import cn.hippo4j.starter.core.DynamicThreadPoolExecutor; import cn.hippo4j.starter.core.DynamicThreadPoolExecutor;
import cn.hippo4j.starter.toolkit.CalculateUtil; import cn.hippo4j.starter.toolkit.CalculateUtil;
import cn.hippo4j.starter.toolkit.thread.ResizableCapacityLinkedBlockIngQueue;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.BlockingQueue;
/** /**
* Alarm manage. * Alarm manage.
@ -24,19 +24,11 @@ public class ThreadPoolAlarmManage {
*/ */
private static final SendMessageService SEND_MESSAGE_SERVICE; private static final SendMessageService SEND_MESSAGE_SERVICE;
/**
*
*/
private static final AlarmControlHandler ALARM_CONTROL_HANDLER;
static { static {
SEND_MESSAGE_SERVICE = Optional.ofNullable(ApplicationContextHolder.getInstance()) SEND_MESSAGE_SERVICE = Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(each -> each.getBean(MessageAlarmConfig.SEND_MESSAGE_BEAN_NAME, SendMessageService.class)) .map(each -> each.getBean(MessageAlarmConfig.SEND_MESSAGE_BEAN_NAME, SendMessageService.class))
.orElse(null); .orElse(null);
System.out.println();
ALARM_CONTROL_HANDLER = Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(each -> each.getBean(AlarmControlHandler.class))
.orElse(null);
} }
/** /**
@ -49,16 +41,16 @@ public class ThreadPoolAlarmManage {
return; return;
} }
ThreadPoolAlarm threadPoolAlarm = threadPoolExecutor.getThreadPoolAlarm(); ThreadPoolAlarm threadPoolAlarm = threadPoolExecutor.getThreadPoolAlarm();
ResizableCapacityLinkedBlockIngQueue blockIngQueue = BlockingQueue blockIngQueue = threadPoolExecutor.getQueue();
(ResizableCapacityLinkedBlockIngQueue) threadPoolExecutor.getQueue();
int queueSize = blockIngQueue.size(); int queueSize = blockIngQueue.size();
int capacity = queueSize + blockIngQueue.remainingCapacity(); int capacity = queueSize + blockIngQueue.remainingCapacity();
int divide = CalculateUtil.divide(queueSize, capacity); int divide = CalculateUtil.divide(queueSize, capacity);
boolean isSend = divide > threadPoolAlarm.getCapacityAlarm() boolean isSend = threadPoolAlarm.getIsAlarm()
&& divide > threadPoolAlarm.getCapacityAlarm()
&& isSendMessage(threadPoolExecutor, MessageTypeEnum.CAPACITY); && isSendMessage(threadPoolExecutor, MessageTypeEnum.CAPACITY);
if (isSend) { if (isSend) {
SEND_MESSAGE_SERVICE.sendAlarmMessage(threadPoolExecutor); SEND_MESSAGE_SERVICE.sendAlarmMessage(MessageTypeEnum.CAPACITY, threadPoolExecutor);
} }
} }
@ -76,10 +68,13 @@ public class ThreadPoolAlarmManage {
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
int divide = CalculateUtil.divide(activeCount, maximumPoolSize); int divide = CalculateUtil.divide(activeCount, maximumPoolSize);
boolean isSend = divide > threadPoolExecutor.getThreadPoolAlarm().getLivenessAlarm() ThreadPoolAlarm threadPoolAlarm = threadPoolExecutor.getThreadPoolAlarm();
boolean isSend = threadPoolAlarm.getIsAlarm()
&& divide > threadPoolAlarm.getLivenessAlarm()
&& isSendMessage(threadPoolExecutor, MessageTypeEnum.CAPACITY); && isSendMessage(threadPoolExecutor, MessageTypeEnum.CAPACITY);
if (isSend) { if (isSend) {
SEND_MESSAGE_SERVICE.sendAlarmMessage(threadPoolExecutor); SEND_MESSAGE_SERVICE.sendAlarmMessage(MessageTypeEnum.CAPACITY, threadPoolExecutor);
} }
} }
@ -93,8 +88,9 @@ public class ThreadPoolAlarmManage {
return; return;
} }
if (isSendMessage(threadPoolExecutor, MessageTypeEnum.REJECT)) { ThreadPoolAlarm threadPoolAlarm = threadPoolExecutor.getThreadPoolAlarm();
SEND_MESSAGE_SERVICE.sendAlarmMessage(threadPoolExecutor); if (threadPoolAlarm.getIsAlarm() && isSendMessage(threadPoolExecutor, MessageTypeEnum.REJECT)) {
SEND_MESSAGE_SERVICE.sendAlarmMessage(MessageTypeEnum.REJECT, threadPoolExecutor);
} }
} }
@ -119,11 +115,8 @@ public class ThreadPoolAlarmManage {
* @return * @return
*/ */
private static boolean isSendMessage(DynamicThreadPoolExecutor threadPoolExecutor, MessageTypeEnum typeEnum) { private static boolean isSendMessage(DynamicThreadPoolExecutor threadPoolExecutor, MessageTypeEnum typeEnum) {
AlarmControlDTO alarmControl = AlarmControlDTO.builder() // ignore
.threadPool(threadPoolExecutor.getThreadPoolId()) return true;
.typeEnum(typeEnum)
.build();
return ALARM_CONTROL_HANDLER.isSend(alarmControl);
} }
} }

@ -28,8 +28,8 @@ public class MessageAlarmConfig {
@DependsOn("applicationContextHolder") @DependsOn("applicationContextHolder")
@Bean(MessageAlarmConfig.SEND_MESSAGE_BEAN_NAME) @Bean(MessageAlarmConfig.SEND_MESSAGE_BEAN_NAME)
public SendMessageService sendMessageService(HttpAgent httpAgent) { public SendMessageService sendMessageService(HttpAgent httpAgent, AlarmControlHandler alarmControlHandler) {
return new BaseSendMessageService(httpAgent, properties); return new BaseSendMessageService(httpAgent, properties, alarmControlHandler);
} }
@Bean @Bean
@ -40,8 +40,7 @@ public class MessageAlarmConfig {
@Bean @Bean
public AlarmControlHandler alarmControlHandler() { public AlarmControlHandler alarmControlHandler() {
Long alarmInterval = properties.getAlarmInterval(); return new AlarmControlHandler();
return new AlarmControlHandler(alarmInterval);
} }
} }

Loading…
Cancel
Save