From fc981a9839d312d3073fc9210803f810ef357145 Mon Sep 17 00:00:00 2001 From: Pan-YuJie <646836760@qq.com> Date: Thu, 12 Sep 2024 15:36:11 +0800 Subject: [PATCH] refactor:Agent Listener logic, add configuration refreshes platform push, and carries the unique application ID --- ...ynamicThreadPoolChangeHandlerSpring2x.java | 1 + .../alarm/AgentModeNotifyConfigBuilder.java | 218 ++++++++++++++++++ .../DynamicThreadPoolRefreshListener.java | 123 +++++++++- .../support/ThreadPoolCheckAlarmSupport.java | 32 ++- 4 files changed, 362 insertions(+), 12 deletions(-) create mode 100644 agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/alarm/AgentModeNotifyConfigBuilder.java rename {kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/refresher => agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common}/event/DynamicThreadPoolRefreshListener.java (50%) diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/NacosDynamicThreadPoolChangeHandlerSpring2x.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/NacosDynamicThreadPoolChangeHandlerSpring2x.java index e13517ad..83dd7f35 100644 --- a/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/NacosDynamicThreadPoolChangeHandlerSpring2x.java +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/NacosDynamicThreadPoolChangeHandlerSpring2x.java @@ -112,6 +112,7 @@ public class NacosDynamicThreadPoolChangeHandlerSpring2x extends AbstractConfigT }; // Add the listener to the Nacos ConfigService configService.addListener(dataId, group, configChangeListener); + LOGGER.info("[Hippo4j-Agent] Dynamic thread pool refresher, add Nacos listener successfully. namespace: {} data-id: {} group: {}", namespace, dataId, group); } catch (Exception e) { LOGGER.error(e, "[Hippo4j-Agent] Dynamic thread pool refresher, add Nacos listener failure. namespace: {} data-id: {} group: {}", namespace, dataId, group); } diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/alarm/AgentModeNotifyConfigBuilder.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/alarm/AgentModeNotifyConfigBuilder.java new file mode 100644 index 00000000..9e8c1ce1 --- /dev/null +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/alarm/AgentModeNotifyConfigBuilder.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.agent.plugin.spring.common.alarm; + +import cn.hippo4j.adapter.web.WebThreadPoolService; +import cn.hippo4j.agent.plugin.spring.common.support.ThreadPoolCheckAlarmSupport; +import cn.hippo4j.common.api.IExecutorProperties; +import cn.hippo4j.common.model.executor.ExecutorNotifyProperties; +import cn.hippo4j.common.model.executor.ExecutorProperties; +import cn.hippo4j.common.toolkit.CollectionUtil; +import cn.hippo4j.common.toolkit.StringUtil; +import cn.hippo4j.threadpool.dynamic.mode.config.properties.NotifyPlatformProperties; +import cn.hippo4j.threadpool.dynamic.mode.config.properties.WebExecutorProperties; +import cn.hippo4j.threadpool.message.api.NotifyConfigBuilder; +import cn.hippo4j.threadpool.message.api.NotifyConfigDTO; +import cn.hippo4j.threadpool.message.core.service.AlarmControlHandler; +import cn.hippo4j.threadpool.message.core.service.ThreadPoolBaseSendMessageService; +import lombok.AllArgsConstructor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; + +import static cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES; +import static cn.hippo4j.common.constant.Constants.DEFAULT_INTERVAL; + +/** + * This class is responsible for building the notification configurations for thread pools in an agent mode. + * It implements the {@link NotifyConfigBuilder} interface and provides methods to build and initialize + * notification configurations for various platforms and types (e.g., ALARM, CONFIG). + * + *

The configuration is based on the properties loaded from the bootstrap configuration and includes + * handling for alarm control and notification intervals.

