重构 Hippo4J Starter 报警通知模块.

pull/10/head
chen.ma 3 years ago
parent 76d9a6b1be
commit 60273f1cfa

@ -0,0 +1,49 @@
package cn.hippo4j.starter.alarm;
import lombok.Data;
/**
* .
*
* @author chen.ma
* @date 2021/11/17 22:12
*/
@Data
public class AlarmNotifyDTO {
/**
* id
*/
private String tenantId;
/**
* id
*/
private String itemId;
/**
* 线id
*/
private String tpId;
/**
*
*/
private String platform;
/**
*
*/
private String secretKey;
/**
*
*/
private Integer interval;
/**
*
*/
private String receives;
}

@ -2,13 +2,23 @@ package cn.hippo4j.starter.alarm;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.PoolParameterInfo; import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.toolkit.GroupKey;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.starter.config.BootstrapProperties;
import cn.hippo4j.starter.core.DynamicThreadPoolExecutor; import cn.hippo4j.starter.core.DynamicThreadPoolExecutor;
import cn.hippo4j.starter.core.GlobalThreadPoolManage;
import cn.hippo4j.starter.remote.HttpAgent;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -23,37 +33,119 @@ import java.util.Map;
public class BaseSendMessageService implements InitializingBean, SendMessageService { public class BaseSendMessageService implements InitializingBean, SendMessageService {
@NonNull @NonNull
private final List<NotifyConfig> notifyConfigs; private final HttpAgent httpAgent;
private final List<SendMessageHandler> sendMessageHandlers = new ArrayList(4); @NonNull
private final BootstrapProperties properties;
private final static Map<String, List<AlarmNotifyDTO>> ALARM_NOTIFY_CONFIG = Maps.newHashMap();
private final Map<String, SendMessageHandler> sendMessageHandlers = Maps.newHashMap();
@Override @Override
public void sendAlarmMessage(DynamicThreadPoolExecutor threadPoolExecutor) { public void sendAlarmMessage(DynamicThreadPoolExecutor threadPoolExecutor) {
for (SendMessageHandler messageHandler : sendMessageHandlers) { List<AlarmNotifyDTO> notifyList = ALARM_NOTIFY_CONFIG.get(threadPoolExecutor.getThreadPoolId());
if (CollUtil.isEmpty(notifyList)) {
log.warn("Please configure alarm notification on the server.");
return;
}
notifyList.forEach(each -> {
try { try {
messageHandler.sendAlarmMessage(notifyConfigs, threadPoolExecutor); SendMessageHandler messageHandler = sendMessageHandlers.get(each.getPlatform());
if (messageHandler == null) {
log.warn("Please configure alarm notification on the server.");
return;
}
messageHandler.sendAlarmMessage(each, threadPoolExecutor);
} catch (Exception ex) { } catch (Exception ex) {
log.warn("Failed to send thread pool alarm notification.", ex); log.warn("Failed to send thread pool alarm notification.", ex);
} }
} });
} }
@Override @Override
public void sendChangeMessage(PoolParameterInfo parameter) { public void sendChangeMessage(PoolParameterInfo parameter) {
for (SendMessageHandler messageHandler : sendMessageHandlers) { List<AlarmNotifyDTO> notifyList = ALARM_NOTIFY_CONFIG.get(parameter.getTpId());
if (CollUtil.isEmpty(notifyList)) {
log.warn("Please configure alarm notification on the server.");
return;
}
notifyList.forEach(each -> {
try { try {
messageHandler.sendChangeMessage(notifyConfigs, parameter); SendMessageHandler messageHandler = sendMessageHandlers.get(each.getPlatform());
if (messageHandler == null) {
log.warn("Please configure alarm notification on the server.");
return;
}
messageHandler.sendChangeMessage(each, parameter);
} catch (Exception ex) { } catch (Exception ex) {
log.warn("Failed to send thread pool change notification.", ex); log.warn("Failed to send thread pool change notification.", ex);
} }
} });
} }
@Override @Override
public void afterPropertiesSet() { public void afterPropertiesSet() {
Map<String, SendMessageHandler> sendMessageHandlerMap = Map<String, SendMessageHandler> sendMessageHandlerMap =
ApplicationContextHolder.getBeansOfType(SendMessageHandler.class); ApplicationContextHolder.getBeansOfType(SendMessageHandler.class);
sendMessageHandlerMap.values().forEach(each -> sendMessageHandlers.add(each)); sendMessageHandlerMap.values().forEach(each -> sendMessageHandlers.put(each.getType(), each));
List<String> threadPoolIds = GlobalThreadPoolManage.listThreadPoolId();
if (CollUtil.isEmpty(threadPoolIds)) {
log.warn("The client does not have a dynamic thread pool instance configured.");
return;
}
List<String> groupKeys = Lists.newArrayList();
threadPoolIds.forEach(each -> {
String groupKey = GroupKey.getKeyTenant(each, properties.getItemId(), properties.getNamespace());
groupKeys.add(groupKey);
});
Result result = null;
try {
result = httpAgent.httpPostByDiscovery("/v1/cs/alarm/list/config", new ThreadPoolAlarmReqDTO(groupKeys));
} catch (Throwable ex) {
log.error("Get dynamic thread pool alarm configuration error.", ex);
throw ex;
}
if (result.isSuccess() || result.getData() != null) {
String resultDataStr = JSON.toJSONString(result.getData());
List<ThreadPoolAlarmNotify> resultData = JSON.parseArray(resultDataStr, ThreadPoolAlarmNotify.class);
resultData.forEach(each -> ALARM_NOTIFY_CONFIG.put(each.getThreadPoolId(), each.getAlarmNotifyList()));
}
}
@Data
@AllArgsConstructor
static class ThreadPoolAlarmReqDTO {
/**
* groupKeys
*/
private List<String> groupKeys;
}
@Data
static class ThreadPoolAlarmNotify {
/**
* 线ID
*/
private String threadPoolId;
/**
*
*/
private List<AlarmNotifyDTO> alarmNotifyList;
} }
} }

