抽象 hippo4j 核心组件, 不依赖 server 端即可完成动态调参、监控、报警等功能. (#103)

pull/110/merge
chen.ma 3 years ago
parent 4ae492c8b0
commit 9c0d660ef8

@ -46,7 +46,7 @@ public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport {
this.threadPoolId = threadPoolId;
// Number of dynamic proxy denial policies.
RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(handler, rejectCount);
RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(handler, threadPoolId, rejectCount);
setRejectedExecutionHandler(rejectedProxy);
// Redundant fields to avoid reflecting the acquired fields when sending change information.

@ -0,0 +1,219 @@
package cn.hippo4j.core.executor;
import cn.hippo4j.common.notify.HippoSendMessageService;
import cn.hippo4j.common.notify.NotifyTypeEnum;
import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm;
import cn.hippo4j.common.notify.request.AlarmNotifyRequest;
import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.toolkit.CalculateUtil;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hutool.core.util.StrUtil;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import java.util.List;
import java.util.concurrent.*;
/**
* Thread pool alarm notify.
*
* @author chen.ma
* @date 2021/8/15 14:13
*/
@Slf4j
@RequiredArgsConstructor
public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner {
@NonNull
private final HippoSendMessageService hippoSendMessageService;
@Value("${spring.profiles.active:UNKNOWN}")
private String active;
@Value("${spring.dynamic.thread-pool.item-id}")
private String itemId;
@Value("${spring.application.name}")
private String applicationName;
@Value("${spring.dynamic.thread-pool.check-state-interval:5}")
private Integer checkStateInterval;
private final ScheduledExecutorService ALARM_NOTIFY_EXECUTOR = new ScheduledThreadPoolExecutor(
1,
r -> new Thread(r, "client.alarm.notify")
);
@Override
public void run(String... args) throws Exception {
ALARM_NOTIFY_EXECUTOR.scheduleWithFixedDelay(this, 0, checkStateInterval, TimeUnit.SECONDS);
}
@Override
public void run() {
List<String> listThreadPoolId = GlobalThreadPoolManage.listThreadPoolId();
listThreadPoolId.forEach(threadPoolId -> {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId);
if (threadPoolNotifyAlarm != null && threadPoolNotifyAlarm.getIsAlarm()) {
DynamicThreadPoolWrapper wrapper = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor executor = wrapper.getExecutor();
checkPoolCapacityAlarm(threadPoolId, executor);
checkPoolActivityAlarm(threadPoolId, executor);
}
});
}
/**
* Check thread pool capacity alarm.
*
* @param threadPoolId
* @param threadPoolExecutor
*/
public void checkPoolCapacityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
if (hippoSendMessageService == null) {
return;
}
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId);
BlockingQueue blockIngQueue = threadPoolExecutor.getQueue();
int queueSize = blockIngQueue.size();
int capacity = queueSize + blockIngQueue.remainingCapacity();
int divide = CalculateUtil.divide(queueSize, capacity);
boolean isSend = threadPoolNotifyAlarm.getIsAlarm()
&& divide > threadPoolNotifyAlarm.getCapacityAlarm();
if (isSend) {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor);
alarmNotifyRequest.setThreadPoolId(threadPoolId);
hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.CAPACITY, alarmNotifyRequest);
}
}
/**
* Check thread pool activity alarm.
*
* @param threadPoolId
* @param threadPoolExecutor
*/
public void checkPoolActivityAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
int activeCount = threadPoolExecutor.getActiveCount();
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
int divide = CalculateUtil.divide(activeCount, maximumPoolSize);
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(threadPoolId);
boolean isSend = threadPoolNotifyAlarm.getIsAlarm()
&& divide > threadPoolNotifyAlarm.getLivenessAlarm();
if (isSend) {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor);
alarmNotifyRequest.setThreadPoolId(threadPoolId);
hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.ACTIVITY, alarmNotifyRequest);
}
}
/**
* Check pool rejected alarm.
*
* @param threadPoolId
*/
public void checkPoolRejectedAlarm(String threadPoolId) {
ThreadPoolExecutor threadPoolExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
checkPoolRejectedAlarm(threadPoolId, threadPoolExecutor);
}
/**
* Check pool rejected alarm.
*
* @param threadPoolId
* @param threadPoolExecutor
*/
public void checkPoolRejectedAlarm(String threadPoolId, ThreadPoolExecutor threadPoolExecutor) {
if (threadPoolExecutor instanceof DynamicThreadPoolExecutor) {
AlarmNotifyRequest alarmNotifyRequest = buildAlarmNotifyReq(threadPoolExecutor);
alarmNotifyRequest.setThreadPoolId(threadPoolId);
hippoSendMessageService.sendAlarmMessage(NotifyTypeEnum.REJECT, alarmNotifyRequest);
}
}
/**
* Send pool config change.
*
* @param request
*/
public void sendPoolConfigChange(ChangeParameterNotifyRequest request) {
request.setActive(active.toUpperCase());
String appName = StrUtil.isBlank(itemId) ? applicationName : itemId;
request.setAppName(appName);
request.setIdentify(IdentifyUtil.getIdentify());
hippoSendMessageService.sendChangeMessage(request);
}
/**
* Build alarm notify req.
*
* @param threadPoolExecutor
* @return
*/
public AlarmNotifyRequest buildAlarmNotifyReq(ThreadPoolExecutor threadPoolExecutor) {
AlarmNotifyRequest request = new AlarmNotifyRequest();
String appName = StrUtil.isBlank(itemId) ? applicationName : itemId;
request.setAppName(appName);
// 核心线程数
int corePoolSize = threadPoolExecutor.getCorePoolSize();
// 最大线程数
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
// 线程池当前线程数 (有锁)
int poolSize = threadPoolExecutor.getPoolSize();
// 活跃线程数 (有锁)
int activeCount = threadPoolExecutor.getActiveCount();
// 同时进入池中的最大线程数 (有锁)
int largestPoolSize = threadPoolExecutor.getLargestPoolSize();
// 线程池中执行任务总数量 (有锁)
long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
request.setActive(active.toUpperCase());
request.setIdentify(IdentifyUtil.getIdentify());
request.setCorePoolSize(corePoolSize);
request.setMaximumPoolSize(maximumPoolSize);
request.setPoolSize(poolSize);
request.setActiveCount(activeCount);
request.setLargestPoolSize(largestPoolSize);
request.setCompletedTaskCount(completedTaskCount);
BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
// 队列元素个数
int queueSize = queue.size();
// 队列类型
String queueType = queue.getClass().getSimpleName();
// 队列剩余容量
int remainingCapacity = queue.remainingCapacity();
// 队列容量
int queueCapacity = queueSize + remainingCapacity;
request.setQueueName(queueType);
request.setCapacity(queueCapacity);
request.setQueueSize(queueSize);
request.setRemainingCapacity(remainingCapacity);
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler()
: threadPoolExecutor.getRejectedExecutionHandler();
request.setRejectedExecutionHandlerName(rejectedExecutionHandler.getClass().getSimpleName());
long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum()
: -1L;
request.setRejectCountNum(rejectCount);
return request;
}
}

@ -0,0 +1,41 @@
package cn.hippo4j.core.executor.manage;
import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* Global notify alarm manage.
*
* @author chen.ma
* @date 2022/2/24 20:12
*/
public class GlobalNotifyAlarmManage {
/**
* Notify alarm map.
*/
private static final Map<String, ThreadPoolNotifyAlarm> NOTIFY_ALARM_MAP = new ConcurrentHashMap();
/**
* Get.
*
* @param key
* @return
*/
public static ThreadPoolNotifyAlarm get(String key) {
return NOTIFY_ALARM_MAP.get(key);
}
/**
* Put.
*
* @param key
* @param val
*/
public static void put(String key, ThreadPoolNotifyAlarm val) {
NOTIFY_ALARM_MAP.put(key, val);
}
}

