From 280adef6342c3d1f82d0d7896d171abff42d58f3 Mon Sep 17 00:00:00 2001 From: Pan-YuJie <646836760@qq.com> Date: Mon, 26 Aug 2024 23:29:18 +0800 Subject: [PATCH] feat:Agent dynamic alarm Initialize --- .../spring-plugin-common/pom.xml | 6 + .../alarm/AgentModeNotifyConfigBuilder.java | 200 +++++++++++++++++ .../spring/common/conf/SpringBootConfig.java | 14 ++ .../support/SpringPropertiesLoader.java | 23 +- .../SpringThreadPoolRegisterSupport.java | 169 ++++++++++++-- .../support/ThreadPoolCheckAlarmSupport.java | 130 +++++++++++ .../common/toolkit/SpringPropertyBinder.java | 208 ++++++++++++++++++ .../agent/agent-example-core/pom.xml | 47 ++++ .../core/config}/ThreadPoolConfiguration.java | 4 +- .../core/inittest/AlarmSendMessageTest.java | 87 ++++++++ .../agent/config-apollo/pom.xml | 6 + .../AgentConfigApolloExampleApplication.java | 2 +- .../agent/config-nacos/pom.xml | 5 + .../AgentConfigNacosExampleApplication.java | 2 +- .../config/nacos/ThreadPoolConfiguration.java | 55 ----- examples/threadpool-example/agent/pom.xml | 1 + .../cn/hippo4j/common/constant/Constants.java | 9 + 17 files changed, 895 insertions(+), 73 deletions(-) create mode 100644 agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/alarm/AgentModeNotifyConfigBuilder.java create mode 100644 agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/ThreadPoolCheckAlarmSupport.java create mode 100644 agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/toolkit/SpringPropertyBinder.java create mode 100644 examples/threadpool-example/agent/agent-example-core/pom.xml rename examples/threadpool-example/agent/{config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo => agent-example-core/src/main/java/cn/hippo4j/example/agent/core/config}/ThreadPoolConfiguration.java (92%) create mode 100644 examples/threadpool-example/agent/agent-example-core/src/main/java/cn/hippo4j/example/agent/core/inittest/AlarmSendMessageTest.java delete mode 100644 examples/threadpool-example/agent/config-nacos/src/main/java/cn/hippo4j/example/agent/config/nacos/ThreadPoolConfiguration.java diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/pom.xml b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/pom.xml index f0336260..c3780048 100644 --- a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/pom.xml +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/pom.xml @@ -43,5 +43,11 @@ ${project.version} provided + + cn.hippo4j + hippo4j-threadpool-kernel-alarm + ${project.version} + provided + \ No newline at end of file diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/alarm/AgentModeNotifyConfigBuilder.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/alarm/AgentModeNotifyConfigBuilder.java new file mode 100644 index 00000000..d1345334 --- /dev/null +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/alarm/AgentModeNotifyConfigBuilder.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.agent.plugin.spring.common.alarm; + +import cn.hippo4j.agent.plugin.spring.common.support.ThreadPoolCheckAlarmSupport; +import cn.hippo4j.common.api.IExecutorProperties; +import cn.hippo4j.common.model.executor.ExecutorNotifyProperties; +import cn.hippo4j.common.model.executor.ExecutorProperties; +import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.common.toolkit.StringUtil; +import cn.hippo4j.threadpool.dynamic.mode.config.properties.NotifyPlatformProperties; +import cn.hippo4j.threadpool.message.api.NotifyConfigBuilder; +import cn.hippo4j.threadpool.message.api.NotifyConfigDTO; +import cn.hippo4j.threadpool.message.core.service.AlarmControlHandler; +import lombok.AllArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import static cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES; +import static cn.hippo4j.common.constant.Constants.DEFAULT_INTERVAL; + +/** + * This class is responsible for building the notification configurations for thread pools in an agent mode. + * It implements the {@link NotifyConfigBuilder} interface and provides methods to build and initialize + * notification configurations for various platforms and types (e.g., ALARM, CONFIG). + * + *

The configuration is based on the properties loaded from the bootstrap configuration and includes + * handling for alarm control and notification intervals.

+ */ +@AllArgsConstructor +public class AgentModeNotifyConfigBuilder implements NotifyConfigBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolCheckAlarmSupport.class); + + private final AlarmControlHandler alarmControlHandler; + + /** + * Builds the notification configurations for all executors defined in the bootstrap configuration. + * + *

This method filters the executors based on their alarm settings and constructs the notification + * configurations accordingly. If global alarm settings are disabled and there are no specific alarms + * configured for any executor, the method returns an empty map.

+ * + * @return A map containing the notification configurations, keyed by the notification type (e.g., ALARM, CONFIG). + */ + public Map> buildNotify() { + Map> resultMap = new HashMap<>(); + boolean globalAlarm = Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor()) + .map(ExecutorProperties::getAlarm) + .orElse(true); + List executors = BOOTSTRAP_CONFIG_PROPERTIES.getExecutors(); + if (CollectionUtil.isEmpty(executors)) { + LOGGER.warn("Failed to build notify, executors configuration is empty."); + return resultMap; + } + List actual = executors.stream() + .filter(each -> Optional.ofNullable(each.getAlarm()) + .orElse(false)) + .collect(Collectors.toList()); + if (!globalAlarm && CollectionUtil.isEmpty(actual)) { + return resultMap; + } + for (ExecutorProperties executorProperties : executors) { + Map> buildSingleNotifyConfig = buildSingleNotifyConfig(executorProperties); + initCacheAndLock(buildSingleNotifyConfig); + resultMap.putAll(buildSingleNotifyConfig); + } + + return resultMap; + } + + /** + * Builds the notification configurations for a single executor. + * + *

This method generates two types of notifications: ALARM and CONFIG. For each type, it creates + * notification configurations based on the platforms defined in the bootstrap configuration.

+ * + * @param executorProperties The properties of the executor for which to build the notification configurations. + * @return A map containing the notification configurations for the given executor, keyed by the notification type. + */ + public Map> buildSingleNotifyConfig(IExecutorProperties executorProperties) { + String threadPoolId = executorProperties.getThreadPoolId(); + Map> resultMap = new HashMap<>(); + String alarmBuildKey = threadPoolId + "+ALARM"; + List alarmNotifyConfigs = new ArrayList<>(); + List notifyPlatforms = BOOTSTRAP_CONFIG_PROPERTIES.getNotifyPlatforms(); + for (NotifyPlatformProperties platformProperties : notifyPlatforms) { + NotifyConfigDTO notifyConfig = new NotifyConfigDTO(); + notifyConfig.setPlatform(platformProperties.getPlatform()); + notifyConfig.setTpId(threadPoolId); + notifyConfig.setType("ALARM"); + notifyConfig.setSecret(platformProperties.getSecret()); + notifyConfig.setSecretKey(getToken(platformProperties)); + notifyConfig.setInterval(buildInterval(executorProperties)); + notifyConfig.setReceives(buildReceive(executorProperties)); + alarmNotifyConfigs.add(notifyConfig); + } + resultMap.put(alarmBuildKey, alarmNotifyConfigs); + String changeBuildKey = threadPoolId + "+CONFIG"; + List changeNotifyConfigs = new ArrayList<>(); + for (NotifyPlatformProperties platformProperties : notifyPlatforms) { + NotifyConfigDTO notifyConfig = new NotifyConfigDTO(); + notifyConfig.setPlatform(platformProperties.getPlatform()); + notifyConfig.setTpId(threadPoolId); + notifyConfig.setType("CONFIG"); + notifyConfig.setSecretKey(getToken(platformProperties)); + notifyConfig.setSecret(platformProperties.getSecret()); + notifyConfig.setReceives(buildReceive(executorProperties)); + changeNotifyConfigs.add(notifyConfig); + } + resultMap.put(changeBuildKey, changeNotifyConfigs); + return resultMap; + } + + /** + * Retrieves the token for the given notification platform properties. + * + *

If the token is not explicitly set, the method returns the secret key as the fallback.

+ * + * @param platformProperties The platform properties from which to retrieve the token. + * @return The token or secret key associated with the given platform properties. + */ + private String getToken(NotifyPlatformProperties platformProperties) { + return StringUtil.isNotBlank(platformProperties.getToken()) ? platformProperties.getToken() : platformProperties.getSecretKey(); + } + + /** + * Builds the notification interval for the given executor properties. + * + *

This method first checks the executor's specific notify configuration. If not set, it falls back + * to the default executor configuration in the bootstrap properties.

+ * + * @param executorProperties The properties of the executor for which to build the notification interval. + * @return The notification interval in seconds. + */ + private int buildInterval(IExecutorProperties executorProperties) { + return Optional.ofNullable(executorProperties.getNotify()) + .map(ExecutorNotifyProperties::getInterval) + .orElse(Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor()) + .map(ExecutorProperties::getNotify) + .map(ExecutorNotifyProperties::getInterval) + .orElse(DEFAULT_INTERVAL)); + } + + /** + * Builds the notification recipients for the given executor properties. + * + *

This method first checks the executor's specific notify configuration. If not set, it falls back + * to the default executor configuration in the bootstrap properties.

+ * + * @param executorProperties The properties of the executor for which to build the notification recipients. + * @return A string containing the recipients of the notifications. + */ + private String buildReceive(IExecutorProperties executorProperties) { + return Optional.ofNullable(executorProperties.getNotify()) + .map(ExecutorNotifyProperties::getReceives) + .orElse(Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor()) + .map(ExecutorProperties::getNotify) + .map(ExecutorNotifyProperties::getReceives).orElse("")); + } + + /** + * Initializes the cache and lock mechanisms for the given notification configurations. + * + *

This method is primarily responsible for setting up alarm controls based on the notification + * configurations, ensuring that the appropriate cache and lock mechanisms are initialized for + * each thread pool and platform combination.

+ * + * @param buildSingleNotifyConfig A map containing the notification configurations that need cache and lock initialization. + */ + public void initCacheAndLock(Map> buildSingleNotifyConfig) { + buildSingleNotifyConfig.forEach( + (key, val) -> val.stream() + .filter(each -> Objects.equals("ALARM", each.getType())) + .forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval()))); + } +} diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/conf/SpringBootConfig.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/conf/SpringBootConfig.java index 9eaecfba..b93149a2 100644 --- a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/conf/SpringBootConfig.java +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/conf/SpringBootConfig.java @@ -84,5 +84,19 @@ public class SpringBootConfig { public static String CONFIG_FILE_TYPE; } } + + @SpringBootConfigNode(root = SpringBootConfig.class) + public static class Application { + + public static String name = ""; + + } + + @SpringBootConfigNode(root = SpringBootConfig.class) + public static class Profiles { + + public static String active = ""; + + } } } diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/SpringPropertiesLoader.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/SpringPropertiesLoader.java index 8b215ec0..bb6e521f 100644 --- a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/SpringPropertiesLoader.java +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/SpringPropertiesLoader.java @@ -18,16 +18,20 @@ package cn.hippo4j.agent.plugin.spring.common.support; import cn.hippo4j.agent.core.boot.SpringBootConfigInitializer; +import cn.hippo4j.agent.plugin.spring.common.toolkit.SpringPropertyBinder; import cn.hippo4j.common.logging.api.ILog; import cn.hippo4j.common.logging.api.LogManager; +import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties; import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.EnumerablePropertySource; +import org.springframework.core.env.PropertiesPropertySource; import org.springframework.core.env.PropertySource; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Properties; +import static cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties.PREFIX; /** * Spring properties loader @@ -36,6 +40,8 @@ public class SpringPropertiesLoader { private static final ILog LOGGER = LogManager.getLogger(SpringPropertiesLoader.class); + public static BootstrapConfigProperties BOOTSTRAP_CONFIG_PROPERTIES = new BootstrapConfigProperties(); + public static void loadSpringProperties(ConfigurableEnvironment environment) { Iterator> iterator = environment.getPropertySources().iterator(); Properties properties = new Properties(); @@ -43,7 +49,15 @@ public class SpringPropertiesLoader { while (iterator.hasNext()) { propertySourceList.add(iterator.next()); } - for (int i = propertySourceList.size() - 1; i >= 0; i--) { + // Sort to ensure that the configuration in the configuration center is after the array + // To get the latest configuration information + propertySourceList.sort((o1, o2) -> { + boolean o1Contains = o1.getName().toLowerCase().contains("apollo") || o1.getName().toLowerCase().contains("nacos"); + boolean o2Contains = (o2.getName().toLowerCase().contains("apollo") || o2.getName().toLowerCase().contains("nacos")); + return Boolean.compare(o1Contains, o2Contains); + }); + + for (int i = 0; i <= propertySourceList.size() - 1; i++) { PropertySource propertySource = propertySourceList.get(i); if (!(propertySource instanceof EnumerablePropertySource)) { LOGGER.warn("Skip propertySource[{}] because {} not enumerable.", propertySource.getName(), propertySource.getClass()); @@ -65,5 +79,12 @@ public class SpringPropertiesLoader { } } SpringBootConfigInitializer.setSpringProperties(properties); + PropertiesPropertySource propertySource = new PropertiesPropertySource("customPropertySource", properties); + environment.getPropertySources().addFirst(propertySource); + // initialize BootstrapConfigProperties + BOOTSTRAP_CONFIG_PROPERTIES = SpringPropertyBinder.bindProperties(environment, PREFIX, BootstrapConfigProperties.class); + + ThreadPoolCheckAlarmSupport.enableThreadPoolCheckAlarmHandler(); } + } diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/SpringThreadPoolRegisterSupport.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/SpringThreadPoolRegisterSupport.java index 65a9f4cb..2af5a280 100644 --- a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/SpringThreadPoolRegisterSupport.java +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/SpringThreadPoolRegisterSupport.java @@ -22,17 +22,30 @@ import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum; import cn.hippo4j.common.handler.DynamicThreadPoolAdapterChoose; +import cn.hippo4j.common.model.executor.ExecutorNotifyProperties; import cn.hippo4j.common.model.executor.ExecutorProperties; -import cn.hippo4j.common.toolkit.BooleanUtil; +import cn.hippo4j.common.toolkit.StringUtil; +import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil; +import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties; +import cn.hippo4j.threadpool.message.core.service.GlobalNotifyAlarmManage; + +import cn.hippo4j.threadpool.message.core.service.ThreadPoolNotifyAlarm; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.ApplicationContext; import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static cn.hippo4j.common.constant.Constants.DYNAMIC_THREAD_POOL_EXECUTOR; /** * Spring thread pool register support @@ -41,6 +54,14 @@ public class SpringThreadPoolRegisterSupport { private static final Logger LOGGER = LoggerFactory.getLogger(SpringThreadPoolRegisterSupport.class); + private static final int DEFAULT_ACTIVE_ALARM = 80; + + private static final int DEFAULT_CAPACITY_ALARM = 80; + + private static final int DEFAULT_INTERVAL = 5; + + private static final String DEFAULT_RECEIVES = ""; + public static void registerThreadPoolInstances(ApplicationContext context) { Map> referencedClassMap = ThreadPoolExecutorRegistry.REFERENCED_CLASS_MAP; for (Map.Entry> entry : referencedClassMap.entrySet()) { @@ -52,7 +73,7 @@ public class SpringThreadPoolRegisterSupport { Object value = field.get(null); if (value == enhancedInstance) { String threadPoolId = declaredClass.getName() + "#" + field.getName(); - register(threadPoolId, enhancedInstance); + register(threadPoolId, enhancedInstance, Boolean.TRUE); break; } } catch (IllegalAccessException e) { @@ -74,25 +95,147 @@ public class SpringThreadPoolRegisterSupport { if (executor == null) { LOGGER.warn("[Hippo4j-Agent] Thread pool is null, ignore bean registration. beanName={}, beanClass={}", beanName, bean.getClass().getName()); } else { - register(beanName, executor); + register(beanName, executor, Boolean.FALSE); } } LOGGER.info("[Hippo4j-Agent] Registered thread pool instances successfully."); } - public static void register(String threadPoolId, ThreadPoolExecutor executor) { + public static void register(String threadPoolId, ThreadPoolExecutor executor, Boolean isAgentScanEnhancePool) { if (executor == null) { return; } - ExecutorProperties executorProperties = ExecutorProperties.builder() - .threadPoolId(threadPoolId) - .corePoolSize(executor.getCorePoolSize()) - .maximumPoolSize(executor.getMaximumPoolSize()) - .allowCoreThreadTimeOut(BooleanUtil.toBoolean(String.valueOf(executor.allowsCoreThreadTimeOut()))) - .blockingQueue(BlockingQueueTypeEnum.getBlockingQueueTypeEnumByName(executor.getQueue().getClass().getSimpleName()).getName()) - .queueCapacity(executor.getQueue().remainingCapacity()) - .rejectedHandler(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()).getName()) - .build(); + ExecutorProperties executorProperties = SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES.getExecutors().stream() + .filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId())) + .findFirst() + .orElse(null); + + // Determines the thread pool that is currently obtained by bean scanning + if (Objects.isNull(executorProperties)) { + if (isAgentScanEnhancePool) { + throw new RuntimeException(String.format("The thread pool id [%s] does not exist in the configuration.", threadPoolId)); + } else { + // Thread pool that do not require enhancement are skipped by the agent + return; + } + } + + try { + executorProperties = buildActualExecutorProperties(executorProperties); + // Replace the original configuration and refresh the thread pool + threadPoolParamReplace(executor, executorProperties); + } catch (Exception ex) { + LOGGER.error("[Hippo4j-Agent] Failed to initialize thread pool configuration.", ex); + } + // Build notification information entity + ThreadPoolNotifyAlarm threadPoolNotifyAlarm = buildThreadPoolNotifyAlarm(executorProperties); + GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm); + ThreadPoolExecutorRegistry.putHolder(threadPoolId, executor, executorProperties); } + + /** + * Thread-pool param replace. + * + * @param executor dynamic thread-pool executor + * @param executorProperties executor properties + */ + private static void threadPoolParamReplace(ThreadPoolExecutor executor, ExecutorProperties executorProperties) { + BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity()); + cn.hippo4j.common.toolkit.ReflectUtil.setFieldValue(executor, "workQueue", workQueue); + ThreadPoolExecutorUtil.safeSetPoolSize(executor, executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize()); + executor.setKeepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS); + executor.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut()); + executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(executorProperties.getRejectedHandler())); + // Reflection sets the thread pool setExecuteTimeOut + if (DYNAMIC_THREAD_POOL_EXECUTOR.equals(executor.getClass().getName())) { + try { + Method setExecuteTimeOutMethod = executor.getClass().getMethod("setExecuteTimeOut", Long.class); + Long executeTimeOut = executorProperties.getExecuteTimeOut(); + if (executeTimeOut != null) { + setExecuteTimeOutMethod.invoke(executor, executeTimeOut); + } + } catch (Exception e) { + LOGGER.error("[Hippo4j-Agent] Failed to set executeTimeOut.", e); + } + } + } + + /** + * Build actual executor properties. + * + * @param executorProperties executor properties + * @return executor properties + */ + private static ExecutorProperties buildActualExecutorProperties(ExecutorProperties executorProperties) { + return Optional.ofNullable(SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor()).map(each -> buildExecutorProperties(executorProperties)).orElse(executorProperties); + } + + /** + * Build executor properties. + * + * @param executorProperties executor properties + * @return executor properties + */ + private static ExecutorProperties buildExecutorProperties(ExecutorProperties executorProperties) { + BootstrapConfigProperties configProperties = SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES; + return ExecutorProperties.builder() + .corePoolSize(Optional.ofNullable(executorProperties.getCorePoolSize()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCorePoolSize).get())) + .maximumPoolSize(Optional.ofNullable(executorProperties.getMaximumPoolSize()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getMaximumPoolSize).get())) + .allowCoreThreadTimeOut(Optional.ofNullable(executorProperties.getAllowCoreThreadTimeOut()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAllowCoreThreadTimeOut).get())) + .keepAliveTime(Optional.ofNullable(executorProperties.getKeepAliveTime()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getKeepAliveTime).get())) + .blockingQueue(Optional.ofNullable(executorProperties.getBlockingQueue()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getBlockingQueue).get())) + .executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getExecuteTimeOut).orElse(0L))) + .queueCapacity(Optional.ofNullable(executorProperties.getQueueCapacity()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getQueueCapacity).get())) + .rejectedHandler(Optional.ofNullable(executorProperties.getRejectedHandler()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getRejectedHandler).get())) + .threadNamePrefix(StringUtil.isBlank(executorProperties.getThreadNamePrefix()) ? executorProperties.getThreadPoolId() : executorProperties.getThreadNamePrefix()) + .threadPoolId(executorProperties.getThreadPoolId()) + .alarm(Optional.ofNullable(executorProperties.getAlarm()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAlarm).orElse(null))) + .activeAlarm(Optional.ofNullable(executorProperties.getActiveAlarm()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getActiveAlarm).orElse(null))) + .capacityAlarm(Optional.ofNullable(executorProperties.getCapacityAlarm()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCapacityAlarm).orElse(null))) + .notify(Optional.ofNullable(executorProperties.getNotify()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).orElse(null))) + .nodes(Optional.ofNullable(executorProperties.getNodes()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNodes).orElse(null))) + .build(); + } + + /** + * Build thread-pool notify alarm + * + * @param executorProperties executor properties + * @return thread-pool notify alarm + */ + private static ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) { + BootstrapConfigProperties configProperties = SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES; + ExecutorNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null); + boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAlarm).orElse(true)); + int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getActiveAlarm).orElse(DEFAULT_ACTIVE_ALARM)); + int capacityAlarm = Optional.ofNullable(executorProperties.getCapacityAlarm()) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCapacityAlarm).orElse(DEFAULT_CAPACITY_ALARM)); + int interval = Optional.ofNullable(notify) + .map(ExecutorNotifyProperties::getInterval) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(ExecutorNotifyProperties::getInterval).orElse(DEFAULT_INTERVAL)); + String receive = Optional.ofNullable(notify) + .map(ExecutorNotifyProperties::getReceives) + .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(ExecutorNotifyProperties::getReceives).orElse(DEFAULT_RECEIVES)); + ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm); + threadPoolNotifyAlarm.setInterval(interval); + threadPoolNotifyAlarm.setReceives(receive); + return threadPoolNotifyAlarm; + } + } diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/ThreadPoolCheckAlarmSupport.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/ThreadPoolCheckAlarmSupport.java new file mode 100644 index 00000000..3fdf8ec1 --- /dev/null +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/ThreadPoolCheckAlarmSupport.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.agent.plugin.spring.common.support; + +import cn.hippo4j.agent.plugin.spring.common.alarm.AgentModeNotifyConfigBuilder; +import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig; +import cn.hippo4j.common.propertie.EnvironmentProperties; +import cn.hippo4j.threadpool.alarm.handler.DefaultThreadPoolCheckAlarmHandler; +import cn.hippo4j.threadpool.message.api.NotifyConfigDTO; +import cn.hippo4j.threadpool.message.core.platform.DingSendMessageHandler; +import cn.hippo4j.threadpool.message.core.platform.LarkSendMessageHandler; +import cn.hippo4j.threadpool.message.core.platform.WeChatSendMessageHandler; +import cn.hippo4j.threadpool.message.core.service.AlarmControlHandler; +import cn.hippo4j.threadpool.message.core.service.SendMessageHandler; +import cn.hippo4j.threadpool.message.core.service.ThreadPoolBaseSendMessageService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +import static cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES; + +/** + * The {@code ThreadPoolCheckAlarmSupport} class provides functionality to enable and configure + * a thread pool check alarm handler. This is typically used to monitor thread pools for potential + * issues and send notifications based on the configured alert mechanisms. + */ +public class ThreadPoolCheckAlarmSupport { + + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolCheckAlarmSupport.class); + + /** + * Enables the thread pool check alarm handler if the corresponding configuration property is set to {@code true}. + *