@ -1,25 +1,23 @@
package cn.hippo4j.starter.alarm; package cn.hippo4j.starter.alarm;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiRobotSendRequest;
import cn.hippo4j.common.model.InstanceInfo; import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.model.PoolParameterInfo; import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.starter.core.GlobalThreadPoolManage;
import cn.hippo4j.starter.core.DynamicThreadPoolExecutor; import cn.hippo4j.starter.core.DynamicThreadPoolExecutor;
import cn.hippo4j.starter.core.GlobalThreadPoolManage;
import cn.hippo4j.starter.toolkit.thread.QueueTypeEnum; import cn.hippo4j.starter.toolkit.thread.QueueTypeEnum;
import cn.hippo4j.starter.toolkit.thread.RejectedTypeEnum; import cn.hippo4j.starter.toolkit.thread.RejectedTypeEnum;
import cn.hippo4j.starter.wrapper.DynamicThreadPoolWrapper; import cn.hippo4j.starter.wrapper.DynamicThreadPoolWrapper;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiRobotSendRequest;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.taobao.api.ApiException; import com.taobao.api.ApiException;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.List; import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -35,8 +33,6 @@ public class DingSendMessageHandler implements SendMessageHandler {
private String active; private String active;
private Long alarmInterval;
private InstanceInfo instanceInfo; private InstanceInfo instanceInfo;
@Override @Override
@ -45,22 +41,16 @@ public class DingSendMessageHandler implements SendMessageHandler {
} }
@Override @Override
public void sendAlarmMessage(List<NotifyConfig> notifyConfigs, DynamicThreadPoolExecutor pool) { public void sendAlarmMessage(AlarmNotifyDTO notifyConfig, DynamicThreadPoolExecutor pool) {
Optional<NotifyConfig> notifyConfigOptional = notifyConfigs.stream() dingAlarmSendMessage(notifyConfig, pool);
.filter(each -> Objects.equals(each.getType(), getType()))
.findFirst();
notifyConfigOptional.ifPresent(each -> dingAlarmSendMessage(each, pool));
} }
@Override @Override
public void sendChangeMessage(List<NotifyConfig> notifyConfigs, PoolParameterInfo parameter) { public void sendChangeMessage(AlarmNotifyDTO notifyConfig, PoolParameterInfo parameter) {
Optional<NotifyConfig> changeConfigOptional = notifyConfigs.stream() dingChangeSendMessage(notifyConfig, parameter);
.filter(each -> Objects.equals(each.getType(), getType()))
.findFirst();
changeConfigOptional.ifPresent(each -> dingChangeSendMessage(each, parameter));
} }
private void dingAlarmSendMessage(NotifyConfig notifyConfig, DynamicThreadPoolExecutor pool) { private void dingAlarmSendMessage(AlarmNotifyDTO notifyConfig, DynamicThreadPoolExecutor pool) {
List<String> receives = StrUtil.split(notifyConfig.getReceives(), ','); List<String> receives = StrUtil.split(notifyConfig.getReceives(), ',');
String afterReceives = Joiner.on(", @").join(receives); String afterReceives = Joiner.on(", @").join(receives);
@ -126,7 +116,7 @@ public class DingSendMessageHandler implements SendMessageHandler {
// 告警手机号 // 告警手机号
afterReceives, afterReceives,
// 报警频率 // 报警频率
alarmInterval, notifyConfig.getInterval(),
// 当前时间 // 当前时间
DateUtil.now() DateUtil.now()
); );
@ -134,7 +124,7 @@ public class DingSendMessageHandler implements SendMessageHandler {
execute(notifyConfig, "动态线程池告警", text, receives); execute(notifyConfig, "动态线程池告警", text, receives);
} }
private void dingChangeSendMessage(NotifyConfig notifyConfig, PoolParameterInfo parameter) { private void dingChangeSendMessage(AlarmNotifyDTO notifyConfig, PoolParameterInfo parameter) {
String threadPoolId = parameter.getTpId(); String threadPoolId = parameter.getTpId();
DynamicThreadPoolWrapper poolWrap = GlobalThreadPoolManage.getExecutorService(threadPoolId); DynamicThreadPoolWrapper poolWrap = GlobalThreadPoolManage.getExecutorService(threadPoolId);
if (poolWrap == null) { if (poolWrap == null) {
@ -200,8 +190,9 @@ public class DingSendMessageHandler implements SendMessageHandler {
execute(notifyConfig, "动态线程池通知", text, receives); execute(notifyConfig, "动态线程池通知", text, receives);
} }
private void execute(NotifyConfig notifyConfigs, String title, String text, List<String> mobiles) { private void execute(AlarmNotifyDTO notifyConfig, String title, String text, List<String> mobiles) {
String serverUrl = notifyConfigs.getUrl() + notifyConfigs.getToken(); String url = "https://oapi.dingtalk.com/robot/send?access_token=";
String serverUrl = url + notifyConfig.getSecretKey();
DingTalkClient dingTalkClient = new DefaultDingTalkClient(serverUrl); DingTalkClient dingTalkClient = new DefaultDingTalkClient(serverUrl);
OapiRobotSendRequest request = new OapiRobotSendRequest(); OapiRobotSendRequest request = new OapiRobotSendRequest();
request.setMsgtype("markdown"); request.setMsgtype("markdown");

@ -3,8 +3,6 @@ package cn.hippo4j.starter.alarm;
import cn.hippo4j.common.model.PoolParameterInfo; import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.starter.core.DynamicThreadPoolExecutor; import cn.hippo4j.starter.core.DynamicThreadPoolExecutor;
import java.util.List;
/** /**
* Send message handler. * Send message handler.
* *
@ -23,17 +21,17 @@ public interface SendMessageHandler {
/** /**
* Send alarm message. * Send alarm message.
* *
* @param notifyConfigs * @param notifyConfig
* @param threadPoolExecutor * @param threadPoolExecutor
*/ */
void sendAlarmMessage(List<NotifyConfig> notifyConfigs, DynamicThreadPoolExecutor threadPoolExecutor); void sendAlarmMessage(AlarmNotifyDTO notifyConfig, DynamicThreadPoolExecutor threadPoolExecutor);
/** /**
* Send change message. * Send change message.
* *
* @param notifyConfigs * @param notifyConfig
* @param parameter * @param parameter
*/ */
void sendChangeMessage(List<NotifyConfig> notifyConfigs, PoolParameterInfo parameter); void sendChangeMessage(AlarmNotifyDTO notifyConfig, PoolParameterInfo parameter);
} }

@ -2,14 +2,13 @@ package cn.hippo4j.starter.config;
import cn.hippo4j.common.model.InstanceInfo; import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.starter.alarm.*; import cn.hippo4j.starter.alarm.*;
import cn.hippo4j.starter.remote.HttpAgent;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import org.apache.logging.log4j.util.Strings; import org.apache.logging.log4j.util.Strings;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn; import org.springframework.context.annotation.DependsOn;
import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.ConfigurableEnvironment;
import java.util.Optional;
/** /**
* Message alarm config. * Message alarm config.
* *
@ -29,15 +28,14 @@ public class MessageAlarmConfig {
@DependsOn("applicationContextHolder") @DependsOn("applicationContextHolder")
@Bean(MessageAlarmConfig.SEND_MESSAGE_BEAN_NAME) @Bean(MessageAlarmConfig.SEND_MESSAGE_BEAN_NAME)
public SendMessageService sendMessageService() { public SendMessageService sendMessageService(HttpAgent httpAgent) {
return new BaseSendMessageService(properties.getNotifys()); return new BaseSendMessageService(httpAgent, properties);
} }
@Bean @Bean
public SendMessageHandler dingSendMessageHandler() { public SendMessageHandler dingSendMessageHandler() {
String active = environment.getProperty("spring.profiles.active", Strings.EMPTY); String active = environment.getProperty("spring.profiles.active", Strings.EMPTY);
Long alarmInterval = Optional.ofNullable(properties.getAlarmInterval()).orElse(5L); return new DingSendMessageHandler(active, instanceInfo);
return new DingSendMessageHandler(active, alarmInterval, instanceInfo);
} }
@Bean @Bean

@ -1,8 +1,10 @@
package cn.hippo4j.starter.core; package cn.hippo4j.starter.core;
import cn.hippo4j.starter.wrapper.DynamicThreadPoolWrapper;
import cn.hippo4j.common.model.PoolParameter; import cn.hippo4j.common.model.PoolParameter;
import cn.hippo4j.starter.wrapper.DynamicThreadPoolWrapper;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -39,4 +41,8 @@ public class GlobalThreadPoolManage {
POOL_PARAMETER.put(tpId, poolParameter); POOL_PARAMETER.put(tpId, poolParameter);
} }
public static List<String> listThreadPoolId() {
return Lists.newArrayList(POOL_PARAMETER.keySet());
}
} }

Loading…
Cancel
Save