feat:Agent dynamic monitor Initialize

pull/1572/head
Pan-YuJie 1 year ago
parent 1404235aae
commit 6ee942ef7c

@ -38,11 +38,19 @@
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>com.ctrip.framework.apollo</groupId> <groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId> <artifactId>apollo-client</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.alibaba.nacos</groupId> <groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId> <artifactId>nacos-client</artifactId>

@ -49,5 +49,30 @@
<version>${project.version}</version> <version>${project.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-adapter-web</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-monitor-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-monitor-local-log</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-monitor-micrometer</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>

@ -1,200 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.alarm;
import cn.hippo4j.agent.plugin.spring.common.support.ThreadPoolCheckAlarmSupport;
import cn.hippo4j.common.api.IExecutorProperties;
import cn.hippo4j.common.model.executor.ExecutorNotifyProperties;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.NotifyPlatformProperties;
import cn.hippo4j.threadpool.message.api.NotifyConfigBuilder;
import cn.hippo4j.threadpool.message.api.NotifyConfigDTO;
import cn.hippo4j.threadpool.message.core.service.AlarmControlHandler;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import static cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES;
import static cn.hippo4j.common.constant.Constants.DEFAULT_INTERVAL;
/**
* This class is responsible for building the notification configurations for thread pools in an agent mode.
* It implements the {@link NotifyConfigBuilder} interface and provides methods to build and initialize
* notification configurations for various platforms and types (e.g., ALARM, CONFIG).
*
* <p>The configuration is based on the properties loaded from the bootstrap configuration and includes
* handling for alarm control and notification intervals.</p>
*/
@AllArgsConstructor
public class AgentModeNotifyConfigBuilder implements NotifyConfigBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolCheckAlarmSupport.class);
private final AlarmControlHandler alarmControlHandler;
/**
* Builds the notification configurations for all executors defined in the bootstrap configuration.
*
* <p>This method filters the executors based on their alarm settings and constructs the notification
* configurations accordingly. If global alarm settings are disabled and there are no specific alarms
* configured for any executor, the method returns an empty map.</p>
*
* @return A map containing the notification configurations, keyed by the notification type (e.g., ALARM, CONFIG).
*/
public Map<String, List<NotifyConfigDTO>> buildNotify() {
Map<String, List<NotifyConfigDTO>> resultMap = new HashMap<>();
boolean globalAlarm = Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor())
.map(ExecutorProperties::getAlarm)
.orElse(true);
List<ExecutorProperties> executors = BOOTSTRAP_CONFIG_PROPERTIES.getExecutors();
if (CollectionUtil.isEmpty(executors)) {
LOGGER.warn("Failed to build notify, executors configuration is empty.");
return resultMap;
}
List<ExecutorProperties> actual = executors.stream()
.filter(each -> Optional.ofNullable(each.getAlarm())
.orElse(false))
.collect(Collectors.toList());
if (!globalAlarm && CollectionUtil.isEmpty(actual)) {
return resultMap;
}
for (ExecutorProperties executorProperties : executors) {
Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig = buildSingleNotifyConfig(executorProperties);
initCacheAndLock(buildSingleNotifyConfig);
resultMap.putAll(buildSingleNotifyConfig);
}
return resultMap;
}
/**
* Builds the notification configurations for a single executor.
*
* <p>This method generates two types of notifications: ALARM and CONFIG. For each type, it creates
* notification configurations based on the platforms defined in the bootstrap configuration.</p>
*
* @param executorProperties The properties of the executor for which to build the notification configurations.
* @return A map containing the notification configurations for the given executor, keyed by the notification type.
*/
public Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig(IExecutorProperties executorProperties) {
String threadPoolId = executorProperties.getThreadPoolId();
Map<String, List<NotifyConfigDTO>> resultMap = new HashMap<>();
String alarmBuildKey = threadPoolId + "+ALARM";
List<NotifyConfigDTO> alarmNotifyConfigs = new ArrayList<>();
List<NotifyPlatformProperties> notifyPlatforms = BOOTSTRAP_CONFIG_PROPERTIES.getNotifyPlatforms();
for (NotifyPlatformProperties platformProperties : notifyPlatforms) {
NotifyConfigDTO notifyConfig = new NotifyConfigDTO();
notifyConfig.setPlatform(platformProperties.getPlatform());
notifyConfig.setTpId(threadPoolId);
notifyConfig.setType("ALARM");
notifyConfig.setSecret(platformProperties.getSecret());
notifyConfig.setSecretKey(getToken(platformProperties));
notifyConfig.setInterval(buildInterval(executorProperties));
notifyConfig.setReceives(buildReceive(executorProperties));
alarmNotifyConfigs.add(notifyConfig);
}
resultMap.put(alarmBuildKey, alarmNotifyConfigs);
String changeBuildKey = threadPoolId + "+CONFIG";
List<NotifyConfigDTO> changeNotifyConfigs = new ArrayList<>();
for (NotifyPlatformProperties platformProperties : notifyPlatforms) {
NotifyConfigDTO notifyConfig = new NotifyConfigDTO();
notifyConfig.setPlatform(platformProperties.getPlatform());
notifyConfig.setTpId(threadPoolId);
notifyConfig.setType("CONFIG");
notifyConfig.setSecretKey(getToken(platformProperties));
notifyConfig.setSecret(platformProperties.getSecret());
notifyConfig.setReceives(buildReceive(executorProperties));
changeNotifyConfigs.add(notifyConfig);
}
resultMap.put(changeBuildKey, changeNotifyConfigs);
return resultMap;
}
/**
* Retrieves the token for the given notification platform properties.
*
* <p>If the token is not explicitly set, the method returns the secret key as the fallback.</p>
*
* @param platformProperties The platform properties from which to retrieve the token.
* @return The token or secret key associated with the given platform properties.
*/
private String getToken(NotifyPlatformProperties platformProperties) {
return StringUtil.isNotBlank(platformProperties.getToken()) ? platformProperties.getToken() : platformProperties.getSecretKey();
}
/**
* Builds the notification interval for the given executor properties.
*
* <p>This method first checks the executor's specific notify configuration. If not set, it falls back
* to the default executor configuration in the bootstrap properties.</p>
*
* @param executorProperties The properties of the executor for which to build the notification interval.
* @return The notification interval in seconds.
*/
private int buildInterval(IExecutorProperties executorProperties) {
return Optional.ofNullable(executorProperties.getNotify())
.map(ExecutorNotifyProperties::getInterval)
.orElse(Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor())
.map(ExecutorProperties::getNotify)
.map(ExecutorNotifyProperties::getInterval)
.orElse(DEFAULT_INTERVAL));
}
/**
* Builds the notification recipients for the given executor properties.
*
* <p>This method first checks the executor's specific notify configuration. If not set, it falls back
* to the default executor configuration in the bootstrap properties.</p>
*
* @param executorProperties The properties of the executor for which to build the notification recipients.
* @return A string containing the recipients of the notifications.
*/
private String buildReceive(IExecutorProperties executorProperties) {
return Optional.ofNullable(executorProperties.getNotify())
.map(ExecutorNotifyProperties::getReceives)
.orElse(Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor())
.map(ExecutorProperties::getNotify)
.map(ExecutorNotifyProperties::getReceives).orElse(""));
}
/**
* Initializes the cache and lock mechanisms for the given notification configurations.
*
* <p>This method is primarily responsible for setting up alarm controls based on the notification
* configurations, ensuring that the appropriate cache and lock mechanisms are initialized for
* each thread pool and platform combination.</p>
*
* @param buildSingleNotifyConfig A map containing the notification configurations that need cache and lock initialization.
*/
public void initCacheAndLock(Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig) {
buildSingleNotifyConfig.forEach(
(key, val) -> val.stream()
.filter(each -> Objects.equals("ALARM", each.getType()))
.forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval())));
}
}

