feat:Agent dynamic alarm Initialize

pull/1572/head
Pan-YuJie 1 year ago
parent 33c4609f86
commit 1404235aae

@ -43,5 +43,11 @@
<version>${project.version}</version> <version>${project.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-kernel-alarm</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.alarm;
import cn.hippo4j.agent.plugin.spring.common.support.ThreadPoolCheckAlarmSupport;
import cn.hippo4j.common.api.IExecutorProperties;
import cn.hippo4j.common.model.executor.ExecutorNotifyProperties;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.NotifyPlatformProperties;
import cn.hippo4j.threadpool.message.api.NotifyConfigBuilder;
import cn.hippo4j.threadpool.message.api.NotifyConfigDTO;
import cn.hippo4j.threadpool.message.core.service.AlarmControlHandler;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import static cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES;
import static cn.hippo4j.common.constant.Constants.DEFAULT_INTERVAL;
/**
* This class is responsible for building the notification configurations for thread pools in an agent mode.
* It implements the {@link NotifyConfigBuilder} interface and provides methods to build and initialize
* notification configurations for various platforms and types (e.g., ALARM, CONFIG).
*
* <p>The configuration is based on the properties loaded from the bootstrap configuration and includes
* handling for alarm control and notification intervals.</p>
*/
@AllArgsConstructor
public class AgentModeNotifyConfigBuilder implements NotifyConfigBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolCheckAlarmSupport.class);
private final AlarmControlHandler alarmControlHandler;
/**
* Builds the notification configurations for all executors defined in the bootstrap configuration.
*
* <p>This method filters the executors based on their alarm settings and constructs the notification
* configurations accordingly. If global alarm settings are disabled and there are no specific alarms
* configured for any executor, the method returns an empty map.</p>
*
* @return A map containing the notification configurations, keyed by the notification type (e.g., ALARM, CONFIG).
*/
public Map<String, List<NotifyConfigDTO>> buildNotify() {
Map<String, List<NotifyConfigDTO>> resultMap = new HashMap<>();
boolean globalAlarm = Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor())
.map(ExecutorProperties::getAlarm)
.orElse(true);
List<ExecutorProperties> executors = BOOTSTRAP_CONFIG_PROPERTIES.getExecutors();
if (CollectionUtil.isEmpty(executors)) {
LOGGER.warn("Failed to build notify, executors configuration is empty.");
return resultMap;
}
List<ExecutorProperties> actual = executors.stream()
.filter(each -> Optional.ofNullable(each.getAlarm())
.orElse(false))
.collect(Collectors.toList());
if (!globalAlarm && CollectionUtil.isEmpty(actual)) {
return resultMap;
}
for (ExecutorProperties executorProperties : executors) {
Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig = buildSingleNotifyConfig(executorProperties);
initCacheAndLock(buildSingleNotifyConfig);
resultMap.putAll(buildSingleNotifyConfig);
}
return resultMap;
}
/**
* Builds the notification configurations for a single executor.
*
* <p>This method generates two types of notifications: ALARM and CONFIG. For each type, it creates
* notification configurations based on the platforms defined in the bootstrap configuration.</p>
*
* @param executorProperties The properties of the executor for which to build the notification configurations.
* @return A map containing the notification configurations for the given executor, keyed by the notification type.
*/
public Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig(IExecutorProperties executorProperties) {
String threadPoolId = executorProperties.getThreadPoolId();
Map<String, List<NotifyConfigDTO>> resultMap = new HashMap<>();
String alarmBuildKey = threadPoolId + "+ALARM";
List<NotifyConfigDTO> alarmNotifyConfigs = new ArrayList<>();
List<NotifyPlatformProperties> notifyPlatforms = BOOTSTRAP_CONFIG_PROPERTIES.getNotifyPlatforms();
for (NotifyPlatformProperties platformProperties : notifyPlatforms) {
NotifyConfigDTO notifyConfig = new NotifyConfigDTO();
notifyConfig.setPlatform(platformProperties.getPlatform());
notifyConfig.setTpId(threadPoolId);
notifyConfig.setType("ALARM");
notifyConfig.setSecret(platformProperties.getSecret());
notifyConfig.setSecretKey(getToken(platformProperties));
notifyConfig.setInterval(buildInterval(executorProperties));
notifyConfig.setReceives(buildReceive(executorProperties));
alarmNotifyConfigs.add(notifyConfig);
}
resultMap.put(alarmBuildKey, alarmNotifyConfigs);
String changeBuildKey = threadPoolId + "+CONFIG";
List<NotifyConfigDTO> changeNotifyConfigs = new ArrayList<>();
for (NotifyPlatformProperties platformProperties : notifyPlatforms) {
NotifyConfigDTO notifyConfig = new NotifyConfigDTO();
notifyConfig.setPlatform(platformProperties.getPlatform());
notifyConfig.setTpId(threadPoolId);
notifyConfig.setType("CONFIG");
notifyConfig.setSecretKey(getToken(platformProperties));
notifyConfig.setSecret(platformProperties.getSecret());
notifyConfig.setReceives(buildReceive(executorProperties));
changeNotifyConfigs.add(notifyConfig);
}
resultMap.put(changeBuildKey, changeNotifyConfigs);
return resultMap;
}
/**
* Retrieves the token for the given notification platform properties.
*
* <p>If the token is not explicitly set, the method returns the secret key as the fallback.</p>
*
* @param platformProperties The platform properties from which to retrieve the token.
* @return The token or secret key associated with the given platform properties.
*/
private String getToken(NotifyPlatformProperties platformProperties) {
return StringUtil.isNotBlank(platformProperties.getToken()) ? platformProperties.getToken() : platformProperties.getSecretKey();
}
/**
* Builds the notification interval for the given executor properties.
*
* <p>This method first checks the executor's specific notify configuration. If not set, it falls back
* to the default executor configuration in the bootstrap properties.</p>
*
* @param executorProperties The properties of the executor for which to build the notification interval.
* @return The notification interval in seconds.
*/
private int buildInterval(IExecutorProperties executorProperties) {
return Optional.ofNullable(executorProperties.getNotify())
.map(ExecutorNotifyProperties::getInterval)
.orElse(Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor())
.map(ExecutorProperties::getNotify)
.map(ExecutorNotifyProperties::getInterval)
.orElse(DEFAULT_INTERVAL));
}
/**
* Builds the notification recipients for the given executor properties.
*
* <p>This method first checks the executor's specific notify configuration. If not set, it falls back
* to the default executor configuration in the bootstrap properties.</p>
*
* @param executorProperties The properties of the executor for which to build the notification recipients.
* @return A string containing the recipients of the notifications.
*/
private String buildReceive(IExecutorProperties executorProperties) {
return Optional.ofNullable(executorProperties.getNotify())
.map(ExecutorNotifyProperties::getReceives)
.orElse(Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor())
.map(ExecutorProperties::getNotify)
.map(ExecutorNotifyProperties::getReceives).orElse(""));
}
/**
* Initializes the cache and lock mechanisms for the given notification configurations.
*
* <p>This method is primarily responsible for setting up alarm controls based on the notification
* configurations, ensuring that the appropriate cache and lock mechanisms are initialized for
* each thread pool and platform combination.</p>
*
* @param buildSingleNotifyConfig A map containing the notification configurations that need cache and lock initialization.
*/
public void initCacheAndLock(Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig) {
buildSingleNotifyConfig.forEach(
(key, val) -> val.stream()
.filter(each -> Objects.equals("ALARM", each.getType()))
.forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval())));
}
}