+ * + * TODO: This is copied from {@link cn.hippo4j.config.springboot.starter.notify.ConfigModeNotifyConfigBuilder} and can be refactored later + */ +@AllArgsConstructor +public class AgentModeNotifyConfigBuilder implements NotifyConfigBuilder { + + private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolCheckAlarmSupport.class); + + private final AlarmControlHandler alarmControlHandler; + + private final WebThreadPoolService webThreadPoolService; + + /** + * Builds the notification configurations for all executors defined in the bootstrap configuration. + * + *

This method filters the executors based on their alarm settings and constructs the notification + * configurations accordingly. If global alarm settings are disabled and there are no specific alarms + * configured for any executor, the method returns an empty map.

+ * + * @return A map containing the notification configurations, keyed by the notification type (e.g., ALARM, CONFIG). + */ + public Map> buildNotify() { + Map> resultMap = new HashMap<>(); + boolean globalAlarm = Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor()) + .map(ExecutorProperties::getAlarm) + .orElse(true); + List executors = BOOTSTRAP_CONFIG_PROPERTIES.getExecutors(); + if (CollectionUtil.isEmpty(executors)) { + LOGGER.warn("Failed to build notify, executors configuration is empty."); + return resultMap; + } + List actual = executors.stream() + .filter(each -> Optional.ofNullable(each.getAlarm()) + .orElse(false)) + .collect(Collectors.toList()); + if (!globalAlarm && CollectionUtil.isEmpty(actual)) { + return resultMap; + } + for (ExecutorProperties executorProperties : executors) { + Map> buildSingleNotifyConfig = buildSingleNotifyConfig(executorProperties); + initCacheAndLock(buildSingleNotifyConfig); + resultMap.putAll(buildSingleNotifyConfig); + } + // register notify config for web + WebExecutorProperties webProperties = BOOTSTRAP_CONFIG_PROPERTIES.getWeb(); + if (webProperties == null) { + return resultMap; + } + if (StringUtil.isBlank(webProperties.getThreadPoolId())) { + webProperties.setThreadPoolId(webThreadPoolService.getWebContainerType().getName()); + } + Map> webSingleNotifyConfigMap = buildSingleNotifyConfig(webProperties); + initCacheAndLock(webSingleNotifyConfigMap); + resultMap.putAll(webSingleNotifyConfigMap); + + return resultMap; + } + + /** + * Builds the notification configurations for a single executor. + * + *

This method generates two types of notifications: ALARM and CONFIG. For each type, it creates + * notification configurations based on the platforms defined in the bootstrap configuration.

