feat:Agent Nacos dynamic refresh Initialize

pull/1572/head
Pan-YuJie 1 year ago
parent af56750356
commit 33c4609f86

@ -17,8 +17,8 @@
package cn.hippo4j.agent.core.conf;
import cn.hippo4j.agent.core.boot.AgentPackagePath;
import cn.hippo4j.common.boot.AgentPackageNotFoundException;
import cn.hippo4j.common.boot.AgentPackagePath;
import cn.hippo4j.common.conf.Config;
import cn.hippo4j.common.conf.ConfigNotFoundException;
import cn.hippo4j.common.logging.api.ILog;

@ -43,5 +43,10 @@
<artifactId>apollo-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>2.2.1</version>
</dependency>
</dependencies>
</project>

@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.boot.v2;
import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig;
import cn.hippo4j.common.executor.ThreadFactoryBuilder;
import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigParserHandler;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.AbstractConfigThreadPoolDynamicRefresh;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static cn.hippo4j.common.constant.Constants.DEFAULT_NAMESPACE_ID;
/**
* NacosDynamicThreadPoolChangeHandlerSpring2x is responsible for handling dynamic thread pool
* configuration changes in a Spring environment by listening to configuration updates from Nacos.
* <p>
* This class extends {@link AbstractConfigThreadPoolDynamicRefresh} and implements the logic
* to register a Nacos listener, handle configuration changes, and dynamically refresh the thread pool
* properties based on the new configuration.
* <p>
* The handler is specifically tailored for use with Spring 2.x and integrates with Hippo4j's
* dynamic thread pool management system.
*
*/
public class NacosDynamicThreadPoolChangeHandlerSpring2x extends AbstractConfigThreadPoolDynamicRefresh {
private static final ILog LOGGER = LogManager.getLogger(NacosDynamicThreadPoolChangeHandlerSpring2x.class);
/**
* Registers a listener with Nacos to monitor for changes in the thread pool configuration.
* <p>
* This method sets up the Nacos {@link ConfigService} with the server address and namespace
* from the Spring Boot configuration. It then adds a listener that will receive and process
* configuration updates, triggering a dynamic refresh of thread pool settings.
*/
@Override
public void registerListener() {
// Retrieve necessary configuration properties
String configFileType = SpringBootConfig.Spring.Dynamic.Thread_Pool.CONFIG_FILE_TYPE;
String serverAddr = SpringBootConfig.Spring.Dynamic.Thread_Pool.Nacos.SERVER_ADDR;
String dataId = SpringBootConfig.Spring.Dynamic.Thread_Pool.Nacos.DATA_ID;
String namespace = SpringBootConfig.Spring.Dynamic.Thread_Pool.Nacos.NAMESPACE.get(0);
namespace = namespace.equals(DEFAULT_NAMESPACE_ID) ? "" : namespace;
String group = SpringBootConfig.Spring.Dynamic.Thread_Pool.Nacos.GROUP;
try {
// Initialize Nacos ConfigService with the provided properties
Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
properties.put(PropertyKeyConst.NAMESPACE, namespace);
ConfigService configService = NacosFactory.createConfigService(properties);
// Define the listener to handle configuration changes
Listener configChangeListener = new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
LOGGER.debug("Received configuration: " + configInfo);
Map<String, Object> changeValueMap = new HashMap<>();
try {
// Parse the configuration and map the values to the appropriate keys
Map<Object, Object> configInfoMap = ConfigParserHandler.getInstance().parseConfig(configInfo, configFileType);
configInfoMap.forEach((key, value) -> {
if (key instanceof String) {
changeValueMap.put((String) key, value);
}
});
} catch (IOException e) {
LOGGER.error(e, "[Hippo4j-Agent] Dynamic thread pool refresher, Failed to resolve configuration. configFileType: {} configInfo: {} ", configFileType, configInfo);
}
// Trigger the dynamic refresh with the parsed configuration
dynamicRefresh(configFileType, configInfo, changeValueMap);
}
@Override
public Executor getExecutor() {
return new ScheduledThreadPoolExecutor(
1,
ThreadFactoryBuilder.builder().daemon(true).prefix("client.dynamic.refresh.agent").build());
}
};
// Add the listener to the Nacos ConfigService
configService.addListener(dataId, group, configChangeListener);
} catch (Exception e) {
LOGGER.error(e, "[Hippo4j-Agent] Dynamic thread pool refresher, add Nacos listener failure. namespace: {} data-id: {} group: {}", namespace, dataId, group);
}
LOGGER.info("[Hippo4j-Agent] Dynamic thread pool refresher, added Nacos listener successfully. namespace: {} data-id: {} group: {}", namespace, dataId, group);
}
/**
* Builds and binds the {@link BootstrapConfigProperties} from the given configuration map.
* <p>
* This method uses Spring's {@link Binder} to bind the configuration values to an instance
* of {@link BootstrapConfigProperties}, which can then be used to configure the thread pool
* dynamically.
*
* @param configInfo the configuration map containing properties to bind.
* @return the bound {@link BootstrapConfigProperties} instance.
*/
@Override
public BootstrapConfigProperties buildBootstrapProperties(Map<Object, Object> configInfo) {
BootstrapConfigProperties bindableBootstrapConfigProperties = new BootstrapConfigProperties();
ConfigurationPropertySource sources = new MapConfigurationPropertySource(configInfo);
Binder binder = new Binder(sources);
return binder.bind(BootstrapConfigProperties.PREFIX, Bindable.ofInstance(bindableBootstrapConfigProperties)).get();
}
}