@ -84,5 +84,19 @@ public class SpringBootConfig {
public static String CONFIG_FILE_TYPE; public static String CONFIG_FILE_TYPE;
} }
} }
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Application {
public static String name = "";
}
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Profiles {
public static String active = "";
}
} }
} }

@ -18,16 +18,20 @@
package cn.hippo4j.agent.plugin.spring.common.support; package cn.hippo4j.agent.plugin.spring.common.support;
import cn.hippo4j.agent.core.boot.SpringBootConfigInitializer; import cn.hippo4j.agent.core.boot.SpringBootConfigInitializer;
import cn.hippo4j.agent.plugin.spring.common.toolkit.SpringPropertyBinder;
import cn.hippo4j.common.logging.api.ILog; import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager; import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.EnumerablePropertySource; import org.springframework.core.env.EnumerablePropertySource;
import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.core.env.PropertySource; import org.springframework.core.env.PropertySource;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Properties; import java.util.Properties;
import static cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties.PREFIX;
/** /**
* Spring properties loader * Spring properties loader
@ -36,6 +40,8 @@ public class SpringPropertiesLoader {
private static final ILog LOGGER = LogManager.getLogger(SpringPropertiesLoader.class); private static final ILog LOGGER = LogManager.getLogger(SpringPropertiesLoader.class);
public static BootstrapConfigProperties BOOTSTRAP_CONFIG_PROPERTIES = new BootstrapConfigProperties();
public static void loadSpringProperties(ConfigurableEnvironment environment) { public static void loadSpringProperties(ConfigurableEnvironment environment) {
Iterator<PropertySource<?>> iterator = environment.getPropertySources().iterator(); Iterator<PropertySource<?>> iterator = environment.getPropertySources().iterator();
Properties properties = new Properties(); Properties properties = new Properties();
@ -43,7 +49,15 @@ public class SpringPropertiesLoader {
while (iterator.hasNext()) { while (iterator.hasNext()) {
propertySourceList.add(iterator.next()); propertySourceList.add(iterator.next());
} }
for (int i = propertySourceList.size() - 1; i >= 0; i--) { // Sort to ensure that the configuration in the configuration center is after the array
// To get the latest configuration information
propertySourceList.sort((o1, o2) -> {
boolean o1Contains = o1.getName().toLowerCase().contains("apollo") || o1.getName().toLowerCase().contains("nacos");
boolean o2Contains = (o2.getName().toLowerCase().contains("apollo") || o2.getName().toLowerCase().contains("nacos"));
return Boolean.compare(o1Contains, o2Contains);
});
for (int i = 0; i <= propertySourceList.size() - 1; i++) {
PropertySource<?> propertySource = propertySourceList.get(i); PropertySource<?> propertySource = propertySourceList.get(i);
if (!(propertySource instanceof EnumerablePropertySource)) { if (!(propertySource instanceof EnumerablePropertySource)) {
LOGGER.warn("Skip propertySource[{}] because {} not enumerable.", propertySource.getName(), propertySource.getClass()); LOGGER.warn("Skip propertySource[{}] because {} not enumerable.", propertySource.getName(), propertySource.getClass());
@ -65,5 +79,12 @@ public class SpringPropertiesLoader {
} }
} }
SpringBootConfigInitializer.setSpringProperties(properties); SpringBootConfigInitializer.setSpringProperties(properties);
PropertiesPropertySource propertySource = new PropertiesPropertySource("customPropertySource", properties);
environment.getPropertySources().addFirst(propertySource);
// initialize BootstrapConfigProperties
BOOTSTRAP_CONFIG_PROPERTIES = SpringPropertyBinder.bindProperties(environment, PREFIX, BootstrapConfigProperties.class);
ThreadPoolCheckAlarmSupport.enableThreadPoolCheckAlarmHandler();
} }
} }

@ -22,17 +22,30 @@ import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.handler.DynamicThreadPoolAdapterChoose; import cn.hippo4j.common.handler.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.common.model.executor.ExecutorNotifyProperties;
import cn.hippo4j.common.model.executor.ExecutorProperties; import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.toolkit.BooleanUtil; import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.message.core.service.GlobalNotifyAlarmManage;
import cn.hippo4j.threadpool.message.core.service.ThreadPoolNotifyAlarm;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContext;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.DYNAMIC_THREAD_POOL_EXECUTOR;
/** /**
* Spring thread pool register support * Spring thread pool register support
@ -41,6 +54,14 @@ public class SpringThreadPoolRegisterSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringThreadPoolRegisterSupport.class); private static final Logger LOGGER = LoggerFactory.getLogger(SpringThreadPoolRegisterSupport.class);
private static final int DEFAULT_ACTIVE_ALARM = 80;
private static final int DEFAULT_CAPACITY_ALARM = 80;
private static final int DEFAULT_INTERVAL = 5;
private static final String DEFAULT_RECEIVES = "";
public static void registerThreadPoolInstances(ApplicationContext context) { public static void registerThreadPoolInstances(ApplicationContext context) {
Map<ThreadPoolExecutor, Class<?>> referencedClassMap = ThreadPoolExecutorRegistry.REFERENCED_CLASS_MAP; Map<ThreadPoolExecutor, Class<?>> referencedClassMap = ThreadPoolExecutorRegistry.REFERENCED_CLASS_MAP;
for (Map.Entry<ThreadPoolExecutor, Class<?>> entry : referencedClassMap.entrySet()) { for (Map.Entry<ThreadPoolExecutor, Class<?>> entry : referencedClassMap.entrySet()) {
@ -52,7 +73,7 @@ public class SpringThreadPoolRegisterSupport {
Object value = field.get(null); Object value = field.get(null);
if (value == enhancedInstance) { if (value == enhancedInstance) {
String threadPoolId = declaredClass.getName() + "#" + field.getName(); String threadPoolId = declaredClass.getName() + "#" + field.getName();
register(threadPoolId, enhancedInstance); register(threadPoolId, enhancedInstance, Boolean.TRUE);
break; break;
} }
} catch (IllegalAccessException e) { } catch (IllegalAccessException e) {
@ -74,25 +95,147 @@ public class SpringThreadPoolRegisterSupport {
if (executor == null) { if (executor == null) {
LOGGER.warn("[Hippo4j-Agent] Thread pool is null, ignore bean registration. beanName={}, beanClass={}", beanName, bean.getClass().getName()); LOGGER.warn("[Hippo4j-Agent] Thread pool is null, ignore bean registration. beanName={}, beanClass={}", beanName, bean.getClass().getName());
} else { } else {
register(beanName, executor); register(beanName, executor, Boolean.FALSE);
} }
} }
LOGGER.info("[Hippo4j-Agent] Registered thread pool instances successfully."); LOGGER.info("[Hippo4j-Agent] Registered thread pool instances successfully.");
} }
public static void register(String threadPoolId, ThreadPoolExecutor executor) { public static void register(String threadPoolId, ThreadPoolExecutor executor, Boolean isAgentScanEnhancePool) {
if (executor == null) { if (executor == null) {
return; return;
} }
ExecutorProperties executorProperties = ExecutorProperties.builder() ExecutorProperties executorProperties = SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES.getExecutors().stream()
.threadPoolId(threadPoolId) .filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId()))
.corePoolSize(executor.getCorePoolSize()) .findFirst()
.maximumPoolSize(executor.getMaximumPoolSize()) .orElse(null);
.allowCoreThreadTimeOut(BooleanUtil.toBoolean(String.valueOf(executor.allowsCoreThreadTimeOut())))
.blockingQueue(BlockingQueueTypeEnum.getBlockingQueueTypeEnumByName(executor.getQueue().getClass().getSimpleName()).getName()) // Determines the thread pool that is currently obtained by bean scanning
.queueCapacity(executor.getQueue().remainingCapacity()) if (Objects.isNull(executorProperties)) {
.rejectedHandler(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()).getName()) if (isAgentScanEnhancePool) {
.build(); throw new RuntimeException(String.format("The thread pool id [%s] does not exist in the configuration.", threadPoolId));
} else {
// Thread pool that do not require enhancement are skipped by the agent
return;
}
}
try {
executorProperties = buildActualExecutorProperties(executorProperties);
// Replace the original configuration and refresh the thread pool
threadPoolParamReplace(executor, executorProperties);
} catch (Exception ex) {
LOGGER.error("[Hippo4j-Agent] Failed to initialize thread pool configuration.", ex);
}
// Build notification information entity
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = buildThreadPoolNotifyAlarm(executorProperties);
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
ThreadPoolExecutorRegistry.putHolder(threadPoolId, executor, executorProperties); ThreadPoolExecutorRegistry.putHolder(threadPoolId, executor, executorProperties);
} }
/**
* Thread-pool param replace.
*
* @param executor dynamic thread-pool executor
* @param executorProperties executor properties
*/
private static void threadPoolParamReplace(ThreadPoolExecutor executor, ExecutorProperties executorProperties) {
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity());
cn.hippo4j.common.toolkit.ReflectUtil.setFieldValue(executor, "workQueue", workQueue);
ThreadPoolExecutorUtil.safeSetPoolSize(executor, executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize());
executor.setKeepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut());
executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(executorProperties.getRejectedHandler()));
// Reflection sets the thread pool setExecuteTimeOut
if (DYNAMIC_THREAD_POOL_EXECUTOR.equals(executor.getClass().getName())) {
try {
Method setExecuteTimeOutMethod = executor.getClass().getMethod("setExecuteTimeOut", Long.class);
Long executeTimeOut = executorProperties.getExecuteTimeOut();
if (executeTimeOut != null) {
setExecuteTimeOutMethod.invoke(executor, executeTimeOut);
}
} catch (Exception e) {
LOGGER.error("[Hippo4j-Agent] Failed to set executeTimeOut.", e);
}
}
}
/**
* Build actual executor properties.
*
* @param executorProperties executor properties
* @return executor properties
*/
private static ExecutorProperties buildActualExecutorProperties(ExecutorProperties executorProperties) {
return Optional.ofNullable(SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor()).map(each -> buildExecutorProperties(executorProperties)).orElse(executorProperties);
}
/**
* Build executor properties.
*
* @param executorProperties executor properties
* @return executor properties
*/
private static ExecutorProperties buildExecutorProperties(ExecutorProperties executorProperties) {
BootstrapConfigProperties configProperties = SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES;
return ExecutorProperties.builder()
.corePoolSize(Optional.ofNullable(executorProperties.getCorePoolSize())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCorePoolSize).get()))
.maximumPoolSize(Optional.ofNullable(executorProperties.getMaximumPoolSize())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getMaximumPoolSize).get()))
.allowCoreThreadTimeOut(Optional.ofNullable(executorProperties.getAllowCoreThreadTimeOut())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAllowCoreThreadTimeOut).get()))
.keepAliveTime(Optional.ofNullable(executorProperties.getKeepAliveTime())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getKeepAliveTime).get()))
.blockingQueue(Optional.ofNullable(executorProperties.getBlockingQueue())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getBlockingQueue).get()))
.executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getExecuteTimeOut).orElse(0L)))
.queueCapacity(Optional.ofNullable(executorProperties.getQueueCapacity())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getQueueCapacity).get()))
.rejectedHandler(Optional.ofNullable(executorProperties.getRejectedHandler())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getRejectedHandler).get()))
.threadNamePrefix(StringUtil.isBlank(executorProperties.getThreadNamePrefix()) ? executorProperties.getThreadPoolId() : executorProperties.getThreadNamePrefix())
.threadPoolId(executorProperties.getThreadPoolId())
.alarm(Optional.ofNullable(executorProperties.getAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAlarm).orElse(null)))
.activeAlarm(Optional.ofNullable(executorProperties.getActiveAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getActiveAlarm).orElse(null)))
.capacityAlarm(Optional.ofNullable(executorProperties.getCapacityAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCapacityAlarm).orElse(null)))
.notify(Optional.ofNullable(executorProperties.getNotify())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).orElse(null)))
.nodes(Optional.ofNullable(executorProperties.getNodes())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNodes).orElse(null)))
.build();
}
/**
* Build thread-pool notify alarm
*
* @param executorProperties executor properties
* @return thread-pool notify alarm
*/
private static ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) {
BootstrapConfigProperties configProperties = SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES;
ExecutorNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null);
boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAlarm).orElse(true));
int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getActiveAlarm).orElse(DEFAULT_ACTIVE_ALARM));
int capacityAlarm = Optional.ofNullable(executorProperties.getCapacityAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCapacityAlarm).orElse(DEFAULT_CAPACITY_ALARM));
int interval = Optional.ofNullable(notify)
.map(ExecutorNotifyProperties::getInterval)
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(ExecutorNotifyProperties::getInterval).orElse(DEFAULT_INTERVAL));
String receive = Optional.ofNullable(notify)
.map(ExecutorNotifyProperties::getReceives)
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(ExecutorNotifyProperties::getReceives).orElse(DEFAULT_RECEIVES));
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm);
threadPoolNotifyAlarm.setInterval(interval);
threadPoolNotifyAlarm.setReceives(receive);
return threadPoolNotifyAlarm;
}
} }

