From 60273f1cfab0cf1306218894d8baf963fa73ca06 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Wed, 17 Nov 2021 23:42:46 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=20Hippo4J=20Starter=20?= =?UTF-8?q?=E6=8A=A5=E8=AD=A6=E9=80=9A=E7=9F=A5=E6=A8=A1=E5=9D=97.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../hippo4j/starter/alarm/AlarmNotifyDTO.java | 49 ++++++++ .../starter/alarm/BaseSendMessageService.java | 112 ++++++++++++++++-- .../starter/alarm/DingSendMessageHandler.java | 41 +++---- .../starter/alarm/SendMessageHandler.java | 10 +- .../starter/config/MessageAlarmConfig.java | 10 +- .../starter/core/GlobalThreadPoolManage.java | 8 +- 6 files changed, 182 insertions(+), 48 deletions(-) create mode 100644 hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/AlarmNotifyDTO.java diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/AlarmNotifyDTO.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/AlarmNotifyDTO.java new file mode 100644 index 00000000..4b92e4e5 --- /dev/null +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/AlarmNotifyDTO.java @@ -0,0 +1,49 @@ +package cn.hippo4j.starter.alarm; + +import lombok.Data; + +/** + * 报警通知实体. + * + * @author chen.ma + * @date 2021/11/17 22:12 + */ +@Data +public class AlarmNotifyDTO { + + /** + * 租户id + */ + private String tenantId; + + /** + * 项目id + */ + private String itemId; + + /** + * 线程池id + */ + private String tpId; + + /** + * 通知平台 + */ + private String platform; + + /** + * 密钥 + */ + private String secretKey; + + /** + * 报警间隔 + */ + private Integer interval; + + /** + * 接收者 + */ + private String receives; + +} diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/BaseSendMessageService.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/BaseSendMessageService.java index 5fdfa4b1..671a92e8 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/BaseSendMessageService.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/BaseSendMessageService.java @@ -2,13 +2,23 @@ package cn.hippo4j.starter.alarm; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.model.PoolParameterInfo; +import cn.hippo4j.common.toolkit.GroupKey; +import cn.hippo4j.common.web.base.Result; +import cn.hippo4j.starter.config.BootstrapProperties; import cn.hippo4j.starter.core.DynamicThreadPoolExecutor; +import cn.hippo4j.starter.core.GlobalThreadPoolManage; +import cn.hippo4j.starter.remote.HttpAgent; +import cn.hutool.core.collection.CollUtil; +import com.alibaba.fastjson.JSON; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.AllArgsConstructor; +import lombok.Data; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; -import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -23,37 +33,119 @@ import java.util.Map; public class BaseSendMessageService implements InitializingBean, SendMessageService { @NonNull - private final List notifyConfigs; + private final HttpAgent httpAgent; - private final List sendMessageHandlers = new ArrayList(4); + @NonNull + private final BootstrapProperties properties; + + private final static Map> ALARM_NOTIFY_CONFIG = Maps.newHashMap(); + + + private final Map sendMessageHandlers = Maps.newHashMap(); @Override public void sendAlarmMessage(DynamicThreadPoolExecutor threadPoolExecutor) { - for (SendMessageHandler messageHandler : sendMessageHandlers) { + List notifyList = ALARM_NOTIFY_CONFIG.get(threadPoolExecutor.getThreadPoolId()); + if (CollUtil.isEmpty(notifyList)) { + log.warn("Please configure alarm notification on the server."); + return; + } + + notifyList.forEach(each -> { try { - messageHandler.sendAlarmMessage(notifyConfigs, threadPoolExecutor); + SendMessageHandler messageHandler = sendMessageHandlers.get(each.getPlatform()); + if (messageHandler == null) { + log.warn("Please configure alarm notification on the server."); + return; + } + + messageHandler.sendAlarmMessage(each, threadPoolExecutor); } catch (Exception ex) { log.warn("Failed to send thread pool alarm notification.", ex); } - } + }); } @Override public void sendChangeMessage(PoolParameterInfo parameter) { - for (SendMessageHandler messageHandler : sendMessageHandlers) { + List notifyList = ALARM_NOTIFY_CONFIG.get(parameter.getTpId()); + if (CollUtil.isEmpty(notifyList)) { + log.warn("Please configure alarm notification on the server."); + return; + } + + notifyList.forEach(each -> { try { - messageHandler.sendChangeMessage(notifyConfigs, parameter); + SendMessageHandler messageHandler = sendMessageHandlers.get(each.getPlatform()); + if (messageHandler == null) { + log.warn("Please configure alarm notification on the server."); + return; + } + + messageHandler.sendChangeMessage(each, parameter); } catch (Exception ex) { log.warn("Failed to send thread pool change notification.", ex); } - } + }); } @Override public void afterPropertiesSet() { Map sendMessageHandlerMap = ApplicationContextHolder.getBeansOfType(SendMessageHandler.class); - sendMessageHandlerMap.values().forEach(each -> sendMessageHandlers.add(each)); + sendMessageHandlerMap.values().forEach(each -> sendMessageHandlers.put(each.getType(), each)); + + List threadPoolIds = GlobalThreadPoolManage.listThreadPoolId(); + if (CollUtil.isEmpty(threadPoolIds)) { + log.warn("The client does not have a dynamic thread pool instance configured."); + return; + } + + List groupKeys = Lists.newArrayList(); + threadPoolIds.forEach(each -> { + String groupKey = GroupKey.getKeyTenant(each, properties.getItemId(), properties.getNamespace()); + groupKeys.add(groupKey); + }); + + Result result = null; + try { + result = httpAgent.httpPostByDiscovery("/v1/cs/alarm/list/config", new ThreadPoolAlarmReqDTO(groupKeys)); + } catch (Throwable ex) { + log.error("Get dynamic thread pool alarm configuration error.", ex); + throw ex; + } + + if (result.isSuccess() || result.getData() != null) { + String resultDataStr = JSON.toJSONString(result.getData()); + List resultData = JSON.parseArray(resultDataStr, ThreadPoolAlarmNotify.class); + resultData.forEach(each -> ALARM_NOTIFY_CONFIG.put(each.getThreadPoolId(), each.getAlarmNotifyList())); + } + } + + @Data + @AllArgsConstructor + static class ThreadPoolAlarmReqDTO { + + /** + * groupKeys + */ + private List groupKeys; + + } + + @Data + static class ThreadPoolAlarmNotify { + + /** + * 线程池ID + */ + private String threadPoolId; + + /** + * 报警配置 + */ + private List alarmNotifyList; + } } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/DingSendMessageHandler.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/DingSendMessageHandler.java index e5fbae8c..1c329e2c 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/DingSendMessageHandler.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/DingSendMessageHandler.java @@ -1,25 +1,23 @@ package cn.hippo4j.starter.alarm; -import cn.hutool.core.date.DateUtil; -import cn.hutool.core.util.StrUtil; -import com.dingtalk.api.DefaultDingTalkClient; -import com.dingtalk.api.DingTalkClient; -import com.dingtalk.api.request.OapiRobotSendRequest; import cn.hippo4j.common.model.InstanceInfo; import cn.hippo4j.common.model.PoolParameterInfo; -import cn.hippo4j.starter.core.GlobalThreadPoolManage; import cn.hippo4j.starter.core.DynamicThreadPoolExecutor; +import cn.hippo4j.starter.core.GlobalThreadPoolManage; import cn.hippo4j.starter.toolkit.thread.QueueTypeEnum; import cn.hippo4j.starter.toolkit.thread.RejectedTypeEnum; import cn.hippo4j.starter.wrapper.DynamicThreadPoolWrapper; +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.StrUtil; +import com.dingtalk.api.DefaultDingTalkClient; +import com.dingtalk.api.DingTalkClient; +import com.dingtalk.api.request.OapiRobotSendRequest; import com.google.common.base.Joiner; import com.taobao.api.ApiException; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.util.List; -import java.util.Objects; -import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -35,8 +33,6 @@ public class DingSendMessageHandler implements SendMessageHandler { private String active; - private Long alarmInterval; - private InstanceInfo instanceInfo; @Override @@ -45,22 +41,16 @@ public class DingSendMessageHandler implements SendMessageHandler { } @Override - public void sendAlarmMessage(List notifyConfigs, DynamicThreadPoolExecutor pool) { - Optional notifyConfigOptional = notifyConfigs.stream() - .filter(each -> Objects.equals(each.getType(), getType())) - .findFirst(); - notifyConfigOptional.ifPresent(each -> dingAlarmSendMessage(each, pool)); + public void sendAlarmMessage(AlarmNotifyDTO notifyConfig, DynamicThreadPoolExecutor pool) { + dingAlarmSendMessage(notifyConfig, pool); } @Override - public void sendChangeMessage(List notifyConfigs, PoolParameterInfo parameter) { - Optional changeConfigOptional = notifyConfigs.stream() - .filter(each -> Objects.equals(each.getType(), getType())) - .findFirst(); - changeConfigOptional.ifPresent(each -> dingChangeSendMessage(each, parameter)); + public void sendChangeMessage(AlarmNotifyDTO notifyConfig, PoolParameterInfo parameter) { + dingChangeSendMessage(notifyConfig, parameter); } - private void dingAlarmSendMessage(NotifyConfig notifyConfig, DynamicThreadPoolExecutor pool) { + private void dingAlarmSendMessage(AlarmNotifyDTO notifyConfig, DynamicThreadPoolExecutor pool) { List receives = StrUtil.split(notifyConfig.getReceives(), ','); String afterReceives = Joiner.on(", @").join(receives); @@ -126,7 +116,7 @@ public class DingSendMessageHandler implements SendMessageHandler { // 告警手机号 afterReceives, // 报警频率 - alarmInterval, + notifyConfig.getInterval(), // 当前时间 DateUtil.now() ); @@ -134,7 +124,7 @@ public class DingSendMessageHandler implements SendMessageHandler { execute(notifyConfig, "动态线程池告警", text, receives); } - private void dingChangeSendMessage(NotifyConfig notifyConfig, PoolParameterInfo parameter) { + private void dingChangeSendMessage(AlarmNotifyDTO notifyConfig, PoolParameterInfo parameter) { String threadPoolId = parameter.getTpId(); DynamicThreadPoolWrapper poolWrap = GlobalThreadPoolManage.getExecutorService(threadPoolId); if (poolWrap == null) { @@ -200,8 +190,9 @@ public class DingSendMessageHandler implements SendMessageHandler { execute(notifyConfig, "动态线程池通知", text, receives); } - private void execute(NotifyConfig notifyConfigs, String title, String text, List mobiles) { - String serverUrl = notifyConfigs.getUrl() + notifyConfigs.getToken(); + private void execute(AlarmNotifyDTO notifyConfig, String title, String text, List mobiles) { + String url = "https://oapi.dingtalk.com/robot/send?access_token="; + String serverUrl = url + notifyConfig.getSecretKey(); DingTalkClient dingTalkClient = new DefaultDingTalkClient(serverUrl); OapiRobotSendRequest request = new OapiRobotSendRequest(); request.setMsgtype("markdown"); diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/SendMessageHandler.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/SendMessageHandler.java index 873b110a..0361967f 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/SendMessageHandler.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/alarm/SendMessageHandler.java @@ -3,8 +3,6 @@ package cn.hippo4j.starter.alarm; import cn.hippo4j.common.model.PoolParameterInfo; import cn.hippo4j.starter.core.DynamicThreadPoolExecutor; -import java.util.List; - /** * Send message handler. * @@ -23,17 +21,17 @@ public interface SendMessageHandler { /** * Send alarm message. * - * @param notifyConfigs + * @param notifyConfig * @param threadPoolExecutor */ - void sendAlarmMessage(List notifyConfigs, DynamicThreadPoolExecutor threadPoolExecutor); + void sendAlarmMessage(AlarmNotifyDTO notifyConfig, DynamicThreadPoolExecutor threadPoolExecutor); /** * Send change message. * - * @param notifyConfigs + * @param notifyConfig * @param parameter */ - void sendChangeMessage(List notifyConfigs, PoolParameterInfo parameter); + void sendChangeMessage(AlarmNotifyDTO notifyConfig, PoolParameterInfo parameter); } diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/MessageAlarmConfig.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/MessageAlarmConfig.java index d5cb903b..774aa6fc 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/MessageAlarmConfig.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/config/MessageAlarmConfig.java @@ -2,14 +2,13 @@ package cn.hippo4j.starter.config; import cn.hippo4j.common.model.InstanceInfo; import cn.hippo4j.starter.alarm.*; +import cn.hippo4j.starter.remote.HttpAgent; import lombok.AllArgsConstructor; import org.apache.logging.log4j.util.Strings; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.DependsOn; import org.springframework.core.env.ConfigurableEnvironment; -import java.util.Optional; - /** * Message alarm config. * @@ -29,15 +28,14 @@ public class MessageAlarmConfig { @DependsOn("applicationContextHolder") @Bean(MessageAlarmConfig.SEND_MESSAGE_BEAN_NAME) - public SendMessageService sendMessageService() { - return new BaseSendMessageService(properties.getNotifys()); + public SendMessageService sendMessageService(HttpAgent httpAgent) { + return new BaseSendMessageService(httpAgent, properties); } @Bean public SendMessageHandler dingSendMessageHandler() { String active = environment.getProperty("spring.profiles.active", Strings.EMPTY); - Long alarmInterval = Optional.ofNullable(properties.getAlarmInterval()).orElse(5L); - return new DingSendMessageHandler(active, alarmInterval, instanceInfo); + return new DingSendMessageHandler(active, instanceInfo); } @Bean diff --git a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/GlobalThreadPoolManage.java b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/GlobalThreadPoolManage.java index d7c76b94..f7bbf748 100644 --- a/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/GlobalThreadPoolManage.java +++ b/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/starter/core/GlobalThreadPoolManage.java @@ -1,8 +1,10 @@ package cn.hippo4j.starter.core; -import cn.hippo4j.starter.wrapper.DynamicThreadPoolWrapper; import cn.hippo4j.common.model.PoolParameter; +import cn.hippo4j.starter.wrapper.DynamicThreadPoolWrapper; +import com.google.common.collect.Lists; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -39,4 +41,8 @@ public class GlobalThreadPoolManage { POOL_PARAMETER.put(tpId, poolParameter); } + public static List listThreadPoolId() { + return Lists.newArrayList(POOL_PARAMETER.keySet()); + } + }