@ -17,6 +17,7 @@
package cn.hippo4j.agent.plugin.spring.boot.v2.interceptor;
import cn.hippo4j.agent.plugin.spring.boot.v2.NacosDynamicThreadPoolChangeHandlerSpring2x;
import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.EnhancedInstance;
@ -55,7 +56,9 @@ public class EventPublishingStartedInterceptor implements InstanceMethodsAroundI
return ret;
}
SpringPropertiesLoader.loadSpringProperties(context.getEnvironment());
ThreadPoolDynamicRefresh dynamicRefresh = new DynamicThreadPoolChangeHandlerSpring2x();
// ThreadPoolDynamicRefresh dynamicRefresh = new DynamicThreadPoolChangeHandlerSpring2x();
// TODO Nacos配置
ThreadPoolDynamicRefresh dynamicRefresh = new NacosDynamicThreadPoolChangeHandlerSpring2x();
dynamicRefresh.registerListener();
AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.THREAD_POOL_DYNAMIC_REFRESH,
new DynamicThreadPoolRefreshListener());

@ -52,6 +52,18 @@ public class SpringBootConfig {
public static List<String> NAMESPACE = Arrays.asList("application");
}
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Nacos {
public static String SERVER_ADDR = "localhost";
public static List<String> NAMESPACE = Arrays.asList("");
public static String DATA_ID = "";
public static String GROUP = "DEFAULT_GROUP";
}
/**
* Monitor
*/

@ -0,0 +1,68 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-agent-example</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-threadpool-agent-config-nacos-example</artifactId>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>0.2.12</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.agent.config.nacos;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.spring.context.annotation.config.EnableNacosConfig;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Agent config Nacos example application.
*/
@SpringBootApplication
@EnableNacosConfig
public class AgentConfigNacosExampleApplication {
public static void main(String[] args) throws NacosException {
SpringApplication.run(AgentConfigNacosExampleApplication.class, args);
}
}

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.agent.config.nacos;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Configuration
public class ThreadPoolConfiguration {
// -------------------------------------------------------------------------
// 未使用 Hippo4j原始定义线程池创建方式
// -------------------------------------------------------------------------
@Bean
public ThreadPoolExecutor runMessageSendTaskExecutor() {
LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<>(1024);
return new ThreadPoolExecutor(
1,
10,
1024,
TimeUnit.SECONDS,
linkedBlockingQueue);
}
// -------------------------------------------------------------------------
// 演示 Agent 模式修改线程池
// -------------------------------------------------------------------------
public static final ThreadPoolExecutor RUN_MESSAGE_SEND_TASK_EXECUTOR = new ThreadPoolExecutor(
1,
10,
1024,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024));
}