@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.support;
import cn.hippo4j.agent.plugin.spring.common.alarm.AgentModeNotifyConfigBuilder;
import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig;
import cn.hippo4j.common.propertie.EnvironmentProperties;
import cn.hippo4j.threadpool.alarm.handler.DefaultThreadPoolCheckAlarmHandler;
import cn.hippo4j.threadpool.message.api.NotifyConfigDTO;
import cn.hippo4j.threadpool.message.core.platform.DingSendMessageHandler;
import cn.hippo4j.threadpool.message.core.platform.LarkSendMessageHandler;
import cn.hippo4j.threadpool.message.core.platform.WeChatSendMessageHandler;
import cn.hippo4j.threadpool.message.core.service.AlarmControlHandler;
import cn.hippo4j.threadpool.message.core.service.SendMessageHandler;
import cn.hippo4j.threadpool.message.core.service.ThreadPoolBaseSendMessageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import static cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES;
/**
* The {@code ThreadPoolCheckAlarmSupport} class provides functionality to enable and configure
* a thread pool check alarm handler. This is typically used to monitor thread pools for potential
* issues and send notifications based on the configured alert mechanisms.
*/
public class ThreadPoolCheckAlarmSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolCheckAlarmSupport.class);
/**
* Enables the thread pool check alarm handler if the corresponding configuration property is set to {@code true}.
* <p>
* This method performs the following actions:
* <ul>
* <li>Checks the value of the {@code enable} property in the bootstrap configuration. If it is {@code true}, it proceeds.</li>
* <li>Initializes environment properties needed for the monitoring process.</li>
* <li>Creates an instance of {@link AlarmControlHandler} and {@link ThreadPoolBaseSendMessageService} with necessary dependencies.</li>
* <li>Initializes and registers message handlers and notification configurations.</li>
* <li>Creates an instance of {@link DefaultThreadPoolCheckAlarmHandler} and schedules it to start monitoring the thread pool.</li>
* </ul>
*/
public static void enableThreadPoolCheckAlarmHandler() {
// Check if the thread pool checker is enabled in the bootstrap configuration properties
if (Boolean.TRUE.equals(BOOTSTRAP_CONFIG_PROPERTIES.getEnable())) {
// Initialize EnvironmentProperties
initializeEnvironmentProperties();
// Initialize the AlarmControlHandler and ThreadPoolBaseSendMessageService
AlarmControlHandler alarmControlHandler = new AlarmControlHandler();
ThreadPoolBaseSendMessageService threadPoolBaseSendMessageService = createThreadPoolBaseSendMessageService(alarmControlHandler);
// Initialize the alarm platform information
initializeSendMessageHandlers(threadPoolBaseSendMessageService, alarmControlHandler);
// Initialize the thread pool check alarm handler with necessary services
DefaultThreadPoolCheckAlarmHandler checkAlarmHandler = new DefaultThreadPoolCheckAlarmHandler(threadPoolBaseSendMessageService);
// Run the check alarm handler to start monitoring the thread pool
checkAlarmHandler.scheduleExecute();
}
}
/**
* Initializes environment properties used for thread pool monitoring.
* <p>
* This method sets the state check interval, item ID, application name, and active profile from the bootstrap configuration.
*/
private static void initializeEnvironmentProperties() {
EnvironmentProperties.checkStateInterval = Long.valueOf(BOOTSTRAP_CONFIG_PROPERTIES.getCheckStateInterval());
EnvironmentProperties.itemId = BOOTSTRAP_CONFIG_PROPERTIES.getItemId();
EnvironmentProperties.applicationName = SpringBootConfig.Spring.Application.name;
EnvironmentProperties.active = SpringBootConfig.Spring.Profiles.active;
}
/**
* Creates and returns a new instance of {@link ThreadPoolBaseSendMessageService} with the specified {@link AlarmControlHandler}.
*
* @param alarmControlHandler The {@link AlarmControlHandler} used to control and handle alarms.
* @return A new instance of {@link ThreadPoolBaseSendMessageService}.
*/
private static ThreadPoolBaseSendMessageService createThreadPoolBaseSendMessageService(AlarmControlHandler alarmControlHandler) {
return new ThreadPoolBaseSendMessageService(alarmControlHandler);
}
/**
* Initializes and registers the message handlers and notification configurations in the specified
* {@link ThreadPoolBaseSendMessageService}.
* <p>
* This method creates instances of various {@link SendMessageHandler} implementations and registers them.
* It also constructs and registers notification configurations using the {@link AgentModeNotifyConfigBuilder}.
*
* @param threadPoolBaseSendMessageService The {@link ThreadPoolBaseSendMessageService} in which message handlers and notification configurations will be registered.
* @param alarmControlHandler The {@link AlarmControlHandler} used to handle alarms and notifications.
*/
private static void initializeSendMessageHandlers(ThreadPoolBaseSendMessageService threadPoolBaseSendMessageService, AlarmControlHandler alarmControlHandler) {
// Initialize message handlers
DingSendMessageHandler dingSendMessageHandler = new DingSendMessageHandler();
WeChatSendMessageHandler weChatSendMessageHandler = new WeChatSendMessageHandler();
LarkSendMessageHandler larkSendMessageHandler = new LarkSendMessageHandler();
// Register message handlers
threadPoolBaseSendMessageService.getSendMessageHandlers().put(dingSendMessageHandler.getType(), dingSendMessageHandler);
threadPoolBaseSendMessageService.getSendMessageHandlers().put(weChatSendMessageHandler.getType(), weChatSendMessageHandler);
threadPoolBaseSendMessageService.getSendMessageHandlers().put(larkSendMessageHandler.getType(), larkSendMessageHandler);
// Construct and register notification configurations
AgentModeNotifyConfigBuilder notifyConfigBuilder = new AgentModeNotifyConfigBuilder(alarmControlHandler);
Map<String, List<NotifyConfigDTO>> notifyConfigs = notifyConfigBuilder.buildNotify();
threadPoolBaseSendMessageService.getNotifyConfigs().putAll(notifyConfigs);
}
}