@ -30,11 +30,13 @@ public class SpringBootConfig {
/** /**
* Spring * Spring
*/ */
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Spring { public static class Spring {
/** /**
* Dynamic * Dynamic
*/ */
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Dynamic { public static class Dynamic {
/** /**
@ -79,6 +81,9 @@ public class SpringBootConfig {
public static Long initialDelay = 10000L; public static Long initialDelay = 10000L;
public static Long collectInterval = 5000L; public static Long collectInterval = 5000L;
public static Integer AGENT_MICROMETER_PORT;
} }
public static String CONFIG_FILE_TYPE; public static String CONFIG_FILE_TYPE;

@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.monitor;
import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry;
import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.common.monitor.MonitorCollectTypeEnum;
import cn.hippo4j.common.monitor.MonitorHandlerTypeEnum;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.core.toolkit.inet.InetUtilsProperties;
import cn.hippo4j.monitor.elasticsearch.AdapterThreadPoolElasticSearchMonitorHandler;
import cn.hippo4j.monitor.elasticsearch.DynamicThreadPoolElasticSearchMonitorHandler;
import cn.hippo4j.monitor.elasticsearch.WebThreadPoolElasticSearchMonitorHandler;
import cn.hippo4j.monitor.local.log.AdapterThreadPoolLocalLogMonitorHandler;
import cn.hippo4j.monitor.local.log.DynamicThreadPoolLocalLogMonitorHandler;
import cn.hippo4j.monitor.local.log.WebThreadPoolLocalLogMonitorHandler;
import cn.hippo4j.monitor.micrometer.AdapterThreadPoolMicrometerMonitorHandler;
import cn.hippo4j.monitor.micrometer.DynamicThreadPoolMicrometerMonitorHandler;
import cn.hippo4j.monitor.micrometer.WebThreadPoolMicrometerMonitorHandler;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.MonitorProperties;
import cn.hippo4j.threadpool.monitor.api.ThreadPoolMonitor;
import org.springframework.core.env.ConfigurableEnvironment;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import static cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader.INET_UTILS_PROPERTIES;
/**
* This class is responsible for configuring and initializing monitoring handlers
* for various types of thread pools. It maps specific monitoring types (e.g., Micrometer,
* Log, Elasticsearch) to their corresponding handler initializers and manages the
* setup process based on the provided configuration.
*/
public class MonitorHandlersConfigurator {
private static final ILog LOGGER = LogManager.getLogger(MonitorHandlersConfigurator.class);
// Maps thread pool types to their corresponding handler constructors
private static final Map<String, BiConsumer<MonitorHandlerTypeEnum, MonitorHandlerContext>> handlerMap = new HashMap<>();
static {
// Initialize the handler map with specific monitoring types
handlerMap.put(MonitorCollectTypeEnum.MICROMETER.getValue(), MonitorHandlersConfigurator::handleMicrometer);
handlerMap.put(MonitorCollectTypeEnum.LOG.getValue(), MonitorHandlersConfigurator::handleLog);
handlerMap.put(MonitorCollectTypeEnum.ELASTICSEARCH.getValue(), MonitorHandlersConfigurator::handleElasticSearch);
}
/**
* Initializes the monitoring handlers based on the provided monitoring configuration.
* <p>
* This method performs the following tasks:
* <ul>
* <li>Parses the configured monitoring types and thread pool types.</li>
* <li>Initializes a monitoring context with the necessary thread pool monitors and state handler.</li>
* <li>For each configured monitoring type, invokes the corresponding handler initializer
* for each relevant thread pool type.</li>
* <li>Logs a warning if an unrecognized monitoring type is encountered.</li>
* <li>Registers and adds thread pool monitors that match the configured monitoring types.</li>
* </ul>
*
* @param monitor The monitoring properties configuration.
* @param environment The application environment from which additional configuration can be loaded.
* @param threadPoolMonitors A list to hold the initialized thread pool monitors.
*/
public static void initializeMonitorHandlers(MonitorProperties monitor, ConfigurableEnvironment environment, List<ThreadPoolMonitor> threadPoolMonitors) {
List<String> collectTypes = Arrays.asList(monitor.getCollectTypes().split(","));
List<String> threadPoolTypes = Arrays.asList(monitor.getThreadPoolTypes().split(","));
ThreadPoolRunStateHandler threadPoolRunStateHandler = new ThreadPoolRunStateHandler(
new InetUtils(INET_UTILS_PROPERTIES), environment);
MonitorHandlerContext context = new MonitorHandlerContext(threadPoolMonitors, threadPoolRunStateHandler);
// Initialize handlers for each configured monitoring type and thread pool type
for (String collectType : collectTypes) {
if (handlerMap.containsKey(collectType)) {
for (MonitorHandlerTypeEnum type : MonitorHandlerTypeEnum.values()) {
if (threadPoolTypes.contains(type.name().toLowerCase())) {
handlerMap.get(collectType).accept(type, context);
}
}
} else {
LOGGER.warn("[Hippo4j-Agent] MonitorConfigurator initialize Unrecognized collect type: [{}]", collectType);
}
}
// Register and add dynamic thread pool monitors matching the configured types
Collection<ThreadPoolMonitor> dynamicThreadPoolMonitors = ServiceLoaderRegistry.getSingletonServiceInstances(ThreadPoolMonitor.class);
dynamicThreadPoolMonitors.stream().filter(each -> collectTypes.contains(each.getType())).forEach(threadPoolMonitors::add);
}
/**
* Initializes Micrometer-based monitoring handlers for the specified thread pool type.
*
* @param type The type of thread pool to be monitored.
* @param context The context containing the monitors and state handler.
*/
private static void handleMicrometer(MonitorHandlerTypeEnum type, MonitorHandlerContext context) {
switch (type) {
case DYNAMIC:
context.monitors.add(new DynamicThreadPoolMicrometerMonitorHandler(context.threadPoolRunStateHandler));
break;
case WEB:
context.monitors.add(new WebThreadPoolMicrometerMonitorHandler());
break;
case ADAPTER:
context.monitors.add(new AdapterThreadPoolMicrometerMonitorHandler());
break;
}
}
/**
* Initializes Log-based monitoring handlers for the specified thread pool type.
*
* @param type The type of thread pool to be monitored.
* @param context The context containing the monitors and state handler.
*/
private static void handleLog(MonitorHandlerTypeEnum type, MonitorHandlerContext context) {
switch (type) {
case DYNAMIC:
context.monitors.add(new DynamicThreadPoolLocalLogMonitorHandler(context.threadPoolRunStateHandler));
break;
case WEB:
context.monitors.add(new WebThreadPoolLocalLogMonitorHandler());
break;
case ADAPTER:
context.monitors.add(new AdapterThreadPoolLocalLogMonitorHandler());
break;
}
}
/**
* Initializes Elasticsearch-based monitoring handlers for the specified thread pool type.
*
* @param type The type of thread pool to be monitored.
* @param context The context containing the monitors and state handler.
*/
private static void handleElasticSearch(MonitorHandlerTypeEnum type, MonitorHandlerContext context) {
switch (type) {
case DYNAMIC:
context.monitors.add(new DynamicThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler));
break;
case WEB:
context.monitors.add(new WebThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler));
break;
case ADAPTER:
context.monitors.add(new AdapterThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler));
break;
}
}
/**
* A helper class to manage the context in which monitoring handlers are initialized.
*/
private static class MonitorHandlerContext {
List<ThreadPoolMonitor> monitors;
ThreadPoolRunStateHandler threadPoolRunStateHandler;
MonitorHandlerContext(List<ThreadPoolMonitor> monitors, ThreadPoolRunStateHandler handler) {
this.monitors = monitors;
this.threadPoolRunStateHandler = handler;
}
}
}

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.monitor;
import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig;
import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
/**
* This class is responsible for exposing Prometheus metrics via an HTTP endpoint.
* It initializes the Prometheus registry, binds it to the global metrics registry,
* and starts an HTTP server to serve the metrics data.
*/
public class MonitorMetricEndpoint {
private static final ILog LOGGER = LogManager.getLogger(MonitorHandlersConfigurator.class);
/**
* Starts the Prometheus metrics HTTP server.
* <p>
* This method performs the following steps:
* <ul>
* <li>Initializes the PrometheusMeterRegistry with the default configuration.</li>
* <li>Binds the Prometheus registry to the global CompositeMeterRegistry.</li>
* <li>Attempts to start an HTTP server on the configured port to expose the Prometheus metrics.</li>
* </ul>
* If the port is not configured, or if there is an error starting the server, appropriate error messages
* are logged, and the method returns without starting the server.
* </p>
*/
public static void startPrometheusEndpoint() {
// Initialize the Prometheus registry
PrometheusMeterRegistry prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
// Bind the Prometheus registry to the global Metrics registry
CompositeMeterRegistry globalRegistry = Metrics.globalRegistry;
globalRegistry.add(prometheusRegistry);
// Get the configured port for the Prometheus metrics HTTP server
Integer port = SpringBootConfig.Spring.Dynamic.Thread_Pool.Monitor.AGENT_MICROMETER_PORT;
if (port == null) {
LOGGER.error(
"[Hippo4j-Agent] Failed to start Prometheus metrics endpoint server. Please configure the exposed endpoint by adding: spring.dynamic.thread-pool.monitor.port=xxx to the configuration file");
return;
}
// Create the HTTP server
HttpServer server = null;
try {
server = HttpServer.create(new InetSocketAddress(port), 0);
} catch (IOException e) {
LOGGER.error("[Hippo4j-Agent] Failed to start Prometheus metrics endpoint server", e);
return;
}
// Register the /actuator/prometheus context to handle metrics requests
server.createContext("/actuator/prometheus", exchange -> {
String response = prometheusRegistry.scrape(); // Get metrics data in Prometheus format
exchange.sendResponseHeaders(200, response.getBytes().length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(response.getBytes());
}
});
// Start the server
server.start();
LOGGER.info("[Hippo4j-Agent] Prometheus metrics server started on port {}", port);
}
}

@ -21,6 +21,7 @@ import cn.hippo4j.agent.core.boot.SpringBootConfigInitializer;
import cn.hippo4j.agent.plugin.spring.common.toolkit.SpringPropertyBinder; import cn.hippo4j.agent.plugin.spring.common.toolkit.SpringPropertyBinder;
import cn.hippo4j.common.logging.api.ILog; import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager; import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.core.toolkit.inet.InetUtilsProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties; import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import org.springframework.core.env.ConfigurableEnvironment; import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.EnumerablePropertySource; import org.springframework.core.env.EnumerablePropertySource;
@ -42,6 +43,8 @@ public class SpringPropertiesLoader {
public static BootstrapConfigProperties BOOTSTRAP_CONFIG_PROPERTIES = new BootstrapConfigProperties(); public static BootstrapConfigProperties BOOTSTRAP_CONFIG_PROPERTIES = new BootstrapConfigProperties();
public static InetUtilsProperties INET_UTILS_PROPERTIES = new InetUtilsProperties();
public static void loadSpringProperties(ConfigurableEnvironment environment) { public static void loadSpringProperties(ConfigurableEnvironment environment) {
Iterator<PropertySource<?>> iterator = environment.getPropertySources().iterator(); Iterator<PropertySource<?>> iterator = environment.getPropertySources().iterator();
Properties properties = new Properties(); Properties properties = new Properties();
@ -83,8 +86,11 @@ public class SpringPropertiesLoader {
environment.getPropertySources().addFirst(propertySource); environment.getPropertySources().addFirst(propertySource);
// initialize BootstrapConfigProperties // initialize BootstrapConfigProperties
BOOTSTRAP_CONFIG_PROPERTIES = SpringPropertyBinder.bindProperties(environment, PREFIX, BootstrapConfigProperties.class); BOOTSTRAP_CONFIG_PROPERTIES = SpringPropertyBinder.bindProperties(environment, PREFIX, BootstrapConfigProperties.class);
INET_UTILS_PROPERTIES = SpringPropertyBinder.bindProperties(environment, InetUtilsProperties.PREFIX, InetUtilsProperties.class);
// Enable the thread pool check alert handler
ThreadPoolCheckAlarmSupport.enableThreadPoolCheckAlarmHandler(); ThreadPoolCheckAlarmSupport.enableThreadPoolCheckAlarmHandler();
// Enable thread pool monitor handler
ThreadPoolMonitorSupport.enableThreadPoolMonitorHandler(environment);
} }
} }

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.support;
import cn.hippo4j.agent.plugin.spring.common.monitor.MonitorHandlersConfigurator;
import cn.hippo4j.agent.plugin.spring.common.monitor.MonitorMetricEndpoint;
import cn.hippo4j.common.executor.ThreadFactoryBuilder;
import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry;
import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry;
import cn.hippo4j.common.monitor.MonitorCollectTypeEnum;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.MonitorProperties;
import cn.hippo4j.threadpool.monitor.api.ThreadPoolMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES;
/**
* This class provides support for monitoring dynamic thread pools in an application.
* It includes methods to initialize and enable monitoring components, and schedules
* periodic data collection from the thread pools.
*/
public class ThreadPoolMonitorSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitorSupport.class);
private static List<ThreadPoolMonitor> threadPoolMonitors;
static {
// Register the ThreadPoolMonitor service with the ServiceLoaderRegistry
ServiceLoaderRegistry.register(ThreadPoolMonitor.class);
}
/**
* Enables the dynamic thread pool monitoring handler.
* <p>
* This method performs the following steps:
* <ul>
* <li>Validates the monitoring configuration from the environment properties.</li>
* <li>Initializes monitoring components for the dynamic thread pools.</li>
* <li>Exposes metric endpoints, such as Prometheus, if configured.</li>
* <li>Schedules periodic collection of metrics from the thread pools.</li>
* </ul>
* If the monitoring configuration is invalid or disabled, the method returns without
* enabling the monitoring handler.
* </p>
*
* @param environment The environment from which the monitoring configuration is loaded.
*/
public static void enableThreadPoolMonitorHandler(Environment environment) {
BootstrapConfigProperties properties = BOOTSTRAP_CONFIG_PROPERTIES;
MonitorProperties monitor = properties.getMonitor();
if (Objects.isNull(monitor)
|| !monitor.getEnable()
|| StringUtil.isBlank(monitor.getThreadPoolTypes())
|| StringUtil.isBlank(monitor.getCollectTypes())) {
return;
}
LOGGER.info("[Hippo4j-Agent] Start monitoring the running status of dynamic thread pools.");
threadPoolMonitors = new ArrayList<>();
ScheduledExecutorService collectScheduledExecutor = new ScheduledThreadPoolExecutor(
1,
r -> new Thread(r, "client.agent.scheduled.collect.data"));
// Initialize monitoring components for the dynamic thread pools
MonitorHandlersConfigurator.initializeMonitorHandlers(monitor, (ConfigurableEnvironment) environment, threadPoolMonitors);
// Expose metric endpoints based on the configured collect types
List<String> collectTypes = Arrays.asList(monitor.getCollectTypes().split(","));
if (collectTypes.contains(MonitorCollectTypeEnum.MICROMETER.getValue())) {
MonitorMetricEndpoint.startPrometheusEndpoint();
}
// Schedule periodic collection of metrics from the thread pools
collectScheduledExecutor.scheduleWithFixedDelay(
scheduleRunnable(),
monitor.getInitialDelay(),
monitor.getCollectInterval(),
TimeUnit.MILLISECONDS);
if (ThreadPoolExecutorRegistry.getThreadPoolExecutorSize() > 0) {
LOGGER.info("[Hippo4j-Agent] Dynamic thread pool: [{}]. The dynamic thread pool starts data collection and reporting.", ThreadPoolExecutorRegistry.getThreadPoolExecutorSize());
}
}
/**
* Returns a Runnable task that collects metrics from the dynamic thread pools.
* <p>
* This method is used to create a task that periodically iterates over the
* registered thread pool monitors and collects their metrics. If an exception
* occurs during the collection, it is logged.
* </p>
*
* @return A Runnable task that performs the metrics collection.
*/
private static Runnable scheduleRunnable() {
return () -> {
for (ThreadPoolMonitor each : threadPoolMonitors) {
try {
each.collect();
} catch (Exception ex) {
LOGGER.error("[Hippo4j-Agent] Error monitoring the running status of dynamic thread pool. Type: {}", each.getType(), ex);
}
}
};
}
}