@ -0,0 +1,59 @@
server.port=8092
server.servlet.context-path=/example
nacos.config.auto-refresh=true
nacos.config.bootstrap.enable=true
# The following parameters are used for testing
nacos.config.server-addr=127.0.0.1:8848
nacos.config.data-id=dynamic-threadpool-example-config
spring.profiles.active=dev
spring.application.name=hippo4j-config-nacos-spring-boot-starter-example
management.metrics.export.prometheus.enabled=true
management.server.port=29998
management.endpoints.web.exposure.include=*
spring.dynamic.thread-pool.enable=true
spring.dynamic.thread-pool.banner=true
spring.dynamic.thread-pool.check-state-interval=3
#spring.dynamic.thread-pool.monitor.enable=true
#spring.dynamic.thread-pool.monitor.collect-types=micrometer
#spring.dynamic.thread-pool.monitor.thread-pool-types=dynamic,web
#spring.dynamic.thread-pool.monitor.initial-delay=10000
#spring.dynamic.thread-pool.monitor.collect-interval=5000
#spring.dynamic.thread-pool.notify-platforms[0].platform=WECHAT
#spring.dynamic.thread-pool.notify-platforms[0].token=ac0426a5-c712-474c-9bff-72b8b8f5caff
#spring.dynamic.thread-pool.notify-platforms[1].platform=DING
#spring.dynamic.thread-pool.notify-platforms[1].token=56417ebba6a27ca352f0de77a2ae9da66d01f39610b5ee8a6033c60ef9071c55
#spring.dynamic.thread-pool.notify-platforms[2].platform=LARK
#spring.dynamic.thread-pool.notify-platforms[2].token=2cbf2808-3839-4c26-a04d-fd201dd51f9e
spring.dynamic.thread-pool.nacos.server-addr=127.0.0.1:8848
spring.dynamic.thread-pool.nacos.data-id=dynamic-threadpool-example-config
spring.dynamic.thread-pool.nacos.group=DEFAULT_GROUP
spring.dynamic.thread-pool.nacos.namespace=public
spring.dynamic.thread-pool.config-file-type=properties
spring.dynamic.thread-pool.executors[0].thread-name-prefix = DynamicThreadPoolConfig#FIELD1
spring.dynamic.thread-pool.executors[0].core-pool-size = 2
spring.dynamic.thread-pool.executors[0].thread-pool-id = cn.hippo4j.example.agent.config.nacos.ThreadPoolConfiguration#RUN_MESSAGE_SEND_TASK_EXECUTOR
spring.dynamic.thread-pool.executors[0].maximum-pool-size = 20
spring.dynamic.thread-pool.executors[0].queue-capacity = 1024
spring.dynamic.thread-pool.executors[0].blocking-queue = ResizableCapacityLinkedBlockingQueue
spring.dynamic.thread-pool.executors[0].execute-time-out = 800
spring.dynamic.thread-pool.executors[0].rejected-handler = AbortPolicy
spring.dynamic.thread-pool.executors[0].keep-alive-time = 6691
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out = true
spring.dynamic.thread-pool.executors[0].alarm = true
spring.dynamic.thread-pool.executors[0].active-alarm = 80
spring.dynamic.thread-pool.executors[0].capacity-alarm = 80
spring.dynamic.thread-pool.executors[0].notify.interval = 8
spring.dynamic.thread-pool.executors[0].notify.receives = nobodyiam
spring.dynamic.thread-pool.executors[1].thread-pool-id = runMessageSendTaskExecutor
spring.dynamic.thread-pool.executors[1].thread-name-prefix = runMessageSendTaskExecutor
spring.dynamic.thread-pool.executors[1].core-pool-size = 3
spring.dynamic.thread-pool.executors[1].maximum-pool-size = 4
spring.dynamic.thread-pool.executors[1].queue-capacity = 1024
spring.dynamic.thread-pool.executors[1].blocking-queue = ResizableCapacityLinkedBlockingQueue
spring.dynamic.thread-pool.executors[1].execute-time-out = 800
spring.dynamic.thread-pool.executors[1].rejected-handler = AbortPolicy
spring.dynamic.thread-pool.executors[1].keep-alive-time = 6691
spring.dynamic.thread-pool.executors[1].allow-core-thread-time-out = true

@ -18,5 +18,6 @@
<modules>
<module>config-apollo</module>
<module>config-nacos</module>
</modules>
</project>
Loading…
Cancel
Save