@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.toolkit;
import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigFileTypeEnum;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.BeanWrapperImpl;
import org.springframework.core.env.Environment;
import org.springframework.core.env.PropertySource;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import java.beans.PropertyEditorSupport;
import java.util.*;
/**
* CustomPropertyBinder is a utility class for binding properties from Spring's Environment
* to Java objects based on a given prefix. This is useful for dynamically binding configurations
* in Spring applications where configuration properties are organized hierarchically and need to be
* mapped to corresponding Java objects.
*
* <p>This class handles complex property structures, including nested properties and collections,
* ensuring that all properties prefixed with the specified string are correctly bound to the target
* Java object.</p>
*/
public class SpringPropertyBinder {
/**
* Binds properties from the Spring Environment to an instance of the specified configuration class.
*
* @param environment the Spring Environment containing property sources.
* @param prefix the prefix to filter properties for binding (e.g., "spring.dynamic.thread-pool").
* @param clazz the class type of the configuration object to bind properties to.
* @param <T> the type of the configuration class.
* @return an instance of the configuration class with properties bound from the environment.
* @throws RuntimeException if there is an error instantiating the configuration class or binding properties.
*/
public static <T> T bindProperties(Environment environment, String prefix, Class<T> clazz) {
try {
// Create an instance of the target class
T instance = clazz.getDeclaredConstructor().newInstance();
BeanWrapper beanWrapper = new BeanWrapperImpl(instance);
// Register custom editor for ConfigFileTypeEnum to handle specific type conversions
beanWrapper.registerCustomEditor(ConfigFileTypeEnum.class, new ConfigFileTypeEnumEditor());
// Iterate over all property keys that match the given prefix
for (String key : getAllPropertyKeys(environment, prefix)) {
String propertyName = key.substring(prefix.length() + 1); // Remove prefix from the property key
String[] tokens = propertyName.split("\\."); // Split the property name by dot for nested properties
setPropertyValue(tokens, beanWrapper, environment.getProperty(key)); // Set the property value recursively
}
return instance;
} catch (Exception e) {
throw new RuntimeException("Unable to bind properties to " + clazz.getName(), e);
}
}
/**
* Recursively sets property values on the target object, handling nested properties and collections.
*
* @param tokens an array of property path tokens (e.g., ["nested", "property", "name"]).
* @param beanWrapper the BeanWrapper instance used to manipulate the target object.
* @param value the value to set on the target property.
*/
private static void setPropertyValue(String[] tokens, BeanWrapper beanWrapper, String value) {
for (int i = 0; i < tokens.length - 1; i++) {
String token = tokens[i];
if (token.matches(".*\\[\\d+\\]$")) { // Handle array/list property
token = token.substring(0, token.indexOf('['));
int index = Integer.parseInt(tokens[i].substring(token.length() + 1, tokens[i].length() - 1));
token = convertToCamelCase(token); // Convert token to camelCase if necessary
List<Object> list = (List<Object>) beanWrapper.getPropertyValue(token);
if (list == null) {
list = new ArrayList<>();
beanWrapper.setPropertyValue(convertToCamelCase(token), list); // Initialize the list if it's null
}
// Ensure the list has enough size to accommodate the index
if (list.size() <= index) {
try {
// Instantiate the list element if it does not exist
list.add(index, beanWrapper.getPropertyTypeDescriptor(token)
.getElementTypeDescriptor().getType().newInstance());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// Move the beanWrapper context to the current list element
beanWrapper = new BeanWrapperImpl(list.get(index));
} else { // Handle simple or nested property
Object nestedObject = beanWrapper.getPropertyValue(token);
if (nestedObject == null) {
Class<?> nestedClass = beanWrapper.getPropertyType(token);
if (Map.class.isAssignableFrom(nestedClass)) {
nestedObject = new HashMap<>(); // Initialize nested Map if necessary
} else {
try {
nestedObject = nestedClass.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
beanWrapper.setPropertyValue(convertToCamelCase(token), nestedObject);
}
// Move the beanWrapper context to the nested object
beanWrapper = new BeanWrapperImpl(nestedObject);
}
}
// Finally, set the actual property value on the resolved object
String finalPropertyName = tokens[tokens.length - 1];
Object currentObject = beanWrapper.getWrappedInstance();
if (currentObject instanceof Map) {
// If the current object is a Map, set the value as a key-value pair
((Map<String, Object>) currentObject).put(finalPropertyName, value);
} else {
// Otherwise, set it as a simple property
beanWrapper.setPropertyValue(convertToCamelCase(finalPropertyName), value);
}
}
/**
* Retrieves all property keys from the environment that start with the given prefix.
*
* @param environment the Spring Environment containing property sources.
* @param prefix the prefix to filter property keys.
* @return a set of property keys that match the prefix.
*/
private static Set<String> getAllPropertyKeys(Environment environment, String prefix) {
Set<String> keys = new HashSet<>();
// Iterate through all property sources in the environment
for (PropertySource<?> propertySource : ((ConfigurableEnvironment) environment).getPropertySources()) {
if (propertySource instanceof MapPropertySource) {
Map<String, Object> source = ((MapPropertySource) propertySource).getSource();
// Collect keys that start with the specified prefix
for (String key : source.keySet()) {
if (key.startsWith(prefix)) {
keys.add(key);
}
}
}
}
return keys;
}
/**
* Converts a dashed-separated string to camelCase.
* <p>
* For example, "my-property-name" -> "myPropertyName".
*
* @param dashed the dashed-separated string to be converted.
* @return the camelCase representation of the input string.
*/
private static String convertToCamelCase(String dashed) {
String[] parts = dashed.split("-");
return Arrays.stream(parts)
.map(part -> part.substring(0, 1).toUpperCase() + part.substring(1)) // Capitalize each part
.reduce((first, second) -> first + second) // Concatenate all parts together
.map(result -> result.substring(0, 1).toLowerCase() + result.substring(1)) // Lowercase the first letter
.orElse(dashed);
}
/**
* ConfigFileTypeEnumEditor is a custom property editor for converting string representations
* of {@link ConfigFileTypeEnum} into the corresponding enum instances.
* <p>
* This editor is useful in scenarios where properties are read as strings but need to be
* converted to enum types for further processing.
*/
public static class ConfigFileTypeEnumEditor extends PropertyEditorSupport {
/**
* Converts the given text value to the corresponding {@link ConfigFileTypeEnum} instance.
* <p>
* This method overrides the default implementation to parse the input string and convert
* it into a {@link ConfigFileTypeEnum}. If the input string does not match any known enum
* value, an {@link IllegalArgumentException} will be thrown.
*
* @param text the string representation of the enum to be converted.
* @throws IllegalArgumentException if the text does not match any known enum value.
*/
@Override
public void setAsText(String text) throws IllegalArgumentException {
setValue(ConfigFileTypeEnum.of(text));
}
}
}

@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-agent-example</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>hippo4j-agent-example-core</artifactId>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-message</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.23</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.23</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@ -15,7 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package cn.hippo4j.example.agent.config.apollo; package cn.hippo4j.example.agent.core.config;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@ -46,7 +46,7 @@ public class ThreadPoolConfiguration {
// 演示 Agent 模式修改线程池 // 演示 Agent 模式修改线程池
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
public static final ThreadPoolExecutor RUN_MESSAGE_SEND_TASK_EXECUTOR = new ThreadPoolExecutor( public static final ThreadPoolExecutor AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR = new ThreadPoolExecutor(
1, 1,
10, 10,
1024, 1024,

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.agent.core.inittest;
import cn.hippo4j.common.executor.ThreadPoolExecutorHolder;
import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Test alarm send message.
*/
@Slf4j
@Component
public class AlarmSendMessageTest {
private static final int SLEEP_TIME = 10240124;
private static final int INITIAL_DELAY = 3;
private static final String RUN_MESSAGE_SEND_TASK_EXECUTOR = "runMessageSendTaskExecutor";
private static final String AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR = "cn.hippo4j.example.agent.core.config.ThreadPoolConfiguration#AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR";
/**
* Test alarm notification.
* If you need to run this single test, add @PostConstruct to the method.
*/
@SuppressWarnings("all")
@PostConstruct
public void alarmSendMessageTest() {
ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
scheduledThreadPool.scheduleWithFixedDelay(() -> {
ThreadPoolExecutorHolder executorHolder = ThreadPoolExecutorRegistry.getHolder(AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR);
ThreadPoolExecutor poolExecutor = executorHolder.getExecutor();
try {
poolExecutor.execute(() -> {
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
} catch (Exception ex) {
log.error("Throw reject policy.", ex.getMessage());
}
}, INITIAL_DELAY, 2, TimeUnit.SECONDS);
scheduledThreadPool.scheduleWithFixedDelay(() -> {
ThreadPoolExecutorHolder executorHolder = ThreadPoolExecutorRegistry.getHolder(RUN_MESSAGE_SEND_TASK_EXECUTOR);
ThreadPoolExecutor poolExecutor = executorHolder.getExecutor();
try {
poolExecutor.execute(() -> {
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
} catch (Exception ex) {
log.error("Throw reject policy.", ex.getMessage());
}
}, INITIAL_DELAY, 2, TimeUnit.SECONDS);
}
}

@ -16,6 +16,12 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-example-core</artifactId>
<version>${revision}</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId> <artifactId>spring-boot-starter</artifactId>

@ -23,7 +23,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
/** /**
* Agent config apollo example application. * Agent config apollo example application.
*/ */
@SpringBootApplication @SpringBootApplication(scanBasePackages = "cn.hippo4j.example.agent.core")
public class AgentConfigApolloExampleApplication { public class AgentConfigApolloExampleApplication {
public static void main(String[] args) { public static void main(String[] args) {

@ -16,6 +16,11 @@
</properties> </properties>
<dependencies> <dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-example-core</artifactId>
<version>${revision}</version>
</dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId> <artifactId>spring-boot-starter</artifactId>

@ -25,7 +25,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
/** /**
* Agent config Nacos example application. * Agent config Nacos example application.
*/ */
@SpringBootApplication @SpringBootApplication(scanBasePackages = "cn.hippo4j.example.agent.core")
@EnableNacosConfig @EnableNacosConfig
public class AgentConfigNacosExampleApplication { public class AgentConfigNacosExampleApplication {

@ -1,55 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.agent.config.nacos;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Configuration
public class ThreadPoolConfiguration {
// -------------------------------------------------------------------------
// 未使用 Hippo4j原始定义线程池创建方式
// -------------------------------------------------------------------------
@Bean
public ThreadPoolExecutor runMessageSendTaskExecutor() {
LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<>(1024);
return new ThreadPoolExecutor(
1,
10,
1024,
TimeUnit.SECONDS,
linkedBlockingQueue);
}
// -------------------------------------------------------------------------
// 演示 Agent 模式修改线程池
// -------------------------------------------------------------------------
public static final ThreadPoolExecutor RUN_MESSAGE_SEND_TASK_EXECUTOR = new ThreadPoolExecutor(
1,
10,
1024,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024));
}

@ -19,5 +19,6 @@
<modules> <modules>
<module>config-apollo</module> <module>config-apollo</module>
<module>config-nacos</module> <module>config-nacos</module>
<module>agent-example-core</module>
</modules> </modules>
</project> </project>

@ -56,6 +56,8 @@ public class Constants {
public static final String GENERAL_SPLIT_SYMBOL = ","; public static final String GENERAL_SPLIT_SYMBOL = ",";
public static final String DOT_SPLIT_SYMBOL = ".";
public static final String IDENTIFY_SLICER_SYMBOL = "_"; public static final String IDENTIFY_SLICER_SYMBOL = "_";
public static final String LONG_POLLING_LINE_SEPARATOR = "\r\n"; public static final String LONG_POLLING_LINE_SEPARATOR = "\r\n";
@ -128,5 +130,12 @@ public class Constants {
public static final String CONFIGURATION_PROPERTIES_PREFIX = "spring.dynamic.thread-pool"; public static final String CONFIGURATION_PROPERTIES_PREFIX = "spring.dynamic.thread-pool";
public static final String EXECUTORS = "executors";
public static final long NO_REJECT_COUNT_NUM = -1L; public static final long NO_REJECT_COUNT_NUM = -1L;
public static final String DYNAMIC_THREAD_POOL_EXECUTOR = "cn.hippo4j.core.executor.DynamicThreadPoolExecutor";
public static final int DEFAULT_INTERVAL = 5;
} }

Loading…
Cancel
Save