@ -48,7 +48,7 @@ public class AlarmSendMessageTest {
* If you need to run this single test, add @PostConstruct to the method. * If you need to run this single test, add @PostConstruct to the method.
*/ */
@SuppressWarnings("all") @SuppressWarnings("all")
@PostConstruct // @PostConstruct
public void alarmSendMessageTest() { public void alarmSendMessageTest() {
ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(); ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
scheduledThreadPool.scheduleWithFixedDelay(() -> { scheduledThreadPool.scheduleWithFixedDelay(() -> {

@ -44,14 +44,6 @@
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
<version>1.7.21</version> <version>1.7.21</version>
</dependency> </dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.monitor;
/**
* MonitorCollect type enum.
*/
public enum MonitorCollectTypeEnum {
/**
* Micrometer
*/
MICROMETER("micrometer"),
/**
* ELASTICSEARCH
*/
ELASTICSEARCH("elasticsearch"),
/**
* LOG
*/
LOG("log");
private final String value;
MonitorCollectTypeEnum(String value) {
this.value = value;
}
public String getValue() {
return value;
}
}

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.monitor;
/**
* MonitorHandler type enum.
*/
public enum MonitorHandlerTypeEnum {
/**
* DYNAMIC
*/
DYNAMIC,
/**
* WEB
*/
WEB,
/**
* ADAPTER
*/
ADAPTER
}

@ -51,4 +51,9 @@ public class MonitorProperties {
* Collect interval. unit: ms * Collect interval. unit: ms
*/ */
private Long collectInterval = 5000L; private Long collectInterval = 5000L;
/**
* Agent micrometer exposed port
*/
private Integer agentMicrometerPort;
} }

Loading…
Cancel
Save