+ * This method performs the following actions: + *

    + *
  • Checks the value of the {@code enable} property in the bootstrap configuration. If it is {@code true}, it proceeds.
  • + *
  • Initializes environment properties needed for the monitoring process.
  • + *
  • Creates an instance of {@link AlarmControlHandler} and {@link ThreadPoolBaseSendMessageService} with necessary dependencies.
  • + *
  • Initializes and registers message handlers and notification configurations.
  • + *
  • Creates an instance of {@link DefaultThreadPoolCheckAlarmHandler} and schedules it to start monitoring the thread pool.
  • + *
+ */ + public static void enableThreadPoolCheckAlarmHandler() { + // Check if the thread pool checker is enabled in the bootstrap configuration properties + if (Boolean.TRUE.equals(BOOTSTRAP_CONFIG_PROPERTIES.getEnable())) { + + // Initialize EnvironmentProperties + initializeEnvironmentProperties(); + + // Initialize the AlarmControlHandler and ThreadPoolBaseSendMessageService + AlarmControlHandler alarmControlHandler = new AlarmControlHandler(); + ThreadPoolBaseSendMessageService threadPoolBaseSendMessageService = createThreadPoolBaseSendMessageService(alarmControlHandler); + + // Initialize the alarm platform information + initializeSendMessageHandlers(threadPoolBaseSendMessageService, alarmControlHandler); + + // Initialize the thread pool check alarm handler with necessary services + DefaultThreadPoolCheckAlarmHandler checkAlarmHandler = new DefaultThreadPoolCheckAlarmHandler(threadPoolBaseSendMessageService); + + // Run the check alarm handler to start monitoring the thread pool + checkAlarmHandler.scheduleExecute(); + } + } + + /** + * Initializes environment properties used for thread pool monitoring. + *

+ * This method sets the state check interval, item ID, application name, and active profile from the bootstrap configuration. + */ + private static void initializeEnvironmentProperties() { + EnvironmentProperties.checkStateInterval = Long.valueOf(BOOTSTRAP_CONFIG_PROPERTIES.getCheckStateInterval()); + EnvironmentProperties.itemId = BOOTSTRAP_CONFIG_PROPERTIES.getItemId(); + EnvironmentProperties.applicationName = SpringBootConfig.Spring.Application.name; + EnvironmentProperties.active = SpringBootConfig.Spring.Profiles.active; + } + + /** + * Creates and returns a new instance of {@link ThreadPoolBaseSendMessageService} with the specified {@link AlarmControlHandler}. + * + * @param alarmControlHandler The {@link AlarmControlHandler} used to control and handle alarms. + * @return A new instance of {@link ThreadPoolBaseSendMessageService}. + */ + private static ThreadPoolBaseSendMessageService createThreadPoolBaseSendMessageService(AlarmControlHandler alarmControlHandler) { + return new ThreadPoolBaseSendMessageService(alarmControlHandler); + } + + /** + * Initializes and registers the message handlers and notification configurations in the specified + * {@link ThreadPoolBaseSendMessageService}. + *

+ * This method creates instances of various {@link SendMessageHandler} implementations and registers them. + * It also constructs and registers notification configurations using the {@link AgentModeNotifyConfigBuilder}. + * + * @param threadPoolBaseSendMessageService The {@link ThreadPoolBaseSendMessageService} in which message handlers and notification configurations will be registered. + * @param alarmControlHandler The {@link AlarmControlHandler} used to handle alarms and notifications. + */ + private static void initializeSendMessageHandlers(ThreadPoolBaseSendMessageService threadPoolBaseSendMessageService, AlarmControlHandler alarmControlHandler) { + // Initialize message handlers + DingSendMessageHandler dingSendMessageHandler = new DingSendMessageHandler(); + WeChatSendMessageHandler weChatSendMessageHandler = new WeChatSendMessageHandler(); + LarkSendMessageHandler larkSendMessageHandler = new LarkSendMessageHandler(); + + // Register message handlers + threadPoolBaseSendMessageService.getSendMessageHandlers().put(dingSendMessageHandler.getType(), dingSendMessageHandler); + threadPoolBaseSendMessageService.getSendMessageHandlers().put(weChatSendMessageHandler.getType(), weChatSendMessageHandler); + threadPoolBaseSendMessageService.getSendMessageHandlers().put(larkSendMessageHandler.getType(), larkSendMessageHandler); + + // Construct and register notification configurations + AgentModeNotifyConfigBuilder notifyConfigBuilder = new AgentModeNotifyConfigBuilder(alarmControlHandler); + Map> notifyConfigs = notifyConfigBuilder.buildNotify(); + threadPoolBaseSendMessageService.getNotifyConfigs().putAll(notifyConfigs); + } +} diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/toolkit/SpringPropertyBinder.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/toolkit/SpringPropertyBinder.java new file mode 100644 index 00000000..4491eb70 --- /dev/null +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/toolkit/SpringPropertyBinder.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.agent.plugin.spring.common.toolkit; + +import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigFileTypeEnum; +import org.springframework.beans.BeanWrapper; +import org.springframework.beans.BeanWrapperImpl; +import org.springframework.core.env.Environment; +import org.springframework.core.env.PropertySource; +import org.springframework.core.env.ConfigurableEnvironment; +import org.springframework.core.env.MapPropertySource; + +import java.beans.PropertyEditorSupport; +import java.util.*; + +/** + * CustomPropertyBinder is a utility class for binding properties from Spring's Environment + * to Java objects based on a given prefix. This is useful for dynamically binding configurations + * in Spring applications where configuration properties are organized hierarchically and need to be + * mapped to corresponding Java objects. + * + *

This class handles complex property structures, including nested properties and collections, + * ensuring that all properties prefixed with the specified string are correctly bound to the target + * Java object.

+ */ +public class SpringPropertyBinder { + + /** + * Binds properties from the Spring Environment to an instance of the specified configuration class. + * + * @param environment the Spring Environment containing property sources. + * @param prefix the prefix to filter properties for binding (e.g., "spring.dynamic.thread-pool"). + * @param clazz the class type of the configuration object to bind properties to. + * @param the type of the configuration class. + * @return an instance of the configuration class with properties bound from the environment. + * @throws RuntimeException if there is an error instantiating the configuration class or binding properties. + */ + public static T bindProperties(Environment environment, String prefix, Class clazz) { + try { + // Create an instance of the target class + T instance = clazz.getDeclaredConstructor().newInstance(); + BeanWrapper beanWrapper = new BeanWrapperImpl(instance); + + // Register custom editor for ConfigFileTypeEnum to handle specific type conversions + beanWrapper.registerCustomEditor(ConfigFileTypeEnum.class, new ConfigFileTypeEnumEditor()); + + // Iterate over all property keys that match the given prefix + for (String key : getAllPropertyKeys(environment, prefix)) { + String propertyName = key.substring(prefix.length() + 1); // Remove prefix from the property key + String[] tokens = propertyName.split("\\."); // Split the property name by dot for nested properties + setPropertyValue(tokens, beanWrapper, environment.getProperty(key)); // Set the property value recursively + } + + return instance; + } catch (Exception e) { + throw new RuntimeException("Unable to bind properties to " + clazz.getName(), e); + } + } + + /** + * Recursively sets property values on the target object, handling nested properties and collections. + * + * @param tokens an array of property path tokens (e.g., ["nested", "property", "name"]). + * @param beanWrapper the BeanWrapper instance used to manipulate the target object. + * @param value the value to set on the target property. + */ + private static void setPropertyValue(String[] tokens, BeanWrapper beanWrapper, String value) { + for (int i = 0; i < tokens.length - 1; i++) { + String token = tokens[i]; + + if (token.matches(".*\\[\\d+\\]$")) { // Handle array/list property + token = token.substring(0, token.indexOf('[')); + int index = Integer.parseInt(tokens[i].substring(token.length() + 1, tokens[i].length() - 1)); + + token = convertToCamelCase(token); // Convert token to camelCase if necessary + List list = (List) beanWrapper.getPropertyValue(token); + if (list == null) { + list = new ArrayList<>(); + beanWrapper.setPropertyValue(convertToCamelCase(token), list); // Initialize the list if it's null + } + + // Ensure the list has enough size to accommodate the index + if (list.size() <= index) { + try { + // Instantiate the list element if it does not exist + list.add(index, beanWrapper.getPropertyTypeDescriptor(token) + .getElementTypeDescriptor().getType().newInstance()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // Move the beanWrapper context to the current list element + beanWrapper = new BeanWrapperImpl(list.get(index)); + } else { // Handle simple or nested property + Object nestedObject = beanWrapper.getPropertyValue(token); + if (nestedObject == null) { + Class nestedClass = beanWrapper.getPropertyType(token); + if (Map.class.isAssignableFrom(nestedClass)) { + nestedObject = new HashMap<>(); // Initialize nested Map if necessary + } else { + try { + nestedObject = nestedClass.getDeclaredConstructor().newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + beanWrapper.setPropertyValue(convertToCamelCase(token), nestedObject); + } + // Move the beanWrapper context to the nested object + beanWrapper = new BeanWrapperImpl(nestedObject); + } + } + + // Finally, set the actual property value on the resolved object + String finalPropertyName = tokens[tokens.length - 1]; + Object currentObject = beanWrapper.getWrappedInstance(); + if (currentObject instanceof Map) { + // If the current object is a Map, set the value as a key-value pair + ((Map) currentObject).put(finalPropertyName, value); + } else { + // Otherwise, set it as a simple property + beanWrapper.setPropertyValue(convertToCamelCase(finalPropertyName), value); + } + } + + /** + * Retrieves all property keys from the environment that start with the given prefix. + * + * @param environment the Spring Environment containing property sources. + * @param prefix the prefix to filter property keys. + * @return a set of property keys that match the prefix. + */ + private static Set getAllPropertyKeys(Environment environment, String prefix) { + Set keys = new HashSet<>(); + // Iterate through all property sources in the environment + for (PropertySource propertySource : ((ConfigurableEnvironment) environment).getPropertySources()) { + if (propertySource instanceof MapPropertySource) { + Map source = ((MapPropertySource) propertySource).getSource(); + // Collect keys that start with the specified prefix + for (String key : source.keySet()) { + if (key.startsWith(prefix)) { + keys.add(key); + } + } + } + } + return keys; + } + + /** + * Converts a dashed-separated string to camelCase. + *

+ * For example, "my-property-name" -> "myPropertyName". + * + * @param dashed the dashed-separated string to be converted. + * @return the camelCase representation of the input string. + */ + private static String convertToCamelCase(String dashed) { + String[] parts = dashed.split("-"); + return Arrays.stream(parts) + .map(part -> part.substring(0, 1).toUpperCase() + part.substring(1)) // Capitalize each part + .reduce((first, second) -> first + second) // Concatenate all parts together + .map(result -> result.substring(0, 1).toLowerCase() + result.substring(1)) // Lowercase the first letter + .orElse(dashed); + } + + /** + * ConfigFileTypeEnumEditor is a custom property editor for converting string representations + * of {@link ConfigFileTypeEnum} into the corresponding enum instances. + *

+ * This editor is useful in scenarios where properties are read as strings but need to be + * converted to enum types for further processing. + */ + public static class ConfigFileTypeEnumEditor extends PropertyEditorSupport { + + /** + * Converts the given text value to the corresponding {@link ConfigFileTypeEnum} instance. + *

+ * This method overrides the default implementation to parse the input string and convert + * it into a {@link ConfigFileTypeEnum}. If the input string does not match any known enum + * value, an {@link IllegalArgumentException} will be thrown. + * + * @param text the string representation of the enum to be converted. + * @throws IllegalArgumentException if the text does not match any known enum value. + */ + @Override + public void setAsText(String text) throws IllegalArgumentException { + setValue(ConfigFileTypeEnum.of(text)); + } + } + +} diff --git a/examples/threadpool-example/agent/agent-example-core/pom.xml b/examples/threadpool-example/agent/agent-example-core/pom.xml new file mode 100644 index 00000000..abc4d722 --- /dev/null +++ b/examples/threadpool-example/agent/agent-example-core/pom.xml @@ -0,0 +1,47 @@ + + + 4.0.0 + + cn.hippo4j + hippo4j-threadpool-agent-example + 2.0.0-SNAPSHOT + + + hippo4j-agent-example-core + + + true + + + + + org.springframework.boot + spring-boot-starter + + + org.springframework.boot + spring-boot-starter-test + test + + + cn.hippo4j + hippo4j-threadpool-message + ${revision} + + + org.openjdk.jmh + jmh-core + 1.23 + test + + + org.openjdk.jmh + jmh-generator-annprocess + 1.23 + test + + + + \ No newline at end of file diff --git a/examples/threadpool-example/agent/config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo/ThreadPoolConfiguration.java b/examples/threadpool-example/agent/agent-example-core/src/main/java/cn/hippo4j/example/agent/core/config/ThreadPoolConfiguration.java similarity index 92% rename from examples/threadpool-example/agent/config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo/ThreadPoolConfiguration.java rename to examples/threadpool-example/agent/agent-example-core/src/main/java/cn/hippo4j/example/agent/core/config/ThreadPoolConfiguration.java index cec7c3bb..77182468 100644 --- a/examples/threadpool-example/agent/config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo/ThreadPoolConfiguration.java +++ b/examples/threadpool-example/agent/agent-example-core/src/main/java/cn/hippo4j/example/agent/core/config/ThreadPoolConfiguration.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package cn.hippo4j.example.agent.config.apollo; +package cn.hippo4j.example.agent.core.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -46,7 +46,7 @@ public class ThreadPoolConfiguration { // 演示 Agent 模式修改线程池 // ------------------------------------------------------------------------- - public static final ThreadPoolExecutor RUN_MESSAGE_SEND_TASK_EXECUTOR = new ThreadPoolExecutor( + public static final ThreadPoolExecutor AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR = new ThreadPoolExecutor( 1, 10, 1024, diff --git a/examples/threadpool-example/agent/agent-example-core/src/main/java/cn/hippo4j/example/agent/core/inittest/AlarmSendMessageTest.java b/examples/threadpool-example/agent/agent-example-core/src/main/java/cn/hippo4j/example/agent/core/inittest/AlarmSendMessageTest.java new file mode 100644 index 00000000..ccf385db --- /dev/null +++ b/examples/threadpool-example/agent/agent-example-core/src/main/java/cn/hippo4j/example/agent/core/inittest/AlarmSendMessageTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.example.agent.core.inittest; + +import cn.hippo4j.common.executor.ThreadPoolExecutorHolder; +import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Test alarm send message. + */ +@Slf4j +@Component +public class AlarmSendMessageTest { + + private static final int SLEEP_TIME = 10240124; + + private static final int INITIAL_DELAY = 3; + + private static final String RUN_MESSAGE_SEND_TASK_EXECUTOR = "runMessageSendTaskExecutor"; + + private static final String AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR = "cn.hippo4j.example.agent.core.config.ThreadPoolConfiguration#AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR"; + + /** + * Test alarm notification. + * If you need to run this single test, add @PostConstruct to the method. + */ + @SuppressWarnings("all") + @PostConstruct + public void alarmSendMessageTest() { + ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(); + scheduledThreadPool.scheduleWithFixedDelay(() -> { + ThreadPoolExecutorHolder executorHolder = ThreadPoolExecutorRegistry.getHolder(AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR); + ThreadPoolExecutor poolExecutor = executorHolder.getExecutor(); + try { + poolExecutor.execute(() -> { + try { + Thread.sleep(SLEEP_TIME); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } catch (Exception ex) { + log.error("Throw reject policy.", ex.getMessage()); + } + }, INITIAL_DELAY, 2, TimeUnit.SECONDS); + + scheduledThreadPool.scheduleWithFixedDelay(() -> { + ThreadPoolExecutorHolder executorHolder = ThreadPoolExecutorRegistry.getHolder(RUN_MESSAGE_SEND_TASK_EXECUTOR); + ThreadPoolExecutor poolExecutor = executorHolder.getExecutor(); + try { + poolExecutor.execute(() -> { + try { + Thread.sleep(SLEEP_TIME); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } catch (Exception ex) { + log.error("Throw reject policy.", ex.getMessage()); + } + }, INITIAL_DELAY, 2, TimeUnit.SECONDS); + + } +} diff --git a/examples/threadpool-example/agent/config-apollo/pom.xml b/examples/threadpool-example/agent/config-apollo/pom.xml index 7a7adfd4..f87d6ddf 100644 --- a/examples/threadpool-example/agent/config-apollo/pom.xml +++ b/examples/threadpool-example/agent/config-apollo/pom.xml @@ -16,6 +16,12 @@ + + cn.hippo4j + hippo4j-agent-example-core + ${revision} + + org.springframework.boot spring-boot-starter diff --git a/examples/threadpool-example/agent/config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo/AgentConfigApolloExampleApplication.java b/examples/threadpool-example/agent/config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo/AgentConfigApolloExampleApplication.java index 33283990..365282b7 100644 --- a/examples/threadpool-example/agent/config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo/AgentConfigApolloExampleApplication.java +++ b/examples/threadpool-example/agent/config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo/AgentConfigApolloExampleApplication.java @@ -23,7 +23,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; /** * Agent config apollo example application. */ -@SpringBootApplication +@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.agent.core") public class AgentConfigApolloExampleApplication { public static void main(String[] args) { diff --git a/examples/threadpool-example/agent/config-nacos/pom.xml b/examples/threadpool-example/agent/config-nacos/pom.xml index 52c783aa..4240c147 100644 --- a/examples/threadpool-example/agent/config-nacos/pom.xml +++ b/examples/threadpool-example/agent/config-nacos/pom.xml @@ -16,6 +16,11 @@ + + cn.hippo4j + hippo4j-agent-example-core + ${revision} + org.springframework.boot spring-boot-starter diff --git a/examples/threadpool-example/agent/config-nacos/src/main/java/cn/hippo4j/example/agent/config/nacos/AgentConfigNacosExampleApplication.java b/examples/threadpool-example/agent/config-nacos/src/main/java/cn/hippo4j/example/agent/config/nacos/AgentConfigNacosExampleApplication.java index b05949da..e5010fbd 100644 --- a/examples/threadpool-example/agent/config-nacos/src/main/java/cn/hippo4j/example/agent/config/nacos/AgentConfigNacosExampleApplication.java +++ b/examples/threadpool-example/agent/config-nacos/src/main/java/cn/hippo4j/example/agent/config/nacos/AgentConfigNacosExampleApplication.java @@ -25,7 +25,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; /** * Agent config Nacos example application. */ -@SpringBootApplication +@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.agent.core") @EnableNacosConfig public class AgentConfigNacosExampleApplication { diff --git a/examples/threadpool-example/agent/config-nacos/src/main/java/cn/hippo4j/example/agent/config/nacos/ThreadPoolConfiguration.java b/examples/threadpool-example/agent/config-nacos/src/main/java/cn/hippo4j/example/agent/config/nacos/ThreadPoolConfiguration.java deleted file mode 100644 index 9f60e753..00000000 --- a/examples/threadpool-example/agent/config-nacos/src/main/java/cn/hippo4j/example/agent/config/nacos/ThreadPoolConfiguration.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.hippo4j.example.agent.config.nacos; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -@Configuration -public class ThreadPoolConfiguration { - - // ------------------------------------------------------------------------- - // 未使用 Hippo4j,原始定义线程池创建方式 - // ------------------------------------------------------------------------- - - @Bean - public ThreadPoolExecutor runMessageSendTaskExecutor() { - LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue<>(1024); - return new ThreadPoolExecutor( - 1, - 10, - 1024, - TimeUnit.SECONDS, - linkedBlockingQueue); - } - - // ------------------------------------------------------------------------- - // 演示 Agent 模式修改线程池 - // ------------------------------------------------------------------------- - - public static final ThreadPoolExecutor RUN_MESSAGE_SEND_TASK_EXECUTOR = new ThreadPoolExecutor( - 1, - 10, - 1024, - TimeUnit.SECONDS, - new LinkedBlockingQueue<>(1024)); -} diff --git a/examples/threadpool-example/agent/pom.xml b/examples/threadpool-example/agent/pom.xml index 8dd4320a..d215c979 100644 --- a/examples/threadpool-example/agent/pom.xml +++ b/examples/threadpool-example/agent/pom.xml @@ -19,5 +19,6 @@ config-apollo config-nacos + agent-example-core \ No newline at end of file diff --git a/infra/common/src/main/java/cn/hippo4j/common/constant/Constants.java b/infra/common/src/main/java/cn/hippo4j/common/constant/Constants.java index 9eeb865e..71577018 100644 --- a/infra/common/src/main/java/cn/hippo4j/common/constant/Constants.java +++ b/infra/common/src/main/java/cn/hippo4j/common/constant/Constants.java @@ -56,6 +56,8 @@ public class Constants { public static final String GENERAL_SPLIT_SYMBOL = ","; + public static final String DOT_SPLIT_SYMBOL = "."; + public static final String IDENTIFY_SLICER_SYMBOL = "_"; public static final String LONG_POLLING_LINE_SEPARATOR = "\r\n"; @@ -128,5 +130,12 @@ public class Constants { public static final String CONFIGURATION_PROPERTIES_PREFIX = "spring.dynamic.thread-pool"; + public static final String EXECUTORS = "executors"; + public static final long NO_REJECT_COUNT_NUM = -1L; + + public static final String DYNAMIC_THREAD_POOL_EXECUTOR = "cn.hippo4j.core.executor.DynamicThreadPoolExecutor"; + + public static final int DEFAULT_INTERVAL = 5; + }