@ -1,5 +1,7 @@
package cn.hippo4j.core.proxy;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import lombok.AllArgsConstructor;
import java.lang.reflect.InvocationHandler;
@ -18,11 +20,17 @@ public class RejectedProxyInvocationHandler implements InvocationHandler {
private final Object target;
private final String threadPoolId;
private final AtomicLong rejectCount;
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
rejectCount.incrementAndGet();
ThreadPoolNotifyAlarmHandler alarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class);
alarmHandler.checkPoolRejectedAlarm(threadPoolId);
try {
return method.invoke(target, args);
} catch (InvocationTargetException ex) {

@ -16,15 +16,16 @@ public class RejectedProxyUtil {
* Proxy rejected execution.
*
* @param rejectedExecutionHandler
* @param threadPoolId
* @param rejectedNum
* @return
*/
public static RejectedExecutionHandler createProxy(RejectedExecutionHandler rejectedExecutionHandler, AtomicLong rejectedNum) {
public static RejectedExecutionHandler createProxy(RejectedExecutionHandler rejectedExecutionHandler, String threadPoolId, AtomicLong rejectedNum) {
RejectedExecutionHandler rejectedProxy = (RejectedExecutionHandler) Proxy
.newProxyInstance(
rejectedExecutionHandler.getClass().getClassLoader(),
new Class[]{RejectedExecutionHandler.class},
new RejectedProxyInvocationHandler(rejectedExecutionHandler, rejectedNum)
new RejectedProxyInvocationHandler(rejectedExecutionHandler, threadPoolId, rejectedNum)
);
return rejectedProxy;

@ -2,14 +2,17 @@ package cn.hippo4j.core.refresh;
import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.notify.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.support.ResizableCapacityLinkedBlockIngQueue;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Objects;
@ -25,19 +28,33 @@ import java.util.concurrent.atomic.AtomicLong;
* @date 2021/6/20 15:51
*/
@Slf4j
@AllArgsConstructor
public class ThreadPoolDynamicRefresh {
public static void refreshDynamicPool(String content) {
private final ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler;
/**
* Refresh dynamic pool.
*
* @param content
*/
public void refreshDynamicPool(String content) {
PoolParameterInfo parameter = JSONUtil.parseObject(content, PoolParameterInfo.class);
// TODO 抽象报警通知模块
// ThreadPoolAlarmManage.sendPoolConfigChange(parameter);
ThreadPoolDynamicRefresh.refreshDynamicPool(parameter);
}
public static void refreshDynamicPool(PoolParameterInfo parameter) {
String threadPoolId = parameter.getTpId();
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
refreshDynamicPool(parameter, executor);
}
/**
* Refresh dynamic pool.
*
* @param parameter
* @param executor
*/
public void refreshDynamicPool(PoolParameterInfo parameter, ThreadPoolExecutor executor) {
String threadPoolId = parameter.getTpId();
int originalCoreSize = executor.getCorePoolSize();
int originalMaximumPoolSize = executor.getMaximumPoolSize();
String originalQuery = executor.getQueue().getClass().getSimpleName();
@ -53,9 +70,28 @@ public class ThreadPoolDynamicRefresh {
}
originalRejected = rejectedExecutionHandler.getClass().getSimpleName();
// Send change message.
ChangeParameterNotifyRequest request = new ChangeParameterNotifyRequest();
request.setBeforeCorePoolSize(originalCoreSize);
request.setBeforeMaximumPoolSize(originalMaximumPoolSize);
request.setBeforeAllowsCoreThreadTimeOut(originalAllowCoreThreadTimeOut);
request.setBeforeKeepAliveTime(originalKeepAliveTime);
request.setBlockingQueueName(originalQuery);
request.setBeforeQueueCapacity(originalCapacity);
request.setBeforeRejectedName(originalRejected);
request.setThreadPoolId(threadPoolId);
changePoolInfo(executor, parameter);
ThreadPoolExecutor afterExecutor = GlobalThreadPoolManage.getExecutorService(threadPoolId).getExecutor();
request.setNowCorePoolSize(afterExecutor.getCorePoolSize());
request.setNowMaximumPoolSize(afterExecutor.getMaximumPoolSize());
request.setNowAllowsCoreThreadTimeOut(EnableEnum.getBool(parameter.getAllowCoreThreadTimeOut()));
request.setNowKeepAliveTime(afterExecutor.getKeepAliveTime(TimeUnit.SECONDS));
request.setNowQueueCapacity((afterExecutor.getQueue().remainingCapacity() + afterExecutor.getQueue().size()));
request.setNowRejectedName(RejectedTypeEnum.getRejectedNameByType(parameter.getRejectedType()));
threadPoolNotifyAlarmHandler.sendPoolConfigChange(request);
log.info(
"[🔥 {}] Changed thread pool. " +
"\n coreSize :: [{}]" +
@ -77,7 +113,13 @@ public class ThreadPoolDynamicRefresh {
);
}
public static void changePoolInfo(ThreadPoolExecutor executor, PoolParameterInfo parameter) {
/**
* Change pool info.
*
* @param executor
* @param parameter
*/
public void changePoolInfo(ThreadPoolExecutor executor, PoolParameterInfo parameter) {
if (parameter.getCoreSize() != null) {
executor.setCorePoolSize(parameter.getCoreSize());
}
@ -106,7 +148,7 @@ public class ThreadPoolDynamicRefresh {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);
AtomicLong rejectCount = dynamicExecutor.getRejectCount();
rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, rejectCount);
rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, parameter.getTpId(), rejectCount);
}
executor.setRejectedExecutionHandler(rejectedExecutionHandler);

@ -1,8 +1,8 @@
package cn.hippo4j.starter.toolkit;
package cn.hippo4j.core.toolkit;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.starter.config.BootstrapProperties;
import cn.hippo4j.starter.toolkit.inet.InetUtils;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
@ -13,7 +13,6 @@ import java.util.ArrayList;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER;
import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
import static cn.hippo4j.starter.config.DynamicThreadPoolAutoConfiguration.CLIENT_IDENTIFICATION_VALUE;
/**
* Identify util.
@ -23,7 +22,9 @@ import static cn.hippo4j.starter.config.DynamicThreadPoolAutoConfiguration.CLIEN
*/
public class IdentifyUtil {
private static String identify;
private static String IDENTIFY;
public static final String CLIENT_IDENTIFICATION_VALUE = IdUtil.simpleUUID();
/**
* Generate identify.
@ -33,8 +34,8 @@ public class IdentifyUtil {
* @return
*/
public static synchronized String generate(ConfigurableEnvironment environment, InetUtils hippo4JInetUtils) {
if (StrUtil.isNotBlank(identify)) {
return identify;
if (StrUtil.isNotBlank(IDENTIFY)) {
return IDENTIFY;
}
String ip = hippo4JInetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
String port = environment.getProperty("server.port", "8080");
@ -45,7 +46,7 @@ public class IdentifyUtil {
CLIENT_IDENTIFICATION_VALUE
).toString();
identify = identification;
IDENTIFY = identification;
return identification;
}
@ -56,7 +57,7 @@ public class IdentifyUtil {
*/
@SneakyThrows
public static String getIdentify() {
while (StrUtil.isBlank(identify)) {
while (StrUtil.isBlank(IDENTIFY)) {
ConfigurableEnvironment environment = ApplicationContextHolder.getBean(ConfigurableEnvironment.class);
InetUtils inetUtils = ApplicationContextHolder.getBean(InetUtils.class);
@ -68,19 +69,20 @@ public class IdentifyUtil {
Thread.sleep(500);
}
return identify;
return IDENTIFY;
}
/**
* Get thread pool identify.
*
* @param threadPoolId
* @param properties
* @param itemId
* @param namespace
* @return
*/
public static String getThreadPoolIdentify(String threadPoolId, BootstrapProperties properties) {
public static String getThreadPoolIdentify(String threadPoolId, String itemId, String namespace) {
ArrayList<String> params = Lists.newArrayList(
threadPoolId, properties.getItemId(), properties.getNamespace(), getIdentify()
threadPoolId, itemId, namespace, getIdentify()
);
return Joiner.on(GROUP_KEY_DELIMITER).join(params);

@ -1,4 +1,4 @@
package cn.hippo4j.starter.toolkit.inet;
package cn.hippo4j.core.toolkit.inet;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import org.apache.commons.logging.Log;

@ -1,4 +1,4 @@
package cn.hippo4j.starter.toolkit.inet;
package cn.hippo4j.core.toolkit.inet;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ -1,179 +0,0 @@
package cn.hippo4j.starter.alarm;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.notify.AlarmControlDTO;
import cn.hippo4j.common.notify.AlarmControlHandler;
import cn.hippo4j.common.notify.NotifyTypeEnum;
import cn.hippo4j.common.toolkit.GroupKey;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.starter.config.BootstrapProperties;
import cn.hippo4j.starter.remote.HttpAgent;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import java.util.List;
import java.util.Map;
import static cn.hippo4j.common.constant.Constants.BASE_PATH;
/**
* Base send message service.
*
* @author chen.ma
* @date 2021/8/15 15:34
*/
@Slf4j
@RequiredArgsConstructor
public class BaseSendMessageService implements InitializingBean, SendMessageService {
@NonNull
private final HttpAgent httpAgent;
@NonNull
private final BootstrapProperties properties;
@NonNull
private final AlarmControlHandler alarmControlHandler;
public final static Map<String, List<NotifyDTO>> ALARM_NOTIFY_CONFIG = Maps.newHashMap();
private final Map<String, SendMessageHandler> sendMessageHandlers = Maps.newHashMap();
@Override
public void sendAlarmMessage(NotifyTypeEnum typeEnum, DynamicThreadPoolExecutor executor) {
String threadPoolId = executor.getThreadPoolId();
String buildKey = StrUtil.builder(executor.getThreadPoolId(), "+", "ALARM").toString();
List<NotifyDTO> notifyList = ALARM_NOTIFY_CONFIG.get(buildKey);
if (CollUtil.isEmpty(notifyList)) {
log.warn("Please configure alarm notification on the server. key :: [{}]", threadPoolId);
return;
}
notifyList.forEach(each -> {
try {
SendMessageHandler messageHandler = sendMessageHandlers.get(each.getPlatform());
if (messageHandler == null) {
log.warn("Please configure alarm notification on the server. key :: [{}]", threadPoolId);
return;
}
if (isSendAlarm(each.getTpId(), each.setTypeEnum(typeEnum))) {
messageHandler.sendAlarmMessage(each, executor);
}
} catch (Exception ex) {
log.warn("Failed to send thread pool alarm notification. key :: [{}]", threadPoolId, ex);
}
});
}
@Override
public void sendChangeMessage(PoolParameterInfo parameter) {
String threadPoolId = parameter.getTpId();
String buildKey = StrUtil.builder(parameter.getTpId(), "+", "CONFIG").toString();
List<NotifyDTO> notifyList = ALARM_NOTIFY_CONFIG.get(buildKey);
if (CollUtil.isEmpty(notifyList)) {
log.warn("Please configure alarm notification on the server. key :: [{}]", threadPoolId);
return;
}
notifyList.forEach(each -> {
try {
SendMessageHandler messageHandler = sendMessageHandlers.get(each.getPlatform());
if (messageHandler == null) {
log.warn("Please configure alarm notification on the server. key :: [{}]", threadPoolId);
return;
}
messageHandler.sendChangeMessage(each, parameter);
} catch (Exception ex) {
log.warn("Failed to send thread pool change notification. key :: [{}]", threadPoolId, ex);
}
});
}
@Override
public void afterPropertiesSet() {
Map<String, SendMessageHandler> sendMessageHandlerMap =
ApplicationContextHolder.getBeansOfType(SendMessageHandler.class);
sendMessageHandlerMap.values().forEach(each -> sendMessageHandlers.put(each.getType(), each));
List<String> threadPoolIds = GlobalThreadPoolManage.listThreadPoolId();
if (CollUtil.isEmpty(threadPoolIds)) {
log.warn("The client does not have a dynamic thread pool instance configured.");
return;
}
List<String> groupKeys = Lists.newArrayList();
threadPoolIds.forEach(each -> {
String groupKey = GroupKey.getKeyTenant(each, properties.getItemId(), properties.getNamespace());
groupKeys.add(groupKey);
});
Result result = null;
try {
result = httpAgent.httpPostByDiscovery(BASE_PATH + "/notify/list/config", new ThreadPoolNotifyReqDTO(groupKeys));
} catch (Throwable ex) {
log.error("Get dynamic thread pool notify configuration error. message :: {}", ex.getMessage());
}
if (result != null && result.isSuccess() && result.getData() != null) {
String resultDataStr = JSONUtil.toJSONString(result.getData());
List<ThreadPoolNotify> resultData = JSONUtil.parseArray(resultDataStr, ThreadPoolNotify.class);
resultData.forEach(each -> ALARM_NOTIFY_CONFIG.put(each.getNotifyKey(), each.getNotifyList()));
ALARM_NOTIFY_CONFIG.forEach((key, val) ->
val.stream().filter(each -> StrUtil.equals("ALARM", each.getType()))
.forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval()))
);
}
}
private boolean isSendAlarm(String threadPoolId, NotifyDTO notifyInfo) {
AlarmControlDTO alarmControl = AlarmControlDTO.builder()
.threadPool(threadPoolId)
.platform(notifyInfo.getPlatform())
.typeEnum(notifyInfo.getTypeEnum())
.build();
return alarmControlHandler.isSendAlarm(alarmControl);
}
@Data
@AllArgsConstructor
static class ThreadPoolNotifyReqDTO {
/**
* groupKeys
*/
private List<String> groupKeys;
}
@Data
static class ThreadPoolNotify {
/**
* Key
*/
private String notifyKey;
/**
*
*/
private List<NotifyDTO> notifyList;
}
}

@ -1,31 +0,0 @@
package cn.hippo4j.starter.alarm;
/**
* Message type enum.
*
* @author chen.ma
* @date 2021/8/16 20:50
*/
public enum MessageTypeEnum {
/**
*
*/
CHANGE,
/**
*
*/
CAPACITY,
/**
*
*/
LIVENESS,
/**
*
*/
REJECT
}

@ -1,34 +0,0 @@
package cn.hippo4j.starter.alarm;
import lombok.Data;
/**
* Alarm config.
*
* @author chen.ma
* @date 2021/8/15 16:09
*/
@Data
public class NotifyConfig {
/**
* type
*/
private String type;
/**
* url
*/
private String url;
/**
* token
*/
private String token;
/**
* receives
*/
private String receives;
}

@ -1,37 +0,0 @@
package cn.hippo4j.starter.alarm;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
/**
* Send message handler.
*
* @author chen.ma
* @date 2021/8/15 15:44
*/
public interface SendMessageHandler {
/**
* Get type.
*
* @return
*/
String getType();
/**
* Send alarm message.
*
* @param notifyConfig
* @param threadPoolExecutor
*/
void sendAlarmMessage(NotifyDTO notifyConfig, DynamicThreadPoolExecutor threadPoolExecutor);
/**
* Send change message.
*
* @param notifyConfig
* @param parameter
*/
void sendChangeMessage(NotifyDTO notifyConfig, PoolParameterInfo parameter);
}

@ -1,30 +0,0 @@
package cn.hippo4j.starter.alarm;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.notify.NotifyTypeEnum;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
/**
* Send msg.
*
* @author chen.ma
* @date 2021/8/15 15:31
*/
public interface SendMessageService {
/**
* Send alarm message.
*
* @param typeEnum
* @param threadPoolExecutor
*/
void sendAlarmMessage(NotifyTypeEnum typeEnum, DynamicThreadPoolExecutor threadPoolExecutor);
/**
* Send change message.
*
* @param parameter
*/
void sendChangeMessage(PoolParameterInfo parameter);
}

@ -1,134 +0,0 @@
package cn.hippo4j.starter.alarm;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.notify.NotifyTypeEnum;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.starter.config.MessageAlarmConfiguration;
import cn.hippo4j.starter.toolkit.CalculateUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
/**
* Alarm manage.
*
* @author chen.ma
* @date 2021/8/15 14:13
*/
@Slf4j
public class ThreadPoolAlarmManage {
/**
*
*/
private static final SendMessageService SEND_MESSAGE_SERVICE;
static {
SEND_MESSAGE_SERVICE = Optional.ofNullable(ApplicationContextHolder.getInstance())
.map(each -> each.getBean(MessageAlarmConfiguration.SEND_MESSAGE_BEAN_NAME, SendMessageService.class))
.orElse(null);
}
/**
* Check thread pool capacity alarm.
*
* @param threadPoolExecutor
*/
public static void checkPoolCapacityAlarm(DynamicThreadPoolExecutor threadPoolExecutor) {
if (SEND_MESSAGE_SERVICE == null) {
return;
}
/**
* TODO
* https://github.com/acmenlt/dynamic-threadpool/issues/100
*/
ThreadPoolAlarm threadPoolAlarm = new ThreadPoolAlarm(null, 80, 80);
BlockingQueue blockIngQueue = threadPoolExecutor.getQueue();
int queueSize = blockIngQueue.size();
int capacity = queueSize + blockIngQueue.remainingCapacity();
int divide = CalculateUtil.divide(queueSize, capacity);
boolean isSend = threadPoolAlarm.getIsAlarm()
&& divide > threadPoolAlarm.getCapacityAlarm()
&& isSendMessage(threadPoolExecutor, MessageTypeEnum.CAPACITY);
if (isSend) {
SEND_MESSAGE_SERVICE.sendAlarmMessage(NotifyTypeEnum.CAPACITY, threadPoolExecutor);
}
}
/**
* Check thread pool activity alarm.
*
* @param isCore
* @param threadPoolExecutor
*/
public static void checkPoolLivenessAlarm(boolean isCore, DynamicThreadPoolExecutor threadPoolExecutor) {
if (isCore || SEND_MESSAGE_SERVICE == null || !isSendMessage(threadPoolExecutor, MessageTypeEnum.LIVENESS)) {
return;
}
int activeCount = threadPoolExecutor.getActiveCount();
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
int divide = CalculateUtil.divide(activeCount, maximumPoolSize);
/**
* TODO
* https://github.com/acmenlt/dynamic-threadpool/issues/100
*/
ThreadPoolAlarm threadPoolAlarm = new ThreadPoolAlarm(null, 80, 80);
boolean isSend = threadPoolAlarm.getIsAlarm()
&& divide > threadPoolAlarm.getLivenessAlarm()
&& isSendMessage(threadPoolExecutor, MessageTypeEnum.LIVENESS);
if (isSend) {
SEND_MESSAGE_SERVICE.sendAlarmMessage(NotifyTypeEnum.ACTIVITY, threadPoolExecutor);
}
}
/**
* Check thread pool reject policy alarm.
*
* @param threadPoolExecutor
*/
public static void checkPoolRejectAlarm(DynamicThreadPoolExecutor threadPoolExecutor) {
if (SEND_MESSAGE_SERVICE == null) {
return;
}
/**
* TODO
* https://github.com/acmenlt/dynamic-threadpool/issues/100
*/
ThreadPoolAlarm threadPoolAlarm = new ThreadPoolAlarm(null, 80, 80);
if (threadPoolAlarm.getIsAlarm() && isSendMessage(threadPoolExecutor, MessageTypeEnum.REJECT)) {
SEND_MESSAGE_SERVICE.sendAlarmMessage(NotifyTypeEnum.REJECT, threadPoolExecutor);
}
}
/**
* Send thread pool configuration change message.
*
* @param parameter
*/
public static void sendPoolConfigChange(PoolParameterInfo parameter) {
if (SEND_MESSAGE_SERVICE == null) {
return;
}
SEND_MESSAGE_SERVICE.sendChangeMessage(parameter);
}
/**
* Is send message.
*
* @param threadPoolExecutor
* @param typeEnum
* @return
*/
private static boolean isSendMessage(DynamicThreadPoolExecutor threadPoolExecutor, MessageTypeEnum typeEnum) {
// ignore
return true;
}
}

@ -1,82 +0,0 @@
package cn.hippo4j.starter.alarm.ding;
/**
* Ding alarm constants.
*
* @author chen.ma
* @date 2021/11/26 20:03
*/
public class DingAlarmConstants {
/**
* Url
*/
public static final String DING_ROBOT_SERVER_URL = "https://oapi.dingtalk.com/robot/send?access_token=";
/**
* 线
*/
public static final String DING_ALARM_TITLE = "动态线程池告警";
/**
* 线
*/
public static final String DING_NOTICE_TITLE = "动态线程池通知";
/**
* 线
*/
public static final String DING_ALARM_TXT =
"<font color='#FF0000'>[警报] </font>%s - 动态线程池运行告警 \n\n" +
" --- \n\n " +
"<font color='#708090' size=2>线程池ID%s</font> \n\n " +
"<font color='#708090' size=2>应用名称:%s</font> \n\n " +
"<font color='#778899' size=2>应用实例:%s</font> \n\n " +
"<font color='#778899' size=2>报警类型:%s</font> \n\n " +
" --- \n\n " +
"<font color='#708090' size=2>核心线程数:%d</font> \n\n " +
"<font color='#708090' size=2>最大线程数:%d</font> \n\n " +
"<font color='#708090' size=2>当前线程数:%d</font> \n\n " +
"<font color='#708090' size=2>活跃线程数:%d</font> \n\n " +
"<font color='#708090' size=2>最大任务数:%d</font> \n\n " +
"<font color='#708090' size=2>线程池任务总量:%d</font> \n\n " +
" --- \n\n " +
"<font color='#708090' size=2>队列类型:%s</font> \n\n " +
"<font color='#708090' size=2>队列容量:%d</font> \n\n " +
"<font color='#708090' size=2>队列元素个数:%d</font> \n\n " +
"<font color='#708090' size=2>队列剩余个数:%d</font> \n\n " +
" --- \n\n " +
"<font color='#708090' size=2>拒绝策略:%s</font> \n\n" +
"<font color='#708090' size=2>拒绝策略执行次数:</font><font color='#FF0000' size=2>%d</font> \n\n " +
"<font color='#708090' size=2>OWNER@%s</font> \n\n" +
"<font color='#708090' size=2>提示:%d 分钟内此线程池不会重复告警(可配置)</font> \n\n" +
" --- \n\n " +
"**播报时间:%s**";
/**
* 线
*/
public static final String DING_NOTICE_TXT =
"<font color='#2a9d8f'>[通知] </font>%s - 动态线程池参数变更 \n\n" +
" --- \n\n " +
"<font color='#708090' size=2>线程池ID%s</font> \n\n " +
"<font color='#708090' size=2>应用名称:%s</font> \n\n " +
"<font color='#778899' size=2>应用实例:%s</font> \n\n " +
" --- \n\n " +
"<font color='#708090' size=2>核心线程数:%s</font> \n\n " +
"<font color='#708090' size=2>最大线程数:%s</font> \n\n " +
"<font color='#708090' size=2>核心线程超时:%s</font> \n\n " +
"<font color='#708090' size=2>线程存活时间:%s / SECONDS</font> \n\n" +
" --- \n\n " +
"<font color='#708090' size=2>队列类型:%s</font> \n\n " +
"<font color='#708090' size=2>队列容量:%s</font> \n\n " +
" --- \n\n " +
"<font color='#708090' size=2>AGO 拒绝策略:%s</font> \n\n" +
"<font color='#708090' size=2>NOW 拒绝策略:%s</font> \n\n" +
" --- \n\n " +
"<font color='#708090' size=2>提示:动态线程池配置变更实时通知(无限制)</font> \n\n" +
"<font color='#708090' size=2>OWNER@%s</font> \n\n" +
" --- \n\n " +
"**播报时间:%s**";
}

@ -1,185 +0,0 @@
package cn.hippo4j.starter.alarm.ding;
import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.starter.alarm.NotifyDTO;
import cn.hippo4j.common.notify.NotifyPlatformEnum;
import cn.hippo4j.starter.alarm.SendMessageHandler;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hutool.core.date.DateUtil;
import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiRobotSendRequest;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.taobao.api.ApiException;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.starter.alarm.ding.DingAlarmConstants.DING_ALARM_TXT;
import static cn.hippo4j.starter.alarm.ding.DingAlarmConstants.DING_NOTICE_TXT;
/**
* Send ding notification message.
*
* @author chen.ma
* @date 2021/8/15 15:49
*/
@Slf4j
@AllArgsConstructor
public class DingSendMessageHandler implements SendMessageHandler {
private final String active;
private final InstanceInfo instanceInfo;
@Override
public String getType() {
return NotifyPlatformEnum.DING.name();
}
@Override
public void sendAlarmMessage(NotifyDTO notifyConfig, DynamicThreadPoolExecutor pool) {
dingAlarmSendMessage(notifyConfig, pool);
}
@Override
public void sendChangeMessage(NotifyDTO notifyConfig, PoolParameterInfo parameter) {
dingChangeSendMessage(notifyConfig, parameter);
}
private void dingAlarmSendMessage(NotifyDTO notifyConfig, DynamicThreadPoolExecutor pool) {
String[] receives = notifyConfig.getReceives().split(",");
String afterReceives = Joiner.on(", @").join(receives);
BlockingQueue<Runnable> queue = pool.getQueue();
String text = String.format(
DING_ALARM_TXT,
// 环境
active.toUpperCase(),
// 线程池ID
pool.getThreadPoolId(),
// 应用名称
instanceInfo.getAppName(),
// 实例信息
instanceInfo.getIdentify(),
// 报警类型
notifyConfig.getTypeEnum(),
// 核心线程数
pool.getCorePoolSize(),
// 最大线程数
pool.getMaximumPoolSize(),
// 当前线程数
pool.getPoolSize(),
// 活跃线程数
pool.getActiveCount(),
// 最大任务数
pool.getLargestPoolSize(),
// 线程池任务总量
pool.getCompletedTaskCount(),
// 队列类型名称
queue.getClass().getSimpleName(),
// 队列容量
queue.size() + queue.remainingCapacity(),
// 队列元素个数
queue.size(),
// 队列剩余个数
queue.remainingCapacity(),
// 拒绝策略名称
pool.getRejectedExecutionHandler().getClass().getSimpleName(),
// 拒绝策略次数
pool.getRejectCountNum(),
// 告警手机号
afterReceives,
// 报警频率
notifyConfig.getInterval(),
// 当前时间
DateUtil.now()
);
execute(notifyConfig, DingAlarmConstants.DING_ALARM_TITLE, text, Lists.newArrayList(receives));
}
private void dingChangeSendMessage(NotifyDTO notifyConfig, PoolParameterInfo parameter) {
String threadPoolId = parameter.getTpId();
DynamicThreadPoolWrapper poolWrap = GlobalThreadPoolManage.getExecutorService(threadPoolId);
if (poolWrap == null) {
log.warn("Thread pool is empty when sending change notification, threadPoolId :: {}", threadPoolId);
return;
}
String[] receives = notifyConfig.getReceives().split(",");
String afterReceives = Joiner.on(", @").join(receives);
ThreadPoolExecutor customPool = poolWrap.getExecutor();
/**
* hesitant e.g.
*/
String text = String.format(
DING_NOTICE_TXT,
// 环境
active.toUpperCase(),
// 线程池名称
threadPoolId,
// 应用名称
instanceInfo.getAppName(),
// 实例信息
instanceInfo.getIdentify(),
// 核心线程数
customPool.getCorePoolSize() + " ➲ " + parameter.getCoreSize(),
// 最大线程数
customPool.getMaximumPoolSize() + " ➲ " + parameter.getMaxSize(),
// 核心线程超时
customPool.allowsCoreThreadTimeOut() + " ➲ " + EnableEnum.getBool(parameter.getAllowCoreThreadTimeOut()),
// 线程存活时间
customPool.getKeepAliveTime(TimeUnit.SECONDS) + " ➲ " + parameter.getKeepAliveTime(),
// 阻塞队列
QueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType()),
// 阻塞队列容量
(customPool.getQueue().size() + customPool.getQueue().remainingCapacity()) + " ➲ " + parameter.getCapacity(),
// 拒绝策略
customPool.getRejectedExecutionHandler().getClass().getSimpleName(),
RejectedTypeEnum.getRejectedNameByType(parameter.getRejectedType()),
// 告警手机号
afterReceives,
// 当前时间
DateUtil.now()
);
execute(notifyConfig, DingAlarmConstants.DING_NOTICE_TITLE, text, Lists.newArrayList(receives));
}
private void execute(NotifyDTO notifyConfig, String title, String text, List<String> mobiles) {
String serverUrl = DingAlarmConstants.DING_ROBOT_SERVER_URL + notifyConfig.getSecretKey();
DingTalkClient dingTalkClient = new DefaultDingTalkClient(serverUrl);
OapiRobotSendRequest request = new OapiRobotSendRequest();
request.setMsgtype("markdown");
OapiRobotSendRequest.Markdown markdown = new OapiRobotSendRequest.Markdown();
markdown.setTitle(title);
markdown.setText(text);
OapiRobotSendRequest.At at = new OapiRobotSendRequest.At();
at.setAtMobiles(mobiles);
request.setAt(at);
request.setMarkdown(markdown);
try {
dingTalkClient.execute(request);
} catch (ApiException ex) {
log.error("Ding failed to send message", ex);
}
}
}

@ -1,42 +0,0 @@
package cn.hippo4j.starter.alarm.lark;
/**
* lark alarm constants.
*
* @author imyzt
* @date 2021-11-23 19:29
*/
public class LarkAlarmConstants {
/**
* lark bot url
*/
public static final String LARK_BOT_URL = "https://open.feishu.cn/open-apis/bot/v2/hook/";
/**
* lark json
*/
public static final String ALARM_JSON_PATH = "classpath:properties/lark/alarm.json";
/**
* lark json
*/
public static final String NOTICE_JSON_PATH = "classpath:properties/lark/notice.json";
/**
* lark at format. openid
* openid,@
*/
public static final String LARK_AT_FORMAT_OPENID = "<at id='%s'></at>";
/**
* lark at format. username
* username,@username,@@
*/
public static final String LARK_AT_FORMAT_USERNAME = "<at id=''>%s</at>";
/**
* lark openid prefix
*/
public static final String LARK_OPENID_PREFIX = "ou_";
}

@ -1,61 +0,0 @@
package cn.hippo4j.starter.alarm.wechat;
/**
* Ding alarm constants.
*
* @author chen.ma
* @date 2021/11/26 20:03
*/
public class WeChatAlarmConstants {
/**
* Url
*/
public static final String WE_CHAT_SERVER_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=";
/**
* 线
*/
public static final String WE_CHAT_ALARM_TXT =
"### <font color='#FF0000'>[警报] </font>%s - 动态线程池运行告警 \n" +
"> 线程池ID<font color=\"warning\">%s</font> \n" +
"> 应用名称:<font color=\"warning\">%s</font> \n" +
"> 应用实例:%s \n" +
"> 报警类型:%s \n" +
"> 核心线程数:%s \n" +
"> 最大线程数:%s \n" +
"> 当前线程数:%s \n" +
"> 活跃线程数:%s \n" +
"> 最大任务数:%s \n" +
"> 线程池任务总量:%s \n" +
"> 队列类型:%s \n" +
"> 队列容量:%s \n" +
"> 队列元素个数:%s \n" +
"> 队列剩余个数:%s \n" +
"> 拒绝策略:%s \n" +
"> 拒绝策略执行次数:%s \n" +
"> OWNER@%s \n" +
"> 提示:%d 分钟内此线程池不会重复告警(可配置) \n\n" +
"**播报时间:%s**";
/**
* 线
*/
public static final String WE_CHAT_NOTICE_TXT =
"### <font color=\"info\">[通知] </font>%s - 动态线程池参数变更 \n" +
"> 线程池ID<font color=\"warning\">%s</font> \n" +
"> 应用名称:<font color=\"warning\">%s</font> \n" +
"> 应用实例:%s \n" +
"> 核心线程数:%s \n" +
"> 最大线程数:%s \n" +
"> 核心线程超时:%s \n" +
"> 线程存活时间:%s / SECONDS \n" +
"> 队列类型:%s \n" +
"> 队列容量:%s \n" +
"> AGO 拒绝策略:%s \n" +
"> NOW 拒绝策略:%s \n" +
"> OWNER<@%s> \n" +
"> 提示:动态线程池配置变更实时通知(无限制) \n\n" +
"**播报时间:%s**";
}

@ -1,191 +0,0 @@
package cn.hippo4j.starter.alarm.wechat;
import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.starter.alarm.NotifyDTO;
import cn.hippo4j.common.notify.NotifyPlatformEnum;
import cn.hippo4j.starter.alarm.SendMessageHandler;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.QueueTypeEnum;
import cn.hippo4j.core.executor.support.RejectedTypeEnum;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hutool.core.date.DateUtil;
import cn.hutool.http.HttpRequest;
import com.google.common.base.Joiner;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.starter.alarm.wechat.WeChatAlarmConstants.*;
/**
* WeChat send message handler.
*
* @author chen.ma
* @date 2021/11/26 20:06
*/
@Slf4j
@AllArgsConstructor
public class WeChatSendMessageHandler implements SendMessageHandler {
private final String active;
private final InstanceInfo instanceInfo;
@Override
public String getType() {
return NotifyPlatformEnum.WECHAT.name();
}
@Override
public void sendAlarmMessage(NotifyDTO notifyConfig, DynamicThreadPoolExecutor pool) {
String[] receives = notifyConfig.getReceives().split(",");
String afterReceives = Joiner.on(", @").join(receives);
BlockingQueue<Runnable> queue = pool.getQueue();
String text = String.format(
WE_CHAT_ALARM_TXT,
// 环境
active.toUpperCase(),
// 线程池ID
pool.getThreadPoolId(),
// 应用名称
instanceInfo.getAppName(),
// 实例信息
instanceInfo.getIdentify(),
// 报警类型
notifyConfig.getTypeEnum(),
// 核心线程数
pool.getCorePoolSize(),
// 最大线程数
pool.getMaximumPoolSize(),
// 当前线程数
pool.getPoolSize(),
// 活跃线程数
pool.getActiveCount(),
// 最大任务数
pool.getLargestPoolSize(),
// 线程池任务总量
pool.getCompletedTaskCount(),
// 队列类型名称
queue.getClass().getSimpleName(),
// 队列容量
queue.size() + queue.remainingCapacity(),
// 队列元素个数
queue.size(),
// 队列剩余个数
queue.remainingCapacity(),
// 拒绝策略名称
pool.getRejectedExecutionHandler().getClass().getSimpleName(),
// 拒绝策略次数
pool.getRejectCountNum(),
// 告警手机号
afterReceives,
// 报警频率
notifyConfig.getInterval(),
// 当前时间
DateUtil.now()
);
execute(notifyConfig, text);
}
@Override
public void sendChangeMessage(NotifyDTO notifyConfig, PoolParameterInfo parameter) {
String threadPoolId = parameter.getTpId();
DynamicThreadPoolWrapper poolWrap = GlobalThreadPoolManage.getExecutorService(threadPoolId);
if (poolWrap == null) {
log.warn("Thread pool is empty when sending change notification, threadPoolId :: {}", threadPoolId);
return;
}
String[] receives = notifyConfig.getReceives().split(",");
String afterReceives = Joiner.on("><@").join(receives);
ThreadPoolExecutor customPool = poolWrap.getExecutor();
String text = String.format(
WE_CHAT_NOTICE_TXT,
// 环境
active.toUpperCase(),
// 线程池名称
threadPoolId,
// 应用名称
instanceInfo.getAppName(),
// 实例信息
instanceInfo.getIdentify(),
// 核心线程数
customPool.getCorePoolSize() + " ➲ " + parameter.getCoreSize(),
// 最大线程数
customPool.getMaximumPoolSize() + " ➲ " + parameter.getMaxSize(),
// 核心线程超时
customPool.allowsCoreThreadTimeOut() + " ➲ " + EnableEnum.getBool(parameter.getAllowCoreThreadTimeOut()),
// 线程存活时间
customPool.getKeepAliveTime(TimeUnit.SECONDS) + " ➲ " + parameter.getKeepAliveTime(),
// 阻塞队列
QueueTypeEnum.getBlockingQueueNameByType(parameter.getQueueType()),
// 阻塞队列容量
(customPool.getQueue().size() + customPool.getQueue().remainingCapacity()) + " ➲ " + parameter.getCapacity(),
// 拒绝策略
customPool.getRejectedExecutionHandler().getClass().getSimpleName(),
RejectedTypeEnum.getRejectedNameByType(parameter.getRejectedType()),
// 告警手机号
afterReceives,
// 当前时间
DateUtil.now()
);
execute(notifyConfig, text);
}
private void execute(NotifyDTO notifyConfig, String text) {
String serverUrl = WE_CHAT_SERVER_URL + notifyConfig.getSecretKey();
try {
WeChatReqDTO weChatReq = new WeChatReqDTO();
weChatReq.setMsgtype("markdown");
Markdown markdown = new Markdown();
markdown.setContent(text);
weChatReq.setMarkdown(markdown);
HttpRequest.post(serverUrl).body(JSONUtil.toJSONString(weChatReq)).execute();
} catch (Exception ex) {
log.error("WeChat failed to send message", ex);
}
}
@Data
@Accessors(chain = true)
public static class WeChatReqDTO {
/**
* msgType
*/
private String msgtype;
/**
* markdown
*/
private Markdown markdown;
}
@Data
public static class Markdown {
/**
* content
*/
private String content;
}
}

@ -2,10 +2,10 @@ package cn.hippo4j.starter.config;
import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.toolkit.ContentUtil;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.starter.core.DiscoveryClient;
import cn.hippo4j.starter.remote.HttpAgent;
import cn.hippo4j.starter.toolkit.IdentifyUtil;
import cn.hippo4j.starter.toolkit.inet.InetUtils;
import cn.hutool.core.text.StrBuilder;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
@ -15,7 +15,7 @@ import org.springframework.core.env.ConfigurableEnvironment;
import java.net.InetAddress;
import static cn.hippo4j.common.constant.Constants.GROUP_KEY_DELIMITER;
import static cn.hippo4j.starter.config.DynamicThreadPoolAutoConfiguration.CLIENT_IDENTIFICATION_VALUE;
import static cn.hippo4j.core.toolkit.IdentifyUtil.CLIENT_IDENTIFICATION_VALUE;
import static cn.hippo4j.starter.toolkit.CloudCommonIdUtil.getDefaultInstanceId;
import static cn.hippo4j.starter.toolkit.CloudCommonIdUtil.getIpApplicationName;

@ -2,6 +2,9 @@ package cn.hippo4j.starter.config;
import cn.hippo4j.common.api.ThreadDetailState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.refresh.ThreadPoolDynamicRefresh;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.starter.controller.PoolRunStateController;
import cn.hippo4j.starter.controller.WebThreadPoolController;
import cn.hippo4j.starter.core.ConfigService;
@ -23,9 +26,6 @@ import cn.hippo4j.starter.monitor.send.MessageSender;
import cn.hippo4j.starter.remote.HttpAgent;
import cn.hippo4j.starter.remote.HttpScheduledHealthCheck;
import cn.hippo4j.starter.remote.ServerHealthCheck;
import cn.hippo4j.starter.toolkit.IdentifyUtil;
import cn.hippo4j.starter.toolkit.inet.InetUtils;
import cn.hutool.core.util.IdUtil;
import lombok.AllArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.ImportAutoConfiguration;
@ -51,15 +51,13 @@ import org.springframework.core.env.ConfigurableEnvironment;
@ConditionalOnBean(MarkerConfiguration.Marker.class)
@EnableConfigurationProperties(BootstrapProperties.class)
@ConditionalOnProperty(prefix = BootstrapProperties.PREFIX, value = "enable", matchIfMissing = true, havingValue = "true")
@ImportAutoConfiguration({HttpClientConfiguration.class, DiscoveryConfiguration.class, MessageAlarmConfiguration.class, UtilAutoConfiguration.class})
@ImportAutoConfiguration({HttpClientConfiguration.class, DiscoveryConfiguration.class, MessageNotifyConfiguration.class, UtilAutoConfiguration.class})
public class DynamicThreadPoolAutoConfiguration {
private final BootstrapProperties properties;
private final ConfigurableEnvironment environment;
public static final String CLIENT_IDENTIFICATION_VALUE = IdUtil.simpleUUID();
@Bean
public DynamicThreadPoolBannerHandler threadPoolBannerHandler() {
return new DynamicThreadPoolBannerHandler(properties);
@ -85,9 +83,11 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean
@SuppressWarnings("all")
public DynamicThreadPoolPostProcessor threadPoolBeanPostProcessor(HttpAgent httpAgent, ThreadPoolOperation threadPoolOperation,
ApplicationContextHolder hippo4JApplicationContextHolder) {
return new DynamicThreadPoolPostProcessor(properties, httpAgent, threadPoolOperation);
public DynamicThreadPoolPostProcessor threadPoolBeanPostProcessor(HttpAgent httpAgent,
ThreadPoolOperation threadPoolOperation,
ApplicationContextHolder hippo4JApplicationContextHolder,
ThreadPoolDynamicRefresh threadPoolDynamicRefresh) {
return new DynamicThreadPoolPostProcessor(properties, httpAgent, threadPoolOperation, threadPoolDynamicRefresh);
}
@Bean

@ -1,65 +0,0 @@
package cn.hippo4j.starter.config;
import cn.hippo4j.common.model.InstanceInfo;
import cn.hippo4j.common.notify.AlarmControlHandler;
import cn.hippo4j.starter.alarm.BaseSendMessageService;
import cn.hippo4j.starter.alarm.SendMessageHandler;
import cn.hippo4j.starter.alarm.SendMessageService;
import cn.hippo4j.starter.alarm.ding.DingSendMessageHandler;
import cn.hippo4j.starter.alarm.lark.LarkSendMessageHandler;
import cn.hippo4j.starter.alarm.wechat.WeChatSendMessageHandler;
import cn.hippo4j.starter.remote.HttpAgent;
import lombok.AllArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.env.ConfigurableEnvironment;
/**
* Message alarm config.
*
* @author chen.ma
* @date 2021/8/15 15:39
*/
@AllArgsConstructor
public class MessageAlarmConfiguration {
private final BootstrapProperties properties;
private final InstanceInfo instanceInfo;
private ConfigurableEnvironment environment;
public static final String ACTIVE_DEFAULT = "unknown";
public static final String SEND_MESSAGE_BEAN_NAME = "hippo4JSendMessageService";
@DependsOn("hippo4JApplicationContextHolder")
@Bean(MessageAlarmConfiguration.SEND_MESSAGE_BEAN_NAME)
public SendMessageService hippo4JSendMessageService(HttpAgent httpAgent, AlarmControlHandler alarmControlHandler) {
return new BaseSendMessageService(httpAgent, properties, alarmControlHandler);
}
@Bean
public SendMessageHandler dingSendMessageHandler() {
String active = environment.getProperty("spring.profiles.active", ACTIVE_DEFAULT);
return new DingSendMessageHandler(active, instanceInfo);
}
@Bean
public SendMessageHandler larkSendMessageHandler() {
String active = environment.getProperty("spring.profiles.active", ACTIVE_DEFAULT);
return new LarkSendMessageHandler(active, instanceInfo);
}
@Bean
public SendMessageHandler weChatSendMessageHandler() {
String active = environment.getProperty("spring.profiles.active", ACTIVE_DEFAULT);
return new WeChatSendMessageHandler(active, instanceInfo);
}
@Bean
public AlarmControlHandler alarmControlHandler() {
return new AlarmControlHandler();
}
}

@ -0,0 +1,66 @@
package cn.hippo4j.starter.config;
import cn.hippo4j.common.notify.*;
import cn.hippo4j.common.notify.platform.DingSendMessageHandler;
import cn.hippo4j.common.notify.platform.LarkSendMessageHandler;
import cn.hippo4j.common.notify.platform.WeChatSendMessageHandler;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.refresh.ThreadPoolDynamicRefresh;
import cn.hippo4j.starter.notify.ServerNotifyConfigBuilder;
import cn.hippo4j.starter.remote.HttpAgent;
import lombok.AllArgsConstructor;
import org.springframework.context.annotation.Bean;
/**
* Message notify config.
*
* @author chen.ma
* @date 2021/8/15 15:39
*/
@AllArgsConstructor
public class MessageNotifyConfiguration {
@Bean
public AlarmControlHandler alarmControlHandler() {
return new AlarmControlHandler();
}
@Bean
public NotifyConfigBuilder notifyConfigBuilder(HttpAgent httpAgent,
BootstrapProperties properties,
AlarmControlHandler alarmControlHandler) {
return new ServerNotifyConfigBuilder(httpAgent, properties, alarmControlHandler);
}
@Bean
public HippoSendMessageService hippoSendMessageService(NotifyConfigBuilder notifyConfigBuilder,
AlarmControlHandler alarmControlHandler) {
return new BaseSendMessageServiceImpl(notifyConfigBuilder, alarmControlHandler);
}
@Bean
public ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler(HippoSendMessageService hippoSendMessageService) {
return new ThreadPoolNotifyAlarmHandler(hippoSendMessageService);
}
@Bean
public SendMessageHandler dingSendMessageHandler() {
return new DingSendMessageHandler();
}
@Bean
public SendMessageHandler larkSendMessageHandler() {
return new LarkSendMessageHandler();
}
@Bean
public SendMessageHandler weChatSendMessageHandler() {
return new WeChatSendMessageHandler();
}
@Bean
public ThreadPoolDynamicRefresh threadPoolDynamicRefresh(ThreadPoolNotifyAlarmHandler threadPoolNotifyAlarmHandler) {
return new ThreadPoolDynamicRefresh(threadPoolNotifyAlarmHandler);
}
}

@ -1,7 +1,7 @@
package cn.hippo4j.starter.config;
import cn.hippo4j.starter.toolkit.inet.InetUtils;
import cn.hippo4j.starter.toolkit.inet.InetUtilsProperties;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.core.toolkit.inet.InetUtilsProperties;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;

@ -1,22 +0,0 @@
package cn.hippo4j.starter.core;
import cn.hippo4j.core.refresh.ThreadPoolDynamicRefresh;
/**
* Config adapter.
*
* @author chen.ma
* @date 2021/6/22 21:29
*/
public class ConfigAdapter {
/**
* Callback Config.
*
* @param config
*/
public void callbackConfig(String config) {
ThreadPoolDynamicRefresh.refreshDynamicPool(config);
}
}

@ -4,16 +4,19 @@ import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.notify.ThreadPoolNotifyAlarm;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.refresh.ThreadPoolDynamicRefresh;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*;
import cn.hippo4j.core.refresh.ThreadPoolDynamicRefresh;
import cn.hippo4j.starter.config.BootstrapProperties;
import cn.hippo4j.starter.remote.HttpAgent;
import cn.hippo4j.starter.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hutool.core.util.BooleanUtil;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
@ -46,6 +49,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
private final ThreadPoolOperation threadPoolOperation;
private final ThreadPoolDynamicRefresh threadPoolDynamicRefresh;
private final ExecutorService executorService = ThreadPoolBuilder.builder()
.corePoolSize(2)
.maxPoolNum(4)
@ -143,6 +148,13 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
// 设置动态线程池增强参数
if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
BooleanUtil.toBoolean(ppi.getIsAlarm().toString()),
ppi.getCapacityAlarm(),
ppi.getLivenessAlarm()
);
GlobalNotifyAlarmManage.put(tpId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
@ -180,7 +192,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
*/
protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
if (dynamicThreadPoolWrap.isSubscribeFlag()) {
threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> ThreadPoolDynamicRefresh.refreshDynamicPool(config));
threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> threadPoolDynamicRefresh.refreshDynamicPool(config));
}
}

@ -4,7 +4,7 @@ import cn.hippo4j.common.model.PoolBaseInfo;
import cn.hippo4j.common.model.PoolRunStateInfo;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.starter.toolkit.CalculateUtil;
import cn.hippo4j.core.toolkit.CalculateUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hutool.core.date.DateUtil;

@ -6,8 +6,8 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.starter.toolkit.ByteConvertUtil;
import cn.hippo4j.starter.toolkit.inet.InetUtils;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.RuntimeInfo;
@ -17,7 +17,7 @@ import org.springframework.core.env.ConfigurableEnvironment;
import java.util.concurrent.ThreadPoolExecutor;
import static cn.hippo4j.starter.config.DynamicThreadPoolAutoConfiguration.CLIENT_IDENTIFICATION_VALUE;
import static cn.hippo4j.core.toolkit.IdentifyUtil.CLIENT_IDENTIFICATION_VALUE;
/**
* Thread pool run state service.

@ -14,7 +14,7 @@ import lombok.AllArgsConstructor;
import java.util.List;
import static cn.hippo4j.starter.toolkit.IdentifyUtil.getThreadPoolIdentify;
import static cn.hippo4j.core.toolkit.IdentifyUtil.getThreadPoolIdentify;
/**
* Thread pool runtime data collection.
@ -36,7 +36,7 @@ public class RunTimeInfoCollector extends AbstractThreadPoolRuntime implements C
for (String each : listThreadPoolId) {
PoolRunStateInfo poolRunState = getPoolRunState(each);
RuntimeMessage runtimeMessage = BeanUtil.toBean(poolRunState, RuntimeMessage.class);
runtimeMessage.setGroupKey(getThreadPoolIdentify(each, properties));
runtimeMessage.setGroupKey(getThreadPoolIdentify(each, properties.getItemId(), properties.getNamespace()));
runtimeMessages.add(runtimeMessage);
}

@ -0,0 +1,78 @@
package cn.hippo4j.starter.notify;
import cn.hippo4j.common.notify.AlarmControlHandler;
import cn.hippo4j.common.notify.NotifyConfigBuilder;
import cn.hippo4j.common.notify.NotifyConfigDTO;
import cn.hippo4j.common.notify.ThreadPoolNotifyDTO;
import cn.hippo4j.common.notify.request.ThreadPoolNotifyRequest;
import cn.hippo4j.common.toolkit.GroupKey;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.starter.config.BootstrapProperties;
import cn.hippo4j.starter.remote.HttpAgent;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import static cn.hippo4j.common.constant.Constants.BASE_PATH;
/**
* Server notify config builder.
*
* @author chen.ma
* @date 2022/2/24 19:57
*/
@Slf4j
@AllArgsConstructor
public class ServerNotifyConfigBuilder implements NotifyConfigBuilder {
private final HttpAgent httpAgent;
private final BootstrapProperties properties;
private final AlarmControlHandler alarmControlHandler;
@Override
public Map<String, List<NotifyConfigDTO>> buildNotify() {
Map<String, List<NotifyConfigDTO>> resultMap = Maps.newHashMap();
List<String> threadPoolIds = GlobalThreadPoolManage.listThreadPoolId();
if (CollUtil.isEmpty(threadPoolIds)) {
log.warn("The client does not have a dynamic thread pool instance configured.");
return resultMap;
}
List<String> groupKeys = Lists.newArrayList();
threadPoolIds.forEach(each -> {
String groupKey = GroupKey.getKeyTenant(each, properties.getItemId(), properties.getNamespace());
groupKeys.add(groupKey);
});
Result result = null;
try {
result = httpAgent.httpPostByDiscovery(BASE_PATH + "/notify/list/config", new ThreadPoolNotifyRequest(groupKeys));
} catch (Throwable ex) {
log.error("Get dynamic thread pool notify configuration error. message :: {}", ex.getMessage());
}
if (result != null && result.isSuccess() && result.getData() != null) {
String resultDataStr = JSONUtil.toJSONString(result.getData());
List<ThreadPoolNotifyDTO> resultData = JSONUtil.parseArray(resultDataStr, ThreadPoolNotifyDTO.class);
resultData.forEach(each -> resultMap.put(each.getNotifyKey(), each.getNotifyList()));
resultMap.forEach((key, val) ->
val.stream().filter(each -> StrUtil.equals("ALARM", each.getType()))
.forEach(each -> alarmControlHandler.initCacheAndLock(each.getThreadPoolId(), each.getPlatform(), each.getInterval()))
);
}
return resultMap;
}
}

@ -1,6 +1,6 @@
package cn.hippo4j.starter.toolkit;
import cn.hippo4j.starter.toolkit.inet.InetUtils;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import lombok.SneakyThrows;
import org.springframework.core.env.PropertyResolver;

Loading…
Cancel
Save