+ * + * @param executorProperties The properties of the executor for which to build the notification configurations. + * @return A map containing the notification configurations for the given executor, keyed by the notification type. + */ + public Map> buildSingleNotifyConfig(IExecutorProperties executorProperties) { + String threadPoolId = executorProperties.getThreadPoolId(); + Map> resultMap = new HashMap<>(); + String alarmBuildKey = threadPoolId + "+ALARM"; + List alarmNotifyConfigs = new ArrayList<>(); + List notifyPlatforms = BOOTSTRAP_CONFIG_PROPERTIES.getNotifyPlatforms(); + for (NotifyPlatformProperties platformProperties : notifyPlatforms) { + NotifyConfigDTO notifyConfig = new NotifyConfigDTO(); + notifyConfig.setPlatform(platformProperties.getPlatform()); + notifyConfig.setTpId(threadPoolId); + notifyConfig.setType("ALARM"); + notifyConfig.setSecret(platformProperties.getSecret()); + notifyConfig.setSecretKey(getToken(platformProperties)); + notifyConfig.setInterval(buildInterval(executorProperties)); + notifyConfig.setReceives(buildReceive(executorProperties)); + alarmNotifyConfigs.add(notifyConfig); + } + resultMap.put(alarmBuildKey, alarmNotifyConfigs); + String changeBuildKey = threadPoolId + "+CONFIG"; + List changeNotifyConfigs = new ArrayList<>(); + for (NotifyPlatformProperties platformProperties : notifyPlatforms) { + NotifyConfigDTO notifyConfig = new NotifyConfigDTO(); + notifyConfig.setPlatform(platformProperties.getPlatform()); + notifyConfig.setTpId(threadPoolId); + notifyConfig.setType("CONFIG"); + notifyConfig.setSecretKey(getToken(platformProperties)); + notifyConfig.setSecret(platformProperties.getSecret()); + notifyConfig.setReceives(buildReceive(executorProperties)); + changeNotifyConfigs.add(notifyConfig); + } + resultMap.put(changeBuildKey, changeNotifyConfigs); + return resultMap; + } + + /** + * Retrieves the token for the given notification platform properties. + * + *

If the token is not explicitly set, the method returns the secret key as the fallback.

+ * + * @param platformProperties The platform properties from which to retrieve the token. + * @return The token or secret key associated with the given platform properties. + */ + private String getToken(NotifyPlatformProperties platformProperties) { + return StringUtil.isNotBlank(platformProperties.getToken()) ? platformProperties.getToken() : platformProperties.getSecretKey(); + } + + /** + * Builds the notification interval for the given executor properties. + * + *

This method first checks the executor's specific notify configuration. If not set, it falls back + * to the default executor configuration in the bootstrap properties.

+ * + * @param executorProperties The properties of the executor for which to build the notification interval. + * @return The notification interval in seconds. + */ + private int buildInterval(IExecutorProperties executorProperties) { + return Optional.ofNullable(executorProperties.getNotify()) + .map(ExecutorNotifyProperties::getInterval) + .orElse(Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor()) + .map(ExecutorProperties::getNotify) + .map(ExecutorNotifyProperties::getInterval) + .orElse(DEFAULT_INTERVAL)); + } + + /** + * Builds the notification recipients for the given executor properties. + * + *

This method first checks the executor's specific notify configuration. If not set, it falls back + * to the default executor configuration in the bootstrap properties.

+ * + * @param executorProperties The properties of the executor for which to build the notification recipients. + * @return A string containing the recipients of the notifications. + */ + private String buildReceive(IExecutorProperties executorProperties) { + return Optional.ofNullable(executorProperties.getNotify()) + .map(ExecutorNotifyProperties::getReceives) + .orElse(Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor()) + .map(ExecutorProperties::getNotify) + .map(ExecutorNotifyProperties::getReceives).orElse("")); + } + + /** + * Initializes the cache and lock mechanisms for the given notification configurations. + * + *

This method is primarily responsible for setting up alarm controls based on the notification + * configurations, ensuring that the appropriate cache and lock mechanisms are initialized for + * each thread pool and platform combination.

+ * + * @param buildSingleNotifyConfig A map containing the notification configurations that need cache and lock initialization. + */ + public void initCacheAndLock(Map> buildSingleNotifyConfig) { + buildSingleNotifyConfig.forEach( + (key, val) -> val.stream() + .filter(each -> Objects.equals("ALARM", each.getType())) + .forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval()))); + } +} diff --git a/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/refresher/event/DynamicThreadPoolRefreshListener.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/event/DynamicThreadPoolRefreshListener.java similarity index 50% rename from kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/refresher/event/DynamicThreadPoolRefreshListener.java rename to agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/event/DynamicThreadPoolRefreshListener.java index 0cb87c96..836ab712 100644 --- a/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/refresher/event/DynamicThreadPoolRefreshListener.java +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/event/DynamicThreadPoolRefreshListener.java @@ -15,8 +15,11 @@ * limitations under the License. */ -package cn.hippo4j.threadpool.dynamic.mode.config.refresher.event; +package cn.hippo4j.agent.plugin.spring.common.event; +import cn.hippo4j.agent.core.util.CollectionUtil; +import cn.hippo4j.agent.plugin.spring.common.alarm.AgentModeNotifyConfigBuilder; +import cn.hippo4j.agent.plugin.spring.common.support.ThreadPoolCheckAlarmSupport; import cn.hippo4j.common.executor.ThreadPoolExecutorHolder; import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum; @@ -29,10 +32,17 @@ import cn.hippo4j.common.logging.api.LogManager; import cn.hippo4j.common.model.executor.ExecutorProperties; import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil; import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties; +import cn.hippo4j.threadpool.message.api.NotifyConfigDTO; +import cn.hippo4j.threadpool.message.core.request.ChangeParameterNotifyRequest; +import cn.hippo4j.threadpool.message.core.service.GlobalNotifyAlarmManage; +import cn.hippo4j.threadpool.message.core.service.ThreadPoolBaseSendMessageService; +import cn.hippo4j.threadpool.message.core.service.ThreadPoolNotifyAlarm; import lombok.RequiredArgsConstructor; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -56,6 +66,10 @@ public class DynamicThreadPoolRefreshListener implements Observer> newDynamicThreadPoolNotifyMap = agentNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties); + Map> notifyConfigs = threadPoolBaseSendMessageService.getNotifyConfigs(); + if (CollectionUtil.isNotEmpty(notifyConfigs)) { + for (Map.Entry> each : newDynamicThreadPoolNotifyMap.entrySet()) { + if (checkNotifyConfig) { + break; + } + List notifyConfigDTOS = notifyConfigs.get(each.getKey()); + for (NotifyConfigDTO notifyConfig : each.getValue()) { + if (!notifyConfigDTOS.contains(notifyConfig)) { + checkNotifyConfig = true; + break; + } + } + } + } + if (checkNotifyConfig) { + agentNotifyConfigBuilder.initCacheAndLock(newDynamicThreadPoolNotifyMap); + threadPoolBaseSendMessageService.putPlatform(newDynamicThreadPoolNotifyMap); + } + ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId()); + if (threadPoolNotifyAlarm != null) { + Boolean isAlarm = executorProperties.getAlarm(); + Integer activeAlarm = executorProperties.getActiveAlarm(); + Integer capacityAlarm = + executorProperties.getCapacityAlarm(); + if ((isAlarm != null && !Objects.equals(isAlarm, threadPoolNotifyAlarm.getAlarm())) || (activeAlarm != null && !Objects.equals(activeAlarm, + threadPoolNotifyAlarm.getActiveAlarm())) || (capacityAlarm != null && !Objects.equals(capacityAlarm, threadPoolNotifyAlarm.getCapacityAlarm()))) { + checkNotifyAlarm = true; + threadPoolNotifyAlarm.setAlarm(Optional.ofNullable(isAlarm).orElse(threadPoolNotifyAlarm.getAlarm())); + threadPoolNotifyAlarm.setActiveAlarm(Optional.ofNullable(activeAlarm).orElse(threadPoolNotifyAlarm.getActiveAlarm())); + threadPoolNotifyAlarm.setCapacityAlarm(Optional.ofNullable(capacityAlarm).orElse(threadPoolNotifyAlarm.getCapacityAlarm())); + } + } + if (checkNotifyConfig || checkNotifyAlarm) { + LOG.info("[{}] Dynamic thread pool notification property changes.", executorProperties.getThreadPoolId()); + } + } + /** * Check consistency. * @@ -132,16 +196,57 @@ public class DynamicThreadPoolRefreshListener implements Observer @@ -66,7 +86,8 @@ public class ThreadPoolCheckAlarmSupport { // Initialize the AlarmControlHandler and ThreadPoolBaseSendMessageService AlarmControlHandler alarmControlHandler = new AlarmControlHandler(); - ThreadPoolBaseSendMessageService threadPoolBaseSendMessageService = createThreadPoolBaseSendMessageService(alarmControlHandler); + threadPoolBaseSendMessageService = createThreadPoolBaseSendMessageService(alarmControlHandler); + threadPoolConfigChangeHandler = new DefaultThreadPoolConfigChangeHandler(threadPoolBaseSendMessageService); // Initialize the alarm platform information initializeSendMessageHandlers(threadPoolBaseSendMessageService, alarmControlHandler); @@ -89,6 +110,10 @@ public class ThreadPoolCheckAlarmSupport { EnvironmentProperties.itemId = BOOTSTRAP_CONFIG_PROPERTIES.getItemId(); EnvironmentProperties.applicationName = SpringBootConfig.Spring.Application.name; EnvironmentProperties.active = SpringBootConfig.Spring.Profiles.active; + ConfigurableEnvironment environment = ApplicationContextHolder.getBean(ConfigurableEnvironment.class); + InetUtilsProperties inetUtilsProperties = SpringPropertyBinder.bindProperties(environment, InetUtilsProperties.PREFIX, InetUtilsProperties.class); + InetUtils inetUtils = new InetUtils(inetUtilsProperties); + IdentifyUtil.generate(environment, inetUtils); } /** @@ -123,8 +148,9 @@ public class ThreadPoolCheckAlarmSupport { threadPoolBaseSendMessageService.getSendMessageHandlers().put(larkSendMessageHandler.getType(), larkSendMessageHandler); // Construct and register notification configurations - AgentModeNotifyConfigBuilder notifyConfigBuilder = new AgentModeNotifyConfigBuilder(alarmControlHandler); - Map> notifyConfigs = notifyConfigBuilder.buildNotify(); + // TODO : register notify config for web , null Can be replaced with tomcat, jetty, undertow, etc. implementation classes + agentNotifyConfigBuilder = new AgentModeNotifyConfigBuilder(alarmControlHandler, null); + Map> notifyConfigs = agentNotifyConfigBuilder.buildNotify(); threadPoolBaseSendMessageService.getNotifyConfigs().putAll(notifyConfigs); } }