Feature : Agent mode supports dynamic thread pool changes, alarm and monitoring functions without modifying code, and adapts to Nacos and Apollo Configuration Centers (#1572)

* fix:Fix send threadPool change notification message log

* feat:Agent Nacos dynamic refresh Initialize

* feat:Agent dynamic alarm Initialize

* feat:Agent dynamic monitor Initialize

* refactor:Agent Listener logic, add configuration refreshes platform push, and carries the unique application ID

* feat:Apollo Configuration Center Plugin Logic Adaptation

* feat:Completed the implementation of Nacos Configuration Center plugin and Nacos,Apollo plugins adapted to Spring 1.x , 2.x environment

* fix: Fixed jar package mounting and startup problem in Agent mode
pull/1557/merge
Pan_Yujie 1 month ago committed by GitHub
parent 9bdb8102ea
commit c521a97388
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -17,17 +17,17 @@
package cn.hippo4j.agent.core.conf;
import cn.hippo4j.agent.core.boot.AgentPackagePath;
import cn.hippo4j.agent.core.util.PropertyPlaceholderHelper;
import cn.hippo4j.agent.core.util.StringUtil;
import cn.hippo4j.common.boot.AgentPackageNotFoundException;
import cn.hippo4j.common.boot.AgentPackagePath;
import cn.hippo4j.common.conf.Config;
import cn.hippo4j.common.conf.ConfigNotFoundException;
import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.common.logging.core.JsonLogResolver;
import cn.hippo4j.common.logging.core.PatternLogResolver;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.agent.ConfigInitializer;
import cn.hippo4j.common.toolkit.agent.PropertyPlaceholderHelper;
import java.io.File;
import java.io.FileInputStream;

@ -16,6 +16,16 @@
</properties>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-spring-plugin-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-mode-config</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>

@ -15,11 +15,12 @@
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.boot.v2;
package cn.hippo4j.agent.plugin.apollo;
import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig;
import cn.hippo4j.agent.plugin.spring.common.toolkit.SpringPropertyBinder;
import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.AbstractConfigThreadPoolDynamicRefresh;
import com.ctrip.framework.apollo.Config;
@ -28,10 +29,7 @@ import com.ctrip.framework.apollo.ConfigFile;
import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.core.enums.ConfigFileFormat;
import com.ctrip.framework.apollo.model.ConfigChange;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
import java.util.HashMap;
import java.util.List;
@ -40,12 +38,15 @@ import java.util.Map;
import static cn.hippo4j.agent.core.conf.Constants.SPRING_BOOT_CONFIG_PREFIX;
/**
* Dynamic thread pool change handler spring 2x
* Dynamic thread pool change handler
*/
public class DynamicThreadPoolChangeHandlerSpring2x extends AbstractConfigThreadPoolDynamicRefresh {
public class ApolloDynamicThreadPoolChangeHandler extends AbstractConfigThreadPoolDynamicRefresh {
private static ILog LOGGER = LogManager.getLogger(DynamicThreadPoolChangeHandlerSpring2x.class);
private static final ILog LOGGER = LogManager.getLogger(ApolloDynamicThreadPoolChangeHandler.class);
/**
* Registers a listener with Apollo to monitor for changes in the thread pool configuration.
*/
@Override
public void registerListener() {
List<String> apolloNamespaces = SpringBootConfig.Spring.Dynamic.Thread_Pool.Apollo.NAMESPACE;
@ -68,11 +69,19 @@ public class DynamicThreadPoolChangeHandlerSpring2x extends AbstractConfigThread
LOGGER.info("[Hippo4j-Agent] Dynamic thread pool refresher, add apollo listener success. namespace: {}", namespace);
}
/**
* Builds and binds the {@link BootstrapConfigProperties} from the given configuration map.
* <p>
* This method uses Spring's {@link Binder} to bind the configuration values to an instance
* of {@link BootstrapConfigProperties}, which can then be used to configure the thread pool
* dynamically.
*
* @param configInfo the configuration map containing properties to bind.
* @return the bound {@link BootstrapConfigProperties} instance.
*/
@Override
public BootstrapConfigProperties buildBootstrapProperties(Map<Object, Object> configInfo) {
BootstrapConfigProperties bindableBootstrapConfigProperties = new BootstrapConfigProperties();
ConfigurationPropertySource sources = new MapConfigurationPropertySource(configInfo);
Binder binder = new Binder(sources);
return binder.bind(BootstrapConfigProperties.PREFIX, Bindable.ofInstance(bindableBootstrapConfigProperties)).get();
BootstrapConfigProperties bindableBootstrapConfigProperties = SpringPropertyBinder.bindProperties(configInfo, BootstrapConfigProperties.PREFIX, BootstrapConfigProperties.class);
return bindableBootstrapConfigProperties;
}
}

@ -34,7 +34,7 @@ public class ApolloInstrumentation extends ClassInstanceMethodsEnhancePluginDefi
private static final String ENHANCE_CLASS = "com.ctrip.framework.apollo.internals.DefaultConfig";
private static final String CONSTRUCTOR_INTERCEPT_CLASS = "cn.hippo4j.agent.plugin.apollo.interceptor.DefaultConfigConstructorInterceptor";
private static final String CONSTRUCTOR_INTERCEPT_CLASS = "cn.hippo4j.agent.plugin.apollo.interceptor.ApolloConfigConstructorInterceptor";
@Override
protected ClassMatch enhanceClass() {

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.apollo.interceptor;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import cn.hippo4j.agent.plugin.apollo.listeners.ApolloConfigPropertiesLoaderCompletedListener;
import cn.hippo4j.common.extension.design.AbstractSubjectCenter;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Default config constructor interceptor
*/
public class ApolloConfigConstructorInterceptor implements InstanceConstructorInterceptor {
private static final AtomicBoolean isExecuted = new AtomicBoolean(false);
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
// This logic will only be executed once
if (isExecuted.compareAndSet(false, true)) {
// The Apollo plugin triggers before the Spring configuration plug-in.
// This means that when the Apollo plug-in executes, Spring's Environment is not yet ready,
// so the configuration cannot be read
// After listening to the AGENT_SPRING_PROPERTIES_LOADER_COMPLETED event, register the listener for Apollo
AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.AGENT_SPRING_PROPERTIES_LOADER_COMPLETED,
new ApolloConfigPropertiesLoaderCompletedListener());
}
}
}

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.apollo.listeners;
import cn.hippo4j.agent.plugin.apollo.ApolloDynamicThreadPoolChangeHandler;
import cn.hippo4j.common.extension.design.Observer;
import cn.hippo4j.common.extension.design.ObserverMessage;
import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh;
/**
* Apollo Config Properties Loader Completed Listener
*/
public class ApolloConfigPropertiesLoaderCompletedListener implements Observer<String> {
@Override
public void accept(ObserverMessage<String> observerMessage) {
ThreadPoolDynamicRefresh dynamicRefresh = new ApolloDynamicThreadPoolChangeHandler();
dynamicRefresh.registerListener();
}
}

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-plugin</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-agent-nacos-plugin</artifactId>
<properties>
<nacos.version>2.2.1</nacos.version>
</properties>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-spring-plugin-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-mode-config</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.nacos;
import cn.hippo4j.agent.plugin.spring.common.conf.NacosCloudConfig;
import cn.hippo4j.agent.plugin.spring.common.conf.NacosConfig;
import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig;
import cn.hippo4j.agent.plugin.spring.common.toolkit.SpringPropertyBinder;
import cn.hippo4j.common.executor.ThreadFactoryBuilder;
import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigParserHandler;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.AbstractConfigThreadPoolDynamicRefresh;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import org.springframework.boot.context.properties.bind.Binder;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static cn.hippo4j.common.constant.Constants.DEFAULT_NAMESPACE_ID;
/**
* NacosDynamicThreadPoolChangeHandler is responsible for handling dynamic thread pool
* configuration changes in a Spring environment by listening to configuration updates from Nacos.
* <p>
* This class extends {@link AbstractConfigThreadPoolDynamicRefresh} and implements the logic
* to register a Nacos listener, handle configuration changes, and dynamically refresh the thread pool
* properties based on the new configuration.
* <p>
*/
public class NacosDynamicThreadPoolChangeHandler extends AbstractConfigThreadPoolDynamicRefresh {
private static final ILog LOGGER = LogManager.getLogger(NacosDynamicThreadPoolChangeHandler.class);
/**
* Registers a listener with Nacos to monitor for changes in the thread pool configuration.
* <p>
* This method sets up the Nacos {@link ConfigService} with the server address and namespace
* from the Spring Boot configuration. It then adds a listener that will receive and process
* configuration updates, triggering a dynamic refresh of thread pool settings.
*/
@Override
public void registerListener() {
// Retrieve necessary configuration properties
String configFileType = SpringBootConfig.Spring.Dynamic.Thread_Pool.CONFIG_FILE_TYPE;
String serverAddr = Optional.ofNullable(NacosCloudConfig.Spring.Cloud.Nacos.Config.SERVER_ADDR).filter(s -> !StringUtil.isEmpty(s))
.orElse(Optional.ofNullable(NacosConfig.Nacos.Config.SERVER_ADDR).filter(s -> !StringUtil.isEmpty(s))
.orElse(""));
if (StringUtil.isEmpty(serverAddr)) {
LOGGER.error("[Hippo4j-Agent] add Nacos listener failure. Nacos Registry address not configured");
return;
}
String dataId = SpringBootConfig.Spring.Dynamic.Thread_Pool.Nacos.DATA_ID;
String group = SpringBootConfig.Spring.Dynamic.Thread_Pool.Nacos.GROUP;
String namespace = SpringBootConfig.Spring.Dynamic.Thread_Pool.Nacos.NAMESPACE.get(0);
namespace = namespace.equals(DEFAULT_NAMESPACE_ID) ? "" : namespace;
try {
// Initialize Nacos ConfigService with the provided properties
Properties properties = new Properties();
properties.put(PropertyKeyConst.SERVER_ADDR, serverAddr);
properties.put(PropertyKeyConst.NAMESPACE, namespace);
ConfigService configService = NacosFactory.createConfigService(properties);
// Define the listener to handle configuration changes
Listener configChangeListener = new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
LOGGER.debug("Received configuration: " + configInfo);
Map<String, Object> changeValueMap = new HashMap<>();
try {
// Parse the configuration and map the values to the appropriate keys
Map<Object, Object> configInfoMap = ConfigParserHandler.getInstance().parseConfig(configInfo, configFileType);
configInfoMap.forEach((key, value) -> {
if (key instanceof String) {
changeValueMap.put((String) key, value);
}
});
} catch (IOException e) {
LOGGER.error(e, "[Hippo4j-Agent] Dynamic thread pool refresher, Failed to resolve configuration. configFileType: {} configInfo: {} ", configFileType, configInfo);
}
// Trigger the dynamic refresh with the parsed configuration
dynamicRefresh(configFileType, configInfo, changeValueMap);
}
@Override
public Executor getExecutor() {
return new ScheduledThreadPoolExecutor(1, ThreadFactoryBuilder.builder().daemon(true).prefix("client.dynamic.refresh.agent").build());
}
};
// Add the listener to the Nacos ConfigService
configService.addListener(dataId, group, configChangeListener);
LOGGER.info("[Hippo4j-Agent] Dynamic thread pool refresher, add Nacos listener successfully. serverAddr: {} namespace: {} data-id: {} group: {}", serverAddr, namespace, dataId, group);
} catch (Exception e) {
LOGGER.error(e, "[Hippo4j-Agent] Dynamic thread pool refresher, add Nacos listener failure. serverAddr: {} namespace: {} data-id: {} group: {}", serverAddr, namespace, dataId, group);
}
}
/**
* Builds and binds the {@link BootstrapConfigProperties} from the given configuration map.
* <p>
* This method uses Spring's {@link Binder} to bind the configuration values to an instance
* of {@link BootstrapConfigProperties}, which can then be used to configure the thread pool
* dynamically.
*
* @param configInfo the configuration map containing properties to bind.
* @return the bound {@link BootstrapConfigProperties} instance.
*/
@Override
public BootstrapConfigProperties buildBootstrapProperties(Map<Object, Object> configInfo) {
BootstrapConfigProperties bindableBootstrapConfigProperties = SpringPropertyBinder.bindProperties(configInfo, BootstrapConfigProperties.PREFIX, BootstrapConfigProperties.class);
return bindableBootstrapConfigProperties;
}
}

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.nacos.boot;
import cn.hippo4j.agent.core.boot.BootService;
import cn.hippo4j.agent.core.boot.DefaultImplementor;
/**
* Nacos plugin boot service
*/
@DefaultImplementor
public class NacosPluginBootService implements BootService {
@Override
public void prepare() throws Throwable {
}
@Override
public void boot() throws Throwable {
}
@Override
public void onComplete() throws Throwable {
}
@Override
public void shutdown() throws Throwable {
}
}

@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.nacos.define;
import cn.hippo4j.agent.core.plugin.WitnessMethod;
import cn.hippo4j.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import cn.hippo4j.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import cn.hippo4j.agent.core.plugin.match.ClassMatch;
import cn.hippo4j.agent.core.plugin.match.NameMatch;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import java.util.Collections;
import java.util.List;
import static net.bytebuddy.matcher.ElementMatchers.named;
public class NacosCloudAdapterInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.springframework.cloud.bootstrap.config.PropertySourceBootstrapConfiguration";
private static final String INSTANCE_METHODS_INTERCEPT_CLASS = "cn.hippo4j.agent.plugin.nacos.interceptor.NacosCloudAdapterConfigInstanceMethodInterceptor";
@Override
protected ClassMatch enhanceClass() {
return NameMatch.byName(ENHANCE_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[0];
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("initialize");
}
@Override
public String getMethodsInterceptor() {
return INSTANCE_METHODS_INTERCEPT_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}};
}
@Override
protected List<WitnessMethod> witnessMethods() {
return Collections.singletonList(new WitnessMethod("com.alibaba.cloud.nacos.client.NacosPropertySourceLocator", named("locate")));
}
}

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.nacos.define;
import cn.hippo4j.agent.core.plugin.interceptor.ConstructorInterceptPoint;
import cn.hippo4j.agent.core.plugin.interceptor.InstanceMethodsInterceptPoint;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.ClassInstanceMethodsEnhancePluginDefine;
import cn.hippo4j.agent.core.plugin.match.ClassMatch;
import cn.hippo4j.agent.core.plugin.match.NameMatch;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.matcher.ElementMatcher;
import static net.bytebuddy.matcher.ElementMatchers.any;
/**
* Nacos instrumentation
*/
public class NacosInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "com.alibaba.nacos.client.config.NacosConfigService";
private static final String CONSTRUCTOR_INTERCEPT_CLASS = "cn.hippo4j.agent.plugin.nacos.interceptor.NacosConfigConstructorInterceptor";
@Override
protected ClassMatch enhanceClass() {
return NameMatch.byName(ENHANCE_CLASS);
}
@Override
public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
return new ConstructorInterceptPoint[]{
new ConstructorInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getConstructorMatcher() {
return any();
}
@Override
public String getConstructorInterceptor() {
return CONSTRUCTOR_INTERCEPT_CLASS;
}
}
};
}
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[0];
}
}

@ -15,31 +15,26 @@
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.boot.v1.interceptor;
package cn.hippo4j.agent.plugin.nacos.interceptor;
import cn.hippo4j.agent.adapter.dubbo.DubboThreadPoolAdapter;
import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import cn.hippo4j.agent.plugin.spring.boot.v1.DynamicThreadPoolChangeHandlerSpring1x;
import cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader;
import cn.hippo4j.agent.plugin.spring.common.support.SpringThreadPoolRegisterSupport;
import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.hippo4j.core.config.ApplicationContextHolder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Event publishing finished interceptor
* Nacos Cloud config constructor interceptor
*/
public class EventPublishingFinishedInterceptor implements InstanceMethodsAroundInterceptor {
public class NacosCloudAdapterConfigInstanceMethodInterceptor implements InstanceMethodsAroundInterceptor {
private static final ILog FILE_LOGGER = LogManager.getLogger(EventPublishingFinishedInterceptor.class);
private static final Logger LOGGER = LoggerFactory.getLogger(EventPublishingFinishedInterceptor.class);
private static final AtomicBoolean isExecuted = new AtomicBoolean(false);
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
@ -48,16 +43,18 @@ public class EventPublishingFinishedInterceptor implements InstanceMethodsAround
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
ConfigurableApplicationContext context = (ConfigurableApplicationContext) allArguments[0];
if (context.getParent() != null) {
// After the child container is started, the thread pool registration will be carried out
SpringThreadPoolRegisterSupport.registerThreadPoolInstances(context);
return ret;
// This logic will only be executed once
if (isExecuted.compareAndSet(false, true)) {
// Get the configurable Application Context
ConfigurableApplicationContext configurableApplicationContext = (ConfigurableApplicationContext) allArguments[0];
ConfigurableEnvironment environment = configurableApplicationContext.getEnvironment();
// Remote Nacos configuration swiped into SpringPropertiesLoader
SpringPropertiesLoader.loadSpringProperties(environment);
// Refresh thread pool instances through configuration
SpringThreadPoolRegisterSupport.registerThreadPoolInstances(ApplicationContextHolder.getInstance());
}
SpringPropertiesLoader.loadSpringProperties(context.getEnvironment());
ThreadPoolDynamicRefresh dynamicRefreshSpring1x = new DynamicThreadPoolChangeHandlerSpring1x(context);
dynamicRefreshSpring1x.registerListener();
DubboThreadPoolAdapter.registerExecutors();
return ret;
}

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.nacos.interceptor;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import cn.hippo4j.agent.plugin.nacos.NacosDynamicThreadPoolChangeHandler;
import cn.hippo4j.agent.plugin.nacos.listeners.NacosConfigPropertiesLoaderCompletedListener;
import cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader;
import cn.hippo4j.common.extension.design.AbstractSubjectCenter;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Nacos config constructor interceptor
*/
public class NacosConfigConstructorInterceptor implements InstanceConstructorInterceptor {
private static final AtomicBoolean isExecuted = new AtomicBoolean(false);
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
// This logic will only be executed once
if (isExecuted.compareAndSet(false, true)) {
// Determine whether SpringPropertiesLoader is initialized
AtomicBoolean active = SpringPropertiesLoader.getActive();
// For Nacos-Cloud, the SpringPropertiesLoader environment initialization is triggered first, and then the logic to register listeners is triggered
// For Nacos-Boot, the listener is registered first, and the SpringPropertiesLoader environment is initialized
if (Boolean.TRUE.equals(active.get())) {
new NacosDynamicThreadPoolChangeHandler().registerListener();
return;
}
// The Nacos plugin triggers before the Spring configuration plug-in.
// This means that when the Apollo plug-in executes, Spring's Environment is not yet ready,
// so the configuration cannot be read
// After listening to the AGENT_SPRING_PROPERTIES_LOADER_COMPLETED event, register the listener for Nacos
AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.AGENT_SPRING_PROPERTIES_LOADER_COMPLETED, new NacosConfigPropertiesLoaderCompletedListener());
}
}
}

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.nacos.listeners;
import cn.hippo4j.agent.plugin.nacos.NacosDynamicThreadPoolChangeHandler;
import cn.hippo4j.common.extension.design.Observer;
import cn.hippo4j.common.extension.design.ObserverMessage;
import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh;
/**
* Nacos Config Properties Loader Completed Listener
*/
public class NacosConfigPropertiesLoaderCompletedListener implements Observer<String> {
@Override
public void accept(ObserverMessage<String> observerMessage) {
ThreadPoolDynamicRefresh dynamicRefresh = new NacosDynamicThreadPoolChangeHandler();
dynamicRefresh.registerListener();
}
}

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cn.hippo4j.agent.plugin.nacos.boot.NacosPluginBootService

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
nacos-plugin=cn.hippo4j.agent.plugin.nacos.define.NacosInstrumentation
nacos-cloud-adapter-plugin=cn.hippo4j.agent.plugin.nacos.define.NacosCloudAdapterInstrumentation

@ -16,6 +16,7 @@
<module>threadpool-plugin</module>
<module>adapter-plugins</module>
<module>apollo-plugin</module>
<module>nacos-plugin</module>
</modules>
<properties>

@ -29,13 +29,6 @@
</dependencyManagement>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-spring-plugin-common</artifactId>
<scope>provided</scope>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
@ -45,9 +38,8 @@
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-config-spring-boot-1x-starter</artifactId>
<artifactId>hippo4j-agent-spring-plugin-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>

@ -1,71 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.boot.v1;
import cn.hippo4j.common.toolkit.MapUtil;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.AbstractConfigThreadPoolDynamicRefresh;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.PropertyValues;
import org.springframework.beans.support.ResourceEditorRegistrar;
import org.springframework.boot.bind.CustomPropertyNamePatternsMatcher;
import org.springframework.boot.bind.RelaxedDataBinder;
import org.springframework.boot.bind.RelaxedNames;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.MutablePropertySources;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import static cn.hippo4j.config.springboot1x.starter.refresher.SpringBoot1xBootstrapConfigPropertiesBinderAdapter.getNames;
/**
* Dynamic thread pool change handler spring 1x
*/
@RequiredArgsConstructor
public class DynamicThreadPoolChangeHandlerSpring1x extends AbstractConfigThreadPoolDynamicRefresh {
private final ConfigurableApplicationContext applicationContext;
@Override
public BootstrapConfigProperties buildBootstrapProperties(Map<Object, Object> configInfo) {
BootstrapConfigProperties bindableCoreProperties = new BootstrapConfigProperties();
if (MapUtil.isEmpty(configInfo)) {
return bindableCoreProperties;
}
RelaxedNames relaxedNames = new RelaxedNames(BootstrapConfigProperties.PREFIX);
Set<String> names = getNames(bindableCoreProperties, relaxedNames);
Map<String, Object> stringConfigInfo = new HashMap<>(configInfo.size());
configInfo.forEach((key, value) -> stringConfigInfo.put(key.toString(), value));
MapPropertySource test = new MapPropertySource("Hippo4j", stringConfigInfo);
MutablePropertySources propertySources = new MutablePropertySources();
propertySources.addFirst(test);
PropertyValues propertyValues = CustomPropertyNamePatternsMatcher.getPropertySourcesPropertyValues(names, propertySources);
RelaxedDataBinder dataBinder = new RelaxedDataBinder(bindableCoreProperties, BootstrapConfigProperties.PREFIX);
dataBinder.setAutoGrowCollectionLimit(Integer.MAX_VALUE);
dataBinder.setIgnoreNestedProperties(false);
dataBinder.setIgnoreInvalidFields(false);
dataBinder.setIgnoreUnknownFields(true);
ResourceEditorRegistrar resourceEditorRegistrar = new ResourceEditorRegistrar(applicationContext, applicationContext.getEnvironment());
resourceEditorRegistrar.registerCustomEditors(dataBinder);
dataBinder.bind(propertyValues);
return bindableCoreProperties;
}
}

@ -28,18 +28,17 @@ import net.bytebuddy.matcher.ElementMatcher;
import java.util.Collections;
import java.util.List;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static cn.hippo4j.agent.core.plugin.match.NameMatch.byName;
import static net.bytebuddy.matcher.ElementMatchers.named;
/**
* Event publishing run listener instrumentation
* Application Context Refresh instrumentation
*/
public class EventPublishingRunListenerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
public class ApplicationContextInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {
private static final String ENHANCE_CLASS = "org.springframework.boot.context.event.EventPublishingRunListener";
private static final String ENHANCE_CLASS = "org.springframework.context.support.AbstractApplicationContext";
private static final String EVENT_PUBLISHING_FINISHED_INTERCEPTOR = "cn.hippo4j.agent.plugin.spring.boot.v1.interceptor.EventPublishingFinishedInterceptor";
private static final String EVENT_PUBLISHING_ENVIRONMENT_PREPARED_INTERCEPTOR = "cn.hippo4j.agent.plugin.spring.common.interceptor.EventPublishingRunListenerEnvironmentPreparedInterceptor";
private static final String APPLICATION_CONTEXT_REFRESH_INTERCEPTOR = "cn.hippo4j.agent.plugin.spring.boot.v1.interceptor.ApplicationContextInterceptor";
@Override
protected ClassMatch enhanceClass() {
@ -53,47 +52,27 @@ public class EventPublishingRunListenerInstrumentation extends ClassInstanceMeth
@Override
public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
return new InstanceMethodsInterceptPoint[]{
new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("finished");
}
@Override
public String getMethodsInterceptor() {
return EVENT_PUBLISHING_FINISHED_INTERCEPTOR;
}
@Override
public boolean isOverrideArgs() {
return false;
}
},
new InstanceMethodsInterceptPoint() {
return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() {
@Override
public ElementMatcher<MethodDescription> getMethodsMatcher() {
return named("environmentPrepared");
return named("refresh");
}
@Override
public String getMethodsInterceptor() {
return EVENT_PUBLISHING_ENVIRONMENT_PREPARED_INTERCEPTOR;
return APPLICATION_CONTEXT_REFRESH_INTERCEPTOR;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
}};
}
@Override
protected List<WitnessMethod> witnessMethods() {
return Collections.singletonList(new WitnessMethod("org.springframework.boot.context.event.EventPublishingRunListener",
named("finished")));
return Collections.singletonList(new WitnessMethod("org.springframework.boot.context.event.EventPublishingRunListener", named("finished")));
}
}

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.boot.v1.interceptor;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import cn.hippo4j.agent.plugin.spring.common.event.DynamicThreadPoolRefreshListener;
import cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader;
import cn.hippo4j.agent.plugin.spring.common.support.SpringThreadPoolRegisterSupport;
import cn.hippo4j.common.extension.design.AbstractSubjectCenter;
import cn.hippo4j.core.config.ApplicationContextHolder;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.event.ContextRefreshedEvent;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Application Context Refresh interceptor
*/
public class ApplicationContextInterceptor implements InstanceMethodsAroundInterceptor {
private static final AtomicBoolean isExecuted = new AtomicBoolean(false);
@Override
public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
}
@Override
public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret) throws Throwable {
// Since the refresh() method is a method of the AbstractApplicationContext class,
// the AbstractApplicationContext itself is an implementation class of the ApplicationContext.
// Therefore, can treat the class instance itself as an ApplicationContext object.
ConfigurableApplicationContext context = (ConfigurableApplicationContext) objInst;
if (context.getParent() != null) {
// After the child container is started, the thread pool registration will be carried out
// IDEA's runtime environment or debugging mechanisms make context refresh speeds different.
// Ensure that thread pool registration logic is executed only after the context is fully started
if (context.isActive()) {
SpringThreadPoolRegisterSupport.registerThreadPoolInstances(context);
return ret;
}
// However, the packaged JAR runtime may refresh the context faster
// resulting in the context not being refreshed yet when registerThreadPoolInstances is called
// Register listener to handle the registration after the context has been fully refreshed
context.addApplicationListener((ApplicationListener<ContextRefreshedEvent>) event -> {
SpringThreadPoolRegisterSupport.registerThreadPoolInstances(event.getApplicationContext());
});
return ret;
}
// This logic will only be executed once
if (isExecuted.compareAndSet(false, true)) {
ApplicationContextHolder contextHolder = new ApplicationContextHolder();
contextHolder.setApplicationContext(context);
SpringPropertiesLoader.loadSpringProperties(context.getEnvironment());
AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.THREAD_POOL_DYNAMIC_REFRESH, new DynamicThreadPoolRefreshListener());
}
return ret;
}
@Override
public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Throwable t) {
}
}

@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
spring-boot-1.x=cn.hippo4j.agent.plugin.spring.boot.v1.define.EventPublishingRunListenerInstrumentation
spring-boot-1.x=cn.hippo4j.agent.plugin.spring.boot.v1.define.ApplicationContextInstrumentation

@ -17,13 +17,6 @@
</properties>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-spring-plugin-common</artifactId>
<scope>provided</scope>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
@ -33,15 +26,15 @@
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-mode-config</artifactId>
<artifactId>hippo4j-agent-spring-plugin-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<scope>provided</scope>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-mode-config</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>

@ -17,28 +17,28 @@
package cn.hippo4j.agent.plugin.spring.boot.v2.interceptor;
import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.InstanceMethodsAroundInterceptor;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import cn.hippo4j.agent.plugin.spring.boot.v2.DynamicThreadPoolChangeHandlerSpring2x;
import cn.hippo4j.agent.plugin.spring.common.event.DynamicThreadPoolRefreshListener;
import cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader;
import cn.hippo4j.agent.plugin.spring.common.support.SpringThreadPoolRegisterSupport;
import cn.hippo4j.common.extension.design.AbstractSubjectCenter;
import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.event.DynamicThreadPoolRefreshListener;
import lombok.extern.slf4j.Slf4j;
import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.core.config.ApplicationContextHolder;
import org.springframework.context.ConfigurableApplicationContext;
import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Event publishing started interceptor
*/
@Slf4j
public class EventPublishingStartedInterceptor implements InstanceMethodsAroundInterceptor {
private static final AtomicBoolean isExecuted = new AtomicBoolean(false);
private static final ILog LOGGER = LogManager.getLogger(EventPublishingStartedInterceptor.class);
@Override
@ -54,11 +54,15 @@ public class EventPublishingStartedInterceptor implements InstanceMethodsAroundI
SpringThreadPoolRegisterSupport.registerThreadPoolInstances(context);
return ret;
}
// This logic will only be executed once
if (isExecuted.compareAndSet(false, true)) {
ApplicationContextHolder contextHolder = new ApplicationContextHolder();
contextHolder.setApplicationContext(context);
// Load Spring Properties
SpringPropertiesLoader.loadSpringProperties(context.getEnvironment());
ThreadPoolDynamicRefresh dynamicRefresh = new DynamicThreadPoolChangeHandlerSpring2x();
dynamicRefresh.registerListener();
AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.THREAD_POOL_DYNAMIC_REFRESH,
new DynamicThreadPoolRefreshListener());
// register Dynamic ThreadPool Refresh Listener
AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.THREAD_POOL_DYNAMIC_REFRESH, new DynamicThreadPoolRefreshListener());
}
return ret;
}

@ -35,13 +35,56 @@
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-kernel-alarm</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-core</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-adapter-web</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-infra-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-monitor-elasticsearch</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-monitor-local-log</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-monitor-micrometer</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.alarm;
import cn.hippo4j.adapter.web.WebThreadPoolService;
import cn.hippo4j.agent.plugin.spring.common.support.ThreadPoolCheckAlarmSupport;
import cn.hippo4j.common.api.IExecutorProperties;
import cn.hippo4j.common.model.executor.ExecutorNotifyProperties;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.NotifyPlatformProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.WebExecutorProperties;
import cn.hippo4j.threadpool.message.api.NotifyConfigBuilder;
import cn.hippo4j.threadpool.message.api.NotifyConfigDTO;
import cn.hippo4j.threadpool.message.core.service.AlarmControlHandler;
import cn.hippo4j.threadpool.message.core.service.ThreadPoolBaseSendMessageService;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import static cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES;
import static cn.hippo4j.common.constant.Constants.DEFAULT_INTERVAL;
/**
* This class is responsible for building the notification configurations for thread pools in an agent mode.
* It implements the {@link NotifyConfigBuilder} interface and provides methods to build and initialize
* notification configurations for various platforms and types (e.g., ALARM, CONFIG).
*
* <p>The configuration is based on the properties loaded from the bootstrap configuration and includes
* handling for alarm control and notification intervals.</p>
*
* TODO: This is copied from {@link cn.hippo4j.config.springboot.starter.notify.ConfigModeNotifyConfigBuilder} and can be refactored later
*/
@AllArgsConstructor
public class AgentModeNotifyConfigBuilder implements NotifyConfigBuilder {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolCheckAlarmSupport.class);
private final AlarmControlHandler alarmControlHandler;
private final WebThreadPoolService webThreadPoolService;
/**
* Builds the notification configurations for all executors defined in the bootstrap configuration.
*
* <p>This method filters the executors based on their alarm settings and constructs the notification
* configurations accordingly. If global alarm settings are disabled and there are no specific alarms
* configured for any executor, the method returns an empty map.</p>
*
* @return A map containing the notification configurations, keyed by the notification type (e.g., ALARM, CONFIG).
*/
public Map<String, List<NotifyConfigDTO>> buildNotify() {
Map<String, List<NotifyConfigDTO>> resultMap = new HashMap<>();
boolean globalAlarm = Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor())
.map(ExecutorProperties::getAlarm)
.orElse(true);
List<ExecutorProperties> executors = BOOTSTRAP_CONFIG_PROPERTIES.getExecutors();
if (CollectionUtil.isEmpty(executors)) {
LOGGER.warn("Failed to build notify, executors configuration is empty.");
return resultMap;
}
List<ExecutorProperties> actual = executors.stream()
.filter(each -> Optional.ofNullable(each.getAlarm())
.orElse(false))
.collect(Collectors.toList());
if (!globalAlarm && CollectionUtil.isEmpty(actual)) {
return resultMap;
}
for (ExecutorProperties executorProperties : executors) {
Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig = buildSingleNotifyConfig(executorProperties);
initCacheAndLock(buildSingleNotifyConfig);
resultMap.putAll(buildSingleNotifyConfig);
}
// register notify config for web
WebExecutorProperties webProperties = BOOTSTRAP_CONFIG_PROPERTIES.getWeb();
if (webProperties == null) {
return resultMap;
}
if (StringUtil.isBlank(webProperties.getThreadPoolId())) {
webProperties.setThreadPoolId(webThreadPoolService.getWebContainerType().getName());
}
Map<String, List<NotifyConfigDTO>> webSingleNotifyConfigMap = buildSingleNotifyConfig(webProperties);
initCacheAndLock(webSingleNotifyConfigMap);
resultMap.putAll(webSingleNotifyConfigMap);
return resultMap;
}
/**
* Builds the notification configurations for a single executor.
*
* <p>This method generates two types of notifications: ALARM and CONFIG. For each type, it creates
* notification configurations based on the platforms defined in the bootstrap configuration.</p>
*
* @param executorProperties The properties of the executor for which to build the notification configurations.
* @return A map containing the notification configurations for the given executor, keyed by the notification type.
*/
public Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig(IExecutorProperties executorProperties) {
String threadPoolId = executorProperties.getThreadPoolId();
Map<String, List<NotifyConfigDTO>> resultMap = new HashMap<>();
String alarmBuildKey = threadPoolId + "+ALARM";
List<NotifyConfigDTO> alarmNotifyConfigs = new ArrayList<>();
List<NotifyPlatformProperties> notifyPlatforms = BOOTSTRAP_CONFIG_PROPERTIES.getNotifyPlatforms();
for (NotifyPlatformProperties platformProperties : notifyPlatforms) {
NotifyConfigDTO notifyConfig = new NotifyConfigDTO();
notifyConfig.setPlatform(platformProperties.getPlatform());
notifyConfig.setTpId(threadPoolId);
notifyConfig.setType("ALARM");
notifyConfig.setSecret(platformProperties.getSecret());
notifyConfig.setSecretKey(getToken(platformProperties));
notifyConfig.setInterval(buildInterval(executorProperties));
notifyConfig.setReceives(buildReceive(executorProperties));
alarmNotifyConfigs.add(notifyConfig);
}
resultMap.put(alarmBuildKey, alarmNotifyConfigs);
String changeBuildKey = threadPoolId + "+CONFIG";
List<NotifyConfigDTO> changeNotifyConfigs = new ArrayList<>();
for (NotifyPlatformProperties platformProperties : notifyPlatforms) {
NotifyConfigDTO notifyConfig = new NotifyConfigDTO();
notifyConfig.setPlatform(platformProperties.getPlatform());
notifyConfig.setTpId(threadPoolId);
notifyConfig.setType("CONFIG");
notifyConfig.setSecretKey(getToken(platformProperties));
notifyConfig.setSecret(platformProperties.getSecret());
notifyConfig.setReceives(buildReceive(executorProperties));
changeNotifyConfigs.add(notifyConfig);
}
resultMap.put(changeBuildKey, changeNotifyConfigs);
return resultMap;
}
/**
* Retrieves the token for the given notification platform properties.
*
* <p>If the token is not explicitly set, the method returns the secret key as the fallback.</p>
*
* @param platformProperties The platform properties from which to retrieve the token.
* @return The token or secret key associated with the given platform properties.
*/
private String getToken(NotifyPlatformProperties platformProperties) {
return StringUtil.isNotBlank(platformProperties.getToken()) ? platformProperties.getToken() : platformProperties.getSecretKey();
}
/**
* Builds the notification interval for the given executor properties.
*
* <p>This method first checks the executor's specific notify configuration. If not set, it falls back
* to the default executor configuration in the bootstrap properties.</p>
*
* @param executorProperties The properties of the executor for which to build the notification interval.
* @return The notification interval in seconds.
*/
private int buildInterval(IExecutorProperties executorProperties) {
return Optional.ofNullable(executorProperties.getNotify())
.map(ExecutorNotifyProperties::getInterval)
.orElse(Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor())
.map(ExecutorProperties::getNotify)
.map(ExecutorNotifyProperties::getInterval)
.orElse(DEFAULT_INTERVAL));
}
/**
* Builds the notification recipients for the given executor properties.
*
* <p>This method first checks the executor's specific notify configuration. If not set, it falls back
* to the default executor configuration in the bootstrap properties.</p>
*
* @param executorProperties The properties of the executor for which to build the notification recipients.
* @return A string containing the recipients of the notifications.
*/
private String buildReceive(IExecutorProperties executorProperties) {
return Optional.ofNullable(executorProperties.getNotify())
.map(ExecutorNotifyProperties::getReceives)
.orElse(Optional.ofNullable(BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor())
.map(ExecutorProperties::getNotify)
.map(ExecutorNotifyProperties::getReceives).orElse(""));
}
/**
* Initializes the cache and lock mechanisms for the given notification configurations.
*
* <p>This method is primarily responsible for setting up alarm controls based on the notification
* configurations, ensuring that the appropriate cache and lock mechanisms are initialized for
* each thread pool and platform combination.</p>
*
* @param buildSingleNotifyConfig A map containing the notification configurations that need cache and lock initialization.
*/
public void initCacheAndLock(Map<String, List<NotifyConfigDTO>> buildSingleNotifyConfig) {
buildSingleNotifyConfig.forEach(
(key, val) -> val.stream()
.filter(each -> Objects.equals("ALARM", each.getType()))
.forEach(each -> alarmControlHandler.initCacheAndLock(each.getTpId(), each.getPlatform(), each.getInterval())));
}
}

@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.conf;
import cn.hippo4j.agent.core.boot.SpringBootConfigNode;
/**
* Nacos Cloud config
*/
public class NacosCloudConfig {
public static class Spring {
/**
* Cloud
*/
public static class Cloud {
/**
* Nacos
*/
public static class Nacos {
/**
* Config
*/
@SpringBootConfigNode(root = NacosConfig.class)
public static class Config {
public static String SERVER_ADDR = "";
}
}
}
}
}

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.conf;
import cn.hippo4j.agent.core.boot.SpringBootConfigNode;
/**
* nacos config
*/
public class NacosConfig {
/**
* Nacos
*/
public static class Nacos {
/**
* Config
*/
@SpringBootConfigNode(root = NacosCloudConfig.class)
public static class Config {
public static String SERVER_ADDR = "";
}
}
}

@ -30,11 +30,13 @@ public class SpringBootConfig {
/**
* Spring
*/
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Spring {
/**
* Dynamic
*/
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Dynamic {
/**
@ -52,6 +54,18 @@ public class SpringBootConfig {
public static List<String> NAMESPACE = Arrays.asList("application");
}
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Nacos {
public static String SERVER_ADDR = "localhost";
public static List<String> NAMESPACE = Arrays.asList("");
public static String DATA_ID = "";
public static String GROUP = "DEFAULT_GROUP";
}
/**
* Monitor
*/
@ -67,10 +81,27 @@ public class SpringBootConfig {
public static Long initialDelay = 10000L;
public static Long collectInterval = 5000L;
public static Integer AGENT_MICROMETER_PORT;
}
public static String CONFIG_FILE_TYPE;
}
}
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Application {
public static String name = "";
}
@SpringBootConfigNode(root = SpringBootConfig.class)
public static class Profiles {
public static String active = "";
}
}
}

@ -15,8 +15,11 @@
* limitations under the License.
*/
package cn.hippo4j.threadpool.dynamic.mode.config.refresher.event;
package cn.hippo4j.agent.plugin.spring.common.event;
import cn.hippo4j.agent.core.util.CollectionUtil;
import cn.hippo4j.agent.plugin.spring.common.alarm.AgentModeNotifyConfigBuilder;
import cn.hippo4j.agent.plugin.spring.common.support.ThreadPoolCheckAlarmSupport;
import cn.hippo4j.common.executor.ThreadPoolExecutorHolder;
import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
@ -29,10 +32,17 @@ import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.message.api.NotifyConfigDTO;
import cn.hippo4j.threadpool.message.core.request.ChangeParameterNotifyRequest;
import cn.hippo4j.threadpool.message.core.service.GlobalNotifyAlarmManage;
import cn.hippo4j.threadpool.message.core.service.ThreadPoolBaseSendMessageService;
import cn.hippo4j.threadpool.message.core.service.ThreadPoolNotifyAlarm;
import lombok.RequiredArgsConstructor;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -56,6 +66,10 @@ public class DynamicThreadPoolRefreshListener implements Observer<BootstrapConfi
String threadPoolId = properties.getThreadPoolId();
// Check whether the thread pool configuration is empty and whether the parameters have changed
ThreadPoolExecutorHolder executorHolder = ThreadPoolExecutorRegistry.getHolder(threadPoolId);
/*
* Check whether the notification configuration is consistent, this operation will not trigger the notification.
*/
checkNotifyConsistencyAndReplace(properties);
if (executorHolder.isEmpty() || !checkPropertiesConsistency(executorHolder, properties)) {
continue;
}
@ -65,6 +79,56 @@ public class DynamicThreadPoolRefreshListener implements Observer<BootstrapConfi
}
}
/**
* Check notify consistency and replace.
*
* @param executorProperties
*/
private void checkNotifyConsistencyAndReplace(ExecutorProperties executorProperties) {
boolean checkNotifyConfig = false;
boolean checkNotifyAlarm = false;
ThreadPoolBaseSendMessageService threadPoolBaseSendMessageService = ThreadPoolCheckAlarmSupport.getThreadPoolBaseSendMessageService();
AgentModeNotifyConfigBuilder agentNotifyConfigBuilder = ThreadPoolCheckAlarmSupport.getAgentNotifyConfigBuilder();
// Build a new notification configuration for the Agent
Map<String, List<NotifyConfigDTO>> newDynamicThreadPoolNotifyMap = agentNotifyConfigBuilder.buildSingleNotifyConfig(executorProperties);
Map<String, List<NotifyConfigDTO>> notifyConfigs = threadPoolBaseSendMessageService.getNotifyConfigs();
if (CollectionUtil.isNotEmpty(notifyConfigs)) {
for (Map.Entry<String, List<NotifyConfigDTO>> each : newDynamicThreadPoolNotifyMap.entrySet()) {
if (checkNotifyConfig) {
break;
}
List<NotifyConfigDTO> notifyConfigDTOS = notifyConfigs.get(each.getKey());
for (NotifyConfigDTO notifyConfig : each.getValue()) {
if (!notifyConfigDTOS.contains(notifyConfig)) {
checkNotifyConfig = true;
break;
}
}
}
}
if (checkNotifyConfig) {
agentNotifyConfigBuilder.initCacheAndLock(newDynamicThreadPoolNotifyMap);
threadPoolBaseSendMessageService.putPlatform(newDynamicThreadPoolNotifyMap);
}
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = GlobalNotifyAlarmManage.get(executorProperties.getThreadPoolId());
if (threadPoolNotifyAlarm != null) {
Boolean isAlarm = executorProperties.getAlarm();
Integer activeAlarm = executorProperties.getActiveAlarm();
Integer capacityAlarm =
executorProperties.getCapacityAlarm();
if ((isAlarm != null && !Objects.equals(isAlarm, threadPoolNotifyAlarm.getAlarm())) || (activeAlarm != null && !Objects.equals(activeAlarm,
threadPoolNotifyAlarm.getActiveAlarm())) || (capacityAlarm != null && !Objects.equals(capacityAlarm, threadPoolNotifyAlarm.getCapacityAlarm()))) {
checkNotifyAlarm = true;
threadPoolNotifyAlarm.setAlarm(Optional.ofNullable(isAlarm).orElse(threadPoolNotifyAlarm.getAlarm()));
threadPoolNotifyAlarm.setActiveAlarm(Optional.ofNullable(activeAlarm).orElse(threadPoolNotifyAlarm.getActiveAlarm()));
threadPoolNotifyAlarm.setCapacityAlarm(Optional.ofNullable(capacityAlarm).orElse(threadPoolNotifyAlarm.getCapacityAlarm()));
}
}
if (checkNotifyConfig || checkNotifyAlarm) {
LOG.info("[{}] Dynamic thread pool notification property changes.", executorProperties.getThreadPoolId());
}
}
/**
* Check consistency.
*
@ -132,15 +196,57 @@ public class DynamicThreadPoolRefreshListener implements Observer<BootstrapConfi
}
}
/**
* Sends a change notification message for a thread pool when its properties are updated.
* This method logs the changes in thread pool properties and sends a notification to the platform
* with the updated configuration details.
*
* @param executorHolder The holder object for the thread pool executor, containing its current state and properties.
* @param properties The new properties for the thread pool that are being applied.
*/
private void sendChangeNotificationMessage(ThreadPoolExecutorHolder executorHolder, ExecutorProperties properties) {
ExecutorProperties executorProperties = executorHolder.getExecutorProperties();
ExecutorProperties beforeProperties = executorHolder.getExecutorProperties();
LOG.info(CHANGE_THREAD_POOL_TEXT,
executorHolder.getThreadPoolId(),
String.format(CHANGE_DELIMITER, executorProperties.getCorePoolSize(), properties.getCorePoolSize()),
String.format(CHANGE_DELIMITER, executorProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()),
String.format(CHANGE_DELIMITER, executorProperties.getQueueCapacity(), properties.getQueueCapacity()),
String.format(CHANGE_DELIMITER, executorProperties.getKeepAliveTime(), properties.getKeepAliveTime()),
String.format(CHANGE_DELIMITER, executorProperties.getRejectedHandler(), properties.getRejectedHandler()),
String.format(CHANGE_DELIMITER, executorProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()));
String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), properties.getCorePoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), properties.getQueueCapacity()),
String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()),
String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()),
String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), properties.getRejectedHandler()),
String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()));
// Send platform configuration notification
ChangeParameterNotifyRequest changeRequest = buildChangeRequest(beforeProperties, properties);
ThreadPoolCheckAlarmSupport.getThreadPoolConfigChangeHandler().sendPoolConfigChange(changeRequest);
}
/**
* Construct change parameter notify request instance.
*
* @param beforeProperties old properties
* @param properties new properties
* @return instance
*/
private ChangeParameterNotifyRequest buildChangeRequest(ExecutorProperties beforeProperties, ExecutorProperties properties) {
ChangeParameterNotifyRequest changeParameterNotifyRequest = ChangeParameterNotifyRequest.builder()
.beforeCorePoolSize(beforeProperties.getCorePoolSize())
.beforeMaximumPoolSize(beforeProperties.getMaximumPoolSize())
.beforeAllowsCoreThreadTimeOut(beforeProperties.getAllowCoreThreadTimeOut())
.beforeKeepAliveTime(beforeProperties.getKeepAliveTime())
.beforeQueueCapacity(beforeProperties.getQueueCapacity())
.beforeRejectedName(beforeProperties.getRejectedHandler())
.beforeExecuteTimeOut(beforeProperties.getExecuteTimeOut())
.blockingQueueName(properties.getBlockingQueue())
.nowCorePoolSize(Optional.ofNullable(properties.getCorePoolSize()).orElse(beforeProperties.getCorePoolSize()))
.nowMaximumPoolSize(Optional.ofNullable(properties.getMaximumPoolSize()).orElse(beforeProperties.getMaximumPoolSize()))
.nowAllowsCoreThreadTimeOut(Optional.ofNullable(properties.getAllowCoreThreadTimeOut()).orElse(beforeProperties.getAllowCoreThreadTimeOut()))
.nowKeepAliveTime(Optional.ofNullable(properties.getKeepAliveTime()).orElse(beforeProperties.getKeepAliveTime()))
.nowQueueCapacity(Optional.ofNullable(properties.getQueueCapacity()).orElse(beforeProperties.getQueueCapacity()))
.nowRejectedName(Optional.ofNullable(properties.getRejectedHandler()).orElse(beforeProperties.getRejectedHandler()))
.nowExecuteTimeOut(Optional.ofNullable(properties.getExecuteTimeOut()).orElse(beforeProperties.getExecuteTimeOut()))
.build();
changeParameterNotifyRequest.setThreadPoolId(beforeProperties.getThreadPoolId());
return changeParameterNotifyRequest;
}
}

@ -0,0 +1,183 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.monitor;
import cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader;
import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry;
import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.common.monitor.MonitorCollectTypeEnum;
import cn.hippo4j.common.monitor.MonitorHandlerTypeEnum;
import cn.hippo4j.core.executor.state.ThreadPoolRunStateHandler;
import cn.hippo4j.monitor.elasticsearch.AdapterThreadPoolElasticSearchMonitorHandler;
import cn.hippo4j.monitor.elasticsearch.DynamicThreadPoolElasticSearchMonitorHandler;
import cn.hippo4j.monitor.elasticsearch.WebThreadPoolElasticSearchMonitorHandler;
import cn.hippo4j.monitor.local.log.AdapterThreadPoolLocalLogMonitorHandler;
import cn.hippo4j.monitor.local.log.DynamicThreadPoolLocalLogMonitorHandler;
import cn.hippo4j.monitor.local.log.WebThreadPoolLocalLogMonitorHandler;
import cn.hippo4j.monitor.micrometer.AdapterThreadPoolMicrometerMonitorHandler;
import cn.hippo4j.monitor.micrometer.DynamicThreadPoolMicrometerMonitorHandler;
import cn.hippo4j.monitor.micrometer.WebThreadPoolMicrometerMonitorHandler;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.MonitorProperties;
import cn.hippo4j.threadpool.monitor.api.ThreadPoolMonitor;
import org.springframework.core.env.ConfigurableEnvironment;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
/**
* This class is responsible for configuring and initializing monitoring handlers
* for various types of thread pools. It maps specific monitoring types (e.g., Micrometer,
* Log, Elasticsearch) to their corresponding handler initializers and manages the
* setup process based on the provided configuration.
*/
public class MonitorHandlersConfigurator {
private static final ILog LOGGER = LogManager.getLogger(MonitorHandlersConfigurator.class);
// Maps thread pool types to their corresponding handler constructors
private static final Map<String, BiConsumer<MonitorHandlerTypeEnum, MonitorHandlerContext>> handlerMap = new HashMap<>();
static {
// Initialize the handler map with specific monitoring types
handlerMap.put(MonitorCollectTypeEnum.MICROMETER.getValue(), MonitorHandlersConfigurator::handleMicrometer);
handlerMap.put(MonitorCollectTypeEnum.LOG.getValue(), MonitorHandlersConfigurator::handleLog);
handlerMap.put(MonitorCollectTypeEnum.ELASTICSEARCH.getValue(), MonitorHandlersConfigurator::handleElasticSearch);
}
/**
* Initializes the monitoring handlers based on the provided monitoring configuration.
* <p>
* This method performs the following tasks:
* <ul>
* <li>Parses the configured monitoring types and thread pool types.</li>
* <li>Initializes a monitoring context with the necessary thread pool monitors and state handler.</li>
* <li>For each configured monitoring type, invokes the corresponding handler initializer
* for each relevant thread pool type.</li>
* <li>Logs a warning if an unrecognized monitoring type is encountered.</li>
* <li>Registers and adds thread pool monitors that match the configured monitoring types.</li>
* </ul>
*
* @param monitor The monitoring properties configuration.
* @param environment The application environment from which additional configuration can be loaded.
* @param threadPoolMonitors A list to hold the initialized thread pool monitors.
*/
public static void initializeMonitorHandlers(MonitorProperties monitor, ConfigurableEnvironment environment, List<ThreadPoolMonitor> threadPoolMonitors) {
List<String> collectTypes = Arrays.asList(monitor.getCollectTypes().split(","));
List<String> threadPoolTypes = Arrays.asList(monitor.getThreadPoolTypes().split(","));
ThreadPoolRunStateHandler threadPoolRunStateHandler = new ThreadPoolRunStateHandler(
SpringPropertiesLoader.inetUtils, environment);
MonitorHandlerContext context = new MonitorHandlerContext(threadPoolMonitors, threadPoolRunStateHandler);
// Initialize handlers for each configured monitoring type and thread pool type
for (String collectType : collectTypes) {
if (handlerMap.containsKey(collectType)) {
for (MonitorHandlerTypeEnum type : MonitorHandlerTypeEnum.values()) {
if (threadPoolTypes.contains(type.name().toLowerCase())) {
handlerMap.get(collectType).accept(type, context);
}
}
} else {
LOGGER.warn("[Hippo4j-Agent] MonitorConfigurator initialize Unrecognized collect type: [{}]", collectType);
}
}
// Register and add dynamic thread pool monitors matching the configured types
Collection<ThreadPoolMonitor> dynamicThreadPoolMonitors = ServiceLoaderRegistry.getSingletonServiceInstances(ThreadPoolMonitor.class);
dynamicThreadPoolMonitors.stream().filter(each -> collectTypes.contains(each.getType())).forEach(threadPoolMonitors::add);
}
/**
* Initializes Micrometer-based monitoring handlers for the specified thread pool type.
*
* @param type The type of thread pool to be monitored.
* @param context The context containing the monitors and state handler.
*/
private static void handleMicrometer(MonitorHandlerTypeEnum type, MonitorHandlerContext context) {
switch (type) {
case DYNAMIC:
context.monitors.add(new DynamicThreadPoolMicrometerMonitorHandler(context.threadPoolRunStateHandler));
break;
case WEB:
context.monitors.add(new WebThreadPoolMicrometerMonitorHandler());
break;
case ADAPTER:
context.monitors.add(new AdapterThreadPoolMicrometerMonitorHandler());
break;
}
}
/**
* Initializes Log-based monitoring handlers for the specified thread pool type.
*
* @param type The type of thread pool to be monitored.
* @param context The context containing the monitors and state handler.
*/
private static void handleLog(MonitorHandlerTypeEnum type, MonitorHandlerContext context) {
switch (type) {
case DYNAMIC:
context.monitors.add(new DynamicThreadPoolLocalLogMonitorHandler(context.threadPoolRunStateHandler));
break;
case WEB:
context.monitors.add(new WebThreadPoolLocalLogMonitorHandler());
break;
case ADAPTER:
context.monitors.add(new AdapterThreadPoolLocalLogMonitorHandler());
break;
}
}
/**
* Initializes Elasticsearch-based monitoring handlers for the specified thread pool type.
*
* @param type The type of thread pool to be monitored.
* @param context The context containing the monitors and state handler.
*/
private static void handleElasticSearch(MonitorHandlerTypeEnum type, MonitorHandlerContext context) {
switch (type) {
case DYNAMIC:
context.monitors.add(new DynamicThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler));
break;
case WEB:
context.monitors.add(new WebThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler));
break;
case ADAPTER:
context.monitors.add(new AdapterThreadPoolElasticSearchMonitorHandler(context.threadPoolRunStateHandler));
break;
}
}
/**
* A helper class to manage the context in which monitoring handlers are initialized.
*/
private static class MonitorHandlerContext {
List<ThreadPoolMonitor> monitors;
ThreadPoolRunStateHandler threadPoolRunStateHandler;
MonitorHandlerContext(List<ThreadPoolMonitor> monitors, ThreadPoolRunStateHandler handler) {
this.monitors = monitors;
this.threadPoolRunStateHandler = handler;
}
}
}

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.monitor;
import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig;
import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
/**
* This class is responsible for exposing Prometheus metrics via an HTTP endpoint.
* It initializes the Prometheus registry, binds it to the global metrics registry,
* and starts an HTTP server to serve the metrics data.
*/
public class MonitorMetricEndpoint {
private static final ILog LOGGER = LogManager.getLogger(MonitorHandlersConfigurator.class);
/**
* Starts the Prometheus metrics HTTP server.
* <p>
* This method performs the following steps:
* <ul>
* <li>Initializes the PrometheusMeterRegistry with the default configuration.</li>
* <li>Binds the Prometheus registry to the global CompositeMeterRegistry.</li>
* <li>Attempts to start an HTTP server on the configured port to expose the Prometheus metrics.</li>
* </ul>
* If the port is not configured, or if there is an error starting the server, appropriate error messages
* are logged, and the method returns without starting the server.
* </p>
*/
public static void startPrometheusEndpoint() {
// Initialize the Prometheus registry
PrometheusMeterRegistry prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
// Bind the Prometheus registry to the global Metrics registry
CompositeMeterRegistry globalRegistry = Metrics.globalRegistry;
globalRegistry.add(prometheusRegistry);
// Get the configured port for the Prometheus metrics HTTP server
Integer port = SpringBootConfig.Spring.Dynamic.Thread_Pool.Monitor.AGENT_MICROMETER_PORT;
if (port == null) {
LOGGER.error(
"[Hippo4j-Agent] Failed to start Prometheus metrics endpoint server. Please configure the exposed endpoint by adding: spring.dynamic.thread-pool.monitor.agent-micrometer-port=xxx to the configuration file");
return;
}
// Create the HTTP server
HttpServer server = null;
try {
server = HttpServer.create(new InetSocketAddress(port), 0);
} catch (IOException e) {
LOGGER.error("[Hippo4j-Agent] Failed to start Prometheus metrics endpoint server", e);
return;
}
// Register the /actuator/prometheus context to handle metrics requests
server.createContext("/actuator/prometheus", exchange -> {
String response = prometheusRegistry.scrape(); // Get metrics data in Prometheus format
exchange.sendResponseHeaders(200, response.getBytes().length);
try (OutputStream os = exchange.getResponseBody()) {
os.write(response.getBytes());
}
});
// Start the server
server.start();
LOGGER.info("[Hippo4j-Agent] Prometheus metrics server started on port {}", port);
}
}

@ -32,6 +32,6 @@ public class SpringEnvironmentSupport {
Map<String, Object> map = new HashMap<>();
map.put("spring.dynamic.thread-pool.enable", false); // Switch off in non-Agent mode
MapPropertySource propertySource = new MapPropertySource("Hippo4j-Agent-Properties", map);
environment.getPropertySources().addFirst(propertySource);
environment.getPropertySources().addLast(propertySource);
}
}

@ -18,16 +18,27 @@
package cn.hippo4j.agent.plugin.spring.common.support;
import cn.hippo4j.agent.core.boot.SpringBootConfigInitializer;
import cn.hippo4j.agent.plugin.spring.common.toolkit.SpringPropertyBinder;
import cn.hippo4j.common.extension.design.AbstractSubjectCenter;
import cn.hippo4j.common.logging.api.ILog;
import cn.hippo4j.common.logging.api.LogManager;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.core.toolkit.inet.InetUtilsProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import lombok.Getter;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.EnumerablePropertySource;
import org.springframework.core.env.PropertiesPropertySource;
import org.springframework.core.env.PropertySource;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import static cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties.PREFIX;
/**
* Spring properties loader
@ -36,6 +47,19 @@ public class SpringPropertiesLoader {
private static final ILog LOGGER = LogManager.getLogger(SpringPropertiesLoader.class);
/**
* A flag used to indicate whether loadSpringProperties() method has been called,
* Used to determine whether the SpringPropertiesLoader has been initialized
*/
@Getter
private static final AtomicBoolean active = new AtomicBoolean(Boolean.FALSE);
public static BootstrapConfigProperties BOOTSTRAP_CONFIG_PROPERTIES = new BootstrapConfigProperties();
public static InetUtilsProperties INET_UTILS_PROPERTIES = new InetUtilsProperties();
public static InetUtils inetUtils;
public static void loadSpringProperties(ConfigurableEnvironment environment) {
Iterator<PropertySource<?>> iterator = environment.getPropertySources().iterator();
Properties properties = new Properties();
@ -43,6 +67,12 @@ public class SpringPropertiesLoader {
while (iterator.hasNext()) {
propertySourceList.add(iterator.next());
}
// Sort to ensure that the configuration in the configuration center is after the array
// To get the latest configuration information
propertySourceList.sort(Comparator.comparing(
// Make sure that Nacos boot's propertySource is placed first in the propertySourceList
item -> !item.getClass().getName().equals("com.alibaba.nacos.spring.core.env.NacosPropertySource")));
for (int i = propertySourceList.size() - 1; i >= 0; i--) {
PropertySource<?> propertySource = propertySourceList.get(i);
if (!(propertySource instanceof EnumerablePropertySource)) {
@ -65,5 +95,20 @@ public class SpringPropertiesLoader {
}
}
SpringBootConfigInitializer.setSpringProperties(properties);
PropertiesPropertySource propertySource = new PropertiesPropertySource("customPropertySource", properties);
environment.getPropertySources().addFirst(propertySource);
// initialize BootstrapConfigProperties
BOOTSTRAP_CONFIG_PROPERTIES = SpringPropertyBinder.bindProperties(environment, PREFIX, BootstrapConfigProperties.class);
INET_UTILS_PROPERTIES = SpringPropertyBinder.bindProperties(environment, InetUtilsProperties.PREFIX, InetUtilsProperties.class);
// Send AGENT_SPRING_PROPERTIES_LOADER_COMPLETED notification event Before active is false
if (AbstractSubjectCenter.get(AbstractSubjectCenter.SubjectType.AGENT_SPRING_PROPERTIES_LOADER_COMPLETED) != null && Boolean.FALSE.equals(active.get())) {
AbstractSubjectCenter.notify(AbstractSubjectCenter.SubjectType.AGENT_SPRING_PROPERTIES_LOADER_COMPLETED, () -> "");
}
active.set(Boolean.TRUE);
// Enable the thread pool check alert handler
ThreadPoolCheckAlarmSupport.enableThreadPoolCheckAlarmHandler();
// Enable thread pool monitor handler
ThreadPoolMonitorSupport.enableThreadPoolMonitorHandler(environment);
}
}

@ -22,17 +22,30 @@ import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.handler.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.common.model.executor.ExecutorNotifyProperties;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.toolkit.BooleanUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.message.core.service.GlobalNotifyAlarmManage;
import cn.hippo4j.threadpool.message.core.service.ThreadPoolNotifyAlarm;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.DYNAMIC_THREAD_POOL_EXECUTOR;
/**
* Spring thread pool register support
@ -41,6 +54,14 @@ public class SpringThreadPoolRegisterSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringThreadPoolRegisterSupport.class);
private static final int DEFAULT_ACTIVE_ALARM = 80;
private static final int DEFAULT_CAPACITY_ALARM = 80;
private static final int DEFAULT_INTERVAL = 5;
private static final String DEFAULT_RECEIVES = "";
public static void registerThreadPoolInstances(ApplicationContext context) {
Map<ThreadPoolExecutor, Class<?>> referencedClassMap = ThreadPoolExecutorRegistry.REFERENCED_CLASS_MAP;
for (Map.Entry<ThreadPoolExecutor, Class<?>> entry : referencedClassMap.entrySet()) {
@ -52,7 +73,7 @@ public class SpringThreadPoolRegisterSupport {
Object value = field.get(null);
if (value == enhancedInstance) {
String threadPoolId = declaredClass.getName() + "#" + field.getName();
register(threadPoolId, enhancedInstance);
register(threadPoolId, enhancedInstance, Boolean.TRUE);
break;
}
} catch (IllegalAccessException e) {
@ -74,25 +95,147 @@ public class SpringThreadPoolRegisterSupport {
if (executor == null) {
LOGGER.warn("[Hippo4j-Agent] Thread pool is null, ignore bean registration. beanName={}, beanClass={}", beanName, bean.getClass().getName());
} else {
register(beanName, executor);
register(beanName, executor, Boolean.FALSE);
}
}
LOGGER.info("[Hippo4j-Agent] Registered thread pool instances successfully.");
}
public static void register(String threadPoolId, ThreadPoolExecutor executor) {
public static void register(String threadPoolId, ThreadPoolExecutor executor, Boolean isAgentScanEnhancePool) {
if (executor == null) {
return;
}
ExecutorProperties executorProperties = ExecutorProperties.builder()
.threadPoolId(threadPoolId)
.corePoolSize(executor.getCorePoolSize())
.maximumPoolSize(executor.getMaximumPoolSize())
.allowCoreThreadTimeOut(BooleanUtil.toBoolean(String.valueOf(executor.allowsCoreThreadTimeOut())))
.blockingQueue(BlockingQueueTypeEnum.getBlockingQueueTypeEnumByName(executor.getQueue().getClass().getSimpleName()).getName())
.queueCapacity(executor.getQueue().remainingCapacity())
.rejectedHandler(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()).getName())
.build();
ExecutorProperties executorProperties = SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES.getExecutors().stream()
.filter(each -> Objects.equals(threadPoolId, each.getThreadPoolId()))
.findFirst()
.orElse(null);
// Determines the thread pool that is currently obtained by bean scanning
if (Objects.isNull(executorProperties)) {
if (isAgentScanEnhancePool) {
throw new RuntimeException(String.format("The thread pool id [%s] does not exist in the configuration.", threadPoolId));
} else {
// Thread pool that do not require enhancement are skipped by the agent
return;
}
}
try {
executorProperties = buildActualExecutorProperties(executorProperties);
// Replace the original configuration and refresh the thread pool
threadPoolParamReplace(executor, executorProperties);
} catch (Exception ex) {
LOGGER.error("[Hippo4j-Agent] Failed to initialize thread pool configuration.", ex);
}
// Build notification information entity
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = buildThreadPoolNotifyAlarm(executorProperties);
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
ThreadPoolExecutorRegistry.putHolder(threadPoolId, executor, executorProperties);
}
/**
* Thread-pool param replace.
*
* @param executor dynamic thread-pool executor
* @param executorProperties executor properties
*/
private static void threadPoolParamReplace(ThreadPoolExecutor executor, ExecutorProperties executorProperties) {
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity());
cn.hippo4j.common.toolkit.ReflectUtil.setFieldValue(executor, "workQueue", workQueue);
ThreadPoolExecutorUtil.safeSetPoolSize(executor, executorProperties.getCorePoolSize(), executorProperties.getMaximumPoolSize());
executor.setKeepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut());
executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(executorProperties.getRejectedHandler()));
// Reflection sets the thread pool setExecuteTimeOut
if (DYNAMIC_THREAD_POOL_EXECUTOR.equals(executor.getClass().getName())) {
try {
Method setExecuteTimeOutMethod = executor.getClass().getMethod("setExecuteTimeOut", Long.class);
Long executeTimeOut = executorProperties.getExecuteTimeOut();
if (executeTimeOut != null) {
setExecuteTimeOutMethod.invoke(executor, executeTimeOut);
}
} catch (Exception e) {
LOGGER.error("[Hippo4j-Agent] Failed to set executeTimeOut.", e);
}
}
}
/**
* Build actual executor properties.
*
* @param executorProperties executor properties
* @return executor properties
*/
private static ExecutorProperties buildActualExecutorProperties(ExecutorProperties executorProperties) {
return Optional.ofNullable(SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES.getDefaultExecutor()).map(each -> buildExecutorProperties(executorProperties)).orElse(executorProperties);
}
/**
* Build executor properties.
*
* @param executorProperties executor properties
* @return executor properties
*/
private static ExecutorProperties buildExecutorProperties(ExecutorProperties executorProperties) {
BootstrapConfigProperties configProperties = SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES;
return ExecutorProperties.builder()
.corePoolSize(Optional.ofNullable(executorProperties.getCorePoolSize())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCorePoolSize).get()))
.maximumPoolSize(Optional.ofNullable(executorProperties.getMaximumPoolSize())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getMaximumPoolSize).get()))
.allowCoreThreadTimeOut(Optional.ofNullable(executorProperties.getAllowCoreThreadTimeOut())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAllowCoreThreadTimeOut).get()))
.keepAliveTime(Optional.ofNullable(executorProperties.getKeepAliveTime())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getKeepAliveTime).get()))
.blockingQueue(Optional.ofNullable(executorProperties.getBlockingQueue())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getBlockingQueue).get()))
.executeTimeOut(Optional.ofNullable(executorProperties.getExecuteTimeOut())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getExecuteTimeOut).orElse(0L)))
.queueCapacity(Optional.ofNullable(executorProperties.getQueueCapacity())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getQueueCapacity).get()))
.rejectedHandler(Optional.ofNullable(executorProperties.getRejectedHandler())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getRejectedHandler).get()))
.threadNamePrefix(StringUtil.isBlank(executorProperties.getThreadNamePrefix()) ? executorProperties.getThreadPoolId() : executorProperties.getThreadNamePrefix())
.threadPoolId(executorProperties.getThreadPoolId())
.alarm(Optional.ofNullable(executorProperties.getAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAlarm).orElse(null)))
.activeAlarm(Optional.ofNullable(executorProperties.getActiveAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getActiveAlarm).orElse(null)))
.capacityAlarm(Optional.ofNullable(executorProperties.getCapacityAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCapacityAlarm).orElse(null)))
.notify(Optional.ofNullable(executorProperties.getNotify())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).orElse(null)))
.nodes(Optional.ofNullable(executorProperties.getNodes())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNodes).orElse(null)))
.build();
}
/**
* Build thread-pool notify alarm
*
* @param executorProperties executor properties
* @return thread-pool notify alarm
*/
private static ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) {
BootstrapConfigProperties configProperties = SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES;
ExecutorNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null);
boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAlarm).orElse(true));
int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getActiveAlarm).orElse(DEFAULT_ACTIVE_ALARM));
int capacityAlarm = Optional.ofNullable(executorProperties.getCapacityAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getCapacityAlarm).orElse(DEFAULT_CAPACITY_ALARM));
int interval = Optional.ofNullable(notify)
.map(ExecutorNotifyProperties::getInterval)
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(ExecutorNotifyProperties::getInterval).orElse(DEFAULT_INTERVAL));
String receive = Optional.ofNullable(notify)
.map(ExecutorNotifyProperties::getReceives)
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getNotify).map(ExecutorNotifyProperties::getReceives).orElse(DEFAULT_RECEIVES));
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(isAlarm, activeAlarm, capacityAlarm);
threadPoolNotifyAlarm.setInterval(interval);
threadPoolNotifyAlarm.setReceives(receive);
return threadPoolNotifyAlarm;
}
}

@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.support;
import cn.hippo4j.agent.plugin.spring.common.alarm.AgentModeNotifyConfigBuilder;
import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig;
import cn.hippo4j.agent.plugin.spring.common.toolkit.SpringPropertyBinder;
import cn.hippo4j.common.propertie.EnvironmentProperties;
import cn.hippo4j.core.config.ApplicationContextHolder;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import cn.hippo4j.core.toolkit.inet.InetUtilsProperties;
import cn.hippo4j.threadpool.alarm.handler.DefaultThreadPoolCheckAlarmHandler;
import cn.hippo4j.threadpool.message.api.NotifyConfigDTO;
import cn.hippo4j.threadpool.message.core.platform.DingSendMessageHandler;
import cn.hippo4j.threadpool.message.core.platform.LarkSendMessageHandler;
import cn.hippo4j.threadpool.message.core.platform.WeChatSendMessageHandler;
import cn.hippo4j.threadpool.message.core.service.AlarmControlHandler;
import cn.hippo4j.threadpool.message.core.service.DefaultThreadPoolConfigChangeHandler;
import cn.hippo4j.threadpool.message.core.service.SendMessageHandler;
import cn.hippo4j.threadpool.message.core.service.ThreadPoolBaseSendMessageService;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.ConfigurableEnvironment;
import java.util.List;
import java.util.Map;
import static cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES;
/**
* The {@code ThreadPoolCheckAlarmSupport} class provides functionality to enable and configure
* a thread pool check alarm handler. This is typically used to monitor thread pools for potential
* issues and send notifications based on the configured alert mechanisms.
*/
public class ThreadPoolCheckAlarmSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolCheckAlarmSupport.class);
@Getter
private static ThreadPoolBaseSendMessageService threadPoolBaseSendMessageService;
@Getter
private static DefaultThreadPoolConfigChangeHandler threadPoolConfigChangeHandler;
@Getter
private static AgentModeNotifyConfigBuilder agentNotifyConfigBuilder;
private static DefaultThreadPoolCheckAlarmHandler checkAlarmHandler;
/**
* Enables the thread pool check alarm handler if the corresponding configuration property is set to {@code true}.
* <p>
* This method performs the following actions:
* <ul>
* <li>Checks the value of the {@code enable} property in the bootstrap configuration. If it is {@code true}, it proceeds.</li>
* <li>Initializes environment properties needed for the monitoring process.</li>
* <li>Creates an instance of {@link AlarmControlHandler} and {@link ThreadPoolBaseSendMessageService} with necessary dependencies.</li>
* <li>Initializes and registers message handlers and notification configurations.</li>
* <li>Creates an instance of {@link DefaultThreadPoolCheckAlarmHandler} and schedules it to start monitoring the thread pool.</li>
* </ul>
*/
public static void enableThreadPoolCheckAlarmHandler() {
// Check if the thread pool checker is enabled in the bootstrap configuration properties
if (Boolean.TRUE.equals(BOOTSTRAP_CONFIG_PROPERTIES.getEnable())) {
// Initialize EnvironmentProperties
initializeEnvironmentProperties();
// Initialize the AlarmControlHandler and ThreadPoolBaseSendMessageService
AlarmControlHandler alarmControlHandler = new AlarmControlHandler();
threadPoolBaseSendMessageService = createThreadPoolBaseSendMessageService(alarmControlHandler);
threadPoolConfigChangeHandler = new DefaultThreadPoolConfigChangeHandler(threadPoolBaseSendMessageService);
// Initialize the alarm platform information
initializeSendMessageHandlers(threadPoolBaseSendMessageService, alarmControlHandler);
// Execute scheduled task to check an alarm
scheduleExecute(threadPoolBaseSendMessageService);
}
}
/**
* Initializes environment properties used for thread pool monitoring.
* <p>
* This method sets the state check interval, item ID, application name, and active profile from the bootstrap configuration.
*/
private static void initializeEnvironmentProperties() {
EnvironmentProperties.checkStateInterval = Long.valueOf(BOOTSTRAP_CONFIG_PROPERTIES.getCheckStateInterval());
EnvironmentProperties.itemId = BOOTSTRAP_CONFIG_PROPERTIES.getItemId();
EnvironmentProperties.applicationName = SpringBootConfig.Spring.Application.name;
EnvironmentProperties.active = SpringBootConfig.Spring.Profiles.active;
ConfigurableEnvironment environment = ApplicationContextHolder.getBean(ConfigurableEnvironment.class);
InetUtilsProperties inetUtilsProperties = SpringPropertyBinder.bindProperties(environment, InetUtilsProperties.PREFIX, InetUtilsProperties.class);
SpringPropertiesLoader.inetUtils = new InetUtils(inetUtilsProperties);
IdentifyUtil.generate(environment, SpringPropertiesLoader.inetUtils);
}
/**
* Creates and returns a new instance of {@link ThreadPoolBaseSendMessageService} with the specified {@link AlarmControlHandler}.
*
* @param alarmControlHandler The {@link AlarmControlHandler} used to control and handle alarms.
* @return A new instance of {@link ThreadPoolBaseSendMessageService}.
*/
private static ThreadPoolBaseSendMessageService createThreadPoolBaseSendMessageService(AlarmControlHandler alarmControlHandler) {
return new ThreadPoolBaseSendMessageService(alarmControlHandler);
}
/**
* Initializes and registers the message handlers and notification configurations in the specified
* {@link ThreadPoolBaseSendMessageService}.
* <p>
* This method creates instances of various {@link SendMessageHandler} implementations and registers them.
* It also constructs and registers notification configurations using the {@link AgentModeNotifyConfigBuilder}.
*
* @param threadPoolBaseSendMessageService The {@link ThreadPoolBaseSendMessageService} in which message handlers and notification configurations will be registered.
* @param alarmControlHandler The {@link AlarmControlHandler} used to handle alarms and notifications.
*/
private static void initializeSendMessageHandlers(ThreadPoolBaseSendMessageService threadPoolBaseSendMessageService, AlarmControlHandler alarmControlHandler) {
// Initialize message handlers
DingSendMessageHandler dingSendMessageHandler = new DingSendMessageHandler();
WeChatSendMessageHandler weChatSendMessageHandler = new WeChatSendMessageHandler();
LarkSendMessageHandler larkSendMessageHandler = new LarkSendMessageHandler();
// Register message handlers
threadPoolBaseSendMessageService.getSendMessageHandlers().put(dingSendMessageHandler.getType(), dingSendMessageHandler);
threadPoolBaseSendMessageService.getSendMessageHandlers().put(weChatSendMessageHandler.getType(), weChatSendMessageHandler);
threadPoolBaseSendMessageService.getSendMessageHandlers().put(larkSendMessageHandler.getType(), larkSendMessageHandler);
// Construct and register notification configurations
// TODO : register notify config for web , null Can be replaced with tomcat, jetty, undertow, etc. implementation classes
agentNotifyConfigBuilder = new AgentModeNotifyConfigBuilder(alarmControlHandler, null);
Map<String, List<NotifyConfigDTO>> notifyConfigs = agentNotifyConfigBuilder.buildNotify();
threadPoolBaseSendMessageService.getNotifyConfigs().putAll(notifyConfigs);
}
// 启动或重新启动检查任务
public static void scheduleExecute(ThreadPoolBaseSendMessageService threadPoolBaseSendMessageService) {
// If a task is already running, cancel it first
if (checkAlarmHandler != null) {
// Shut down the thread pool and prepare to regenerate the listener thread pool
checkAlarmHandler.destroyScheduleExecute();
}
// Initialize the thread pool check alarm handler with necessary services
checkAlarmHandler = new DefaultThreadPoolCheckAlarmHandler(threadPoolBaseSendMessageService);
// Run the check alarm handler to start monitoring the thread pool
checkAlarmHandler.scheduleExecute();
}
}

@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.support;
import cn.hippo4j.agent.plugin.spring.common.monitor.MonitorHandlersConfigurator;
import cn.hippo4j.agent.plugin.spring.common.monitor.MonitorMetricEndpoint;
import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry;
import cn.hippo4j.common.extension.spi.ServiceLoaderRegistry;
import cn.hippo4j.common.monitor.MonitorCollectTypeEnum;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.MonitorProperties;
import cn.hippo4j.threadpool.monitor.api.ThreadPoolMonitor;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.Environment;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader.BOOTSTRAP_CONFIG_PROPERTIES;
/**
* This class provides support for monitoring dynamic thread pools in an application.
* It includes methods to initialize and enable monitoring components, and schedules
* periodic data collection from the thread pools.
*/
public class ThreadPoolMonitorSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitorSupport.class);
/**
* A flag used to indicate whether enableThreadPoolMonitorHandler() method has been called,
* Used to determine whether the ThreadPoolMonitorHandler has been enable
*/
@Getter
private static final AtomicBoolean active = new AtomicBoolean(Boolean.FALSE);
private static final ScheduledExecutorService collectScheduledExecutor = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "client.agent.scheduled.collect.data"));
private static final List<ThreadPoolMonitor> threadPoolMonitors = new ArrayList<>();
static {
// Register the ThreadPoolMonitor service with the ServiceLoaderRegistry
ServiceLoaderRegistry.register(ThreadPoolMonitor.class);
}
/**
* Enables the dynamic thread pool monitoring handler.
* <p>
* This method performs the following steps:
* <ul>
* <li>Validates the monitoring configuration from the environment properties.</li>
* <li>Initializes monitoring components for the dynamic thread pools.</li>
* <li>Exposes metric endpoints, such as Prometheus, if configured.</li>
* <li>Schedules periodic collection of metrics from the thread pools.</li>
* </ul>
* If the monitoring configuration is invalid or disabled, the method returns without
* enabling the monitoring handler.
* </p>
*
* @param environment The environment from which the monitoring configuration is loaded.
*/
public static void enableThreadPoolMonitorHandler(Environment environment) {
BootstrapConfigProperties properties = BOOTSTRAP_CONFIG_PROPERTIES;
MonitorProperties monitor = properties.getMonitor();
if (Objects.isNull(monitor) || !monitor.getEnable() || StringUtil.isBlank(monitor.getThreadPoolTypes()) || StringUtil.isBlank(monitor.getCollectTypes())) {
return;
}
LOGGER.info("[Hippo4j-Agent] Start monitoring the running status of dynamic thread pools.");
// Initialize monitoring components for the dynamic thread pools
MonitorHandlersConfigurator.initializeMonitorHandlers(monitor, (ConfigurableEnvironment) environment, threadPoolMonitors);
// Determine whether the task is successfully enabled
// return directly if it has been enabled, and do not start the thread pool repeatedly
if (Boolean.TRUE.equals(active.get()))
return;
// Expose metric endpoints based on the configured collect types
List<String> collectTypes = Arrays.asList(monitor.getCollectTypes().split(","));
if (collectTypes.contains(MonitorCollectTypeEnum.MICROMETER.getValue())) {
MonitorMetricEndpoint.startPrometheusEndpoint();
}
// Schedule periodic collection of metrics from the thread pools
Runnable scheduledTask = scheduleRunnable();
collectScheduledExecutor.scheduleWithFixedDelay(scheduledTask, monitor.getInitialDelay(), monitor.getCollectInterval(), TimeUnit.MILLISECONDS);
active.set(true);
if (ThreadPoolExecutorRegistry.getThreadPoolExecutorSize() > 0) {
LOGGER.info("[Hippo4j-Agent] Dynamic thread pool: [{}]. The dynamic thread pool starts data collection and reporting.", ThreadPoolExecutorRegistry.getThreadPoolExecutorSize());
}
}
/**
* Returns a Runnable task that collects metrics from the dynamic thread pools.
* <p>
* This method is used to create a task that periodically iterates over the
* registered thread pool monitors and collects their metrics. If an exception
* occurs during the collection, it is logged.
* </p>
*
* @return A Runnable task that performs the metrics collection.
*/
private static Runnable scheduleRunnable() {
return () -> {
for (ThreadPoolMonitor each : threadPoolMonitors) {
try {
each.collect();
} catch (Throwable ex) {
LOGGER.error("[Hippo4j-Agent] Error monitoring the running status of dynamic thread pool. Type: {}", each.getType(), ex);
}
}
};
}
}

@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.toolkit;
import cn.hippo4j.threadpool.dynamic.mode.config.parser.ConfigFileTypeEnum;
import org.springframework.beans.BeanWrapper;
import org.springframework.beans.BeanWrapperImpl;
import org.springframework.core.env.Environment;
import org.springframework.core.env.PropertySource;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import java.beans.PropertyEditorSupport;
import java.util.*;
/**
* CustomPropertyBinder is a utility class for binding properties from Spring's Environment
* to Java objects based on a given prefix. This is useful for dynamically binding configurations
* in Spring applications where configuration properties are organized hierarchically and need to be
* mapped to corresponding Java objects.
*
* <p>This class handles complex property structures, including nested properties and collections,
* ensuring that all properties prefixed with the specified string are correctly bound to the target
* Java object.</p>
*/
public class SpringPropertyBinder {
/**
* Binds properties from the Spring Environment to an instance of the specified configuration class.
*
* @param environment the Spring Environment containing property sources.
* @param prefix the prefix to filter properties for binding (e.g., "spring.dynamic.thread-pool").
* @param clazz the class type of the configuration object to bind properties to.
* @param <T> the type of the configuration class.
* @return an instance of the configuration class with properties bound from the environment.
* @throws RuntimeException if there is an error instantiating the configuration class or binding properties.
*/
public static <T> T bindProperties(Environment environment, String prefix, Class<T> clazz) {
try {
// Create an instance of the target class
T instance = clazz.getDeclaredConstructor().newInstance();
BeanWrapper beanWrapper = new BeanWrapperImpl(instance);
// Register custom editor for ConfigFileTypeEnum to handle specific type conversions
beanWrapper.registerCustomEditor(ConfigFileTypeEnum.class, new ConfigFileTypeEnumEditor());
// Iterate over all property keys that match the given prefix
for (String key : getAllPropertyKeys(environment, prefix)) {
String propertyName = key.substring(prefix.length() + 1); // Remove prefix from the property key
String[] tokens = propertyName.split("\\."); // Split the property name by dot for nested properties
setPropertyValue(tokens, beanWrapper, environment.getProperty(key)); // Set the property value recursively
}
return instance;
} catch (Exception e) {
throw new RuntimeException("Unable to bind properties to " + clazz.getName(), e);
}
}
/**
* Binds properties from a map to an instance of the specified configuration class.
*
* @param configInfo a map containing property paths and their values.
* @param prefix the prefix to filter properties for binding (e.g., "spring.dynamic.thread-pool").
* @param clazz the class type of the configuration object to bind properties to.
* @param <T> the type of the configuration class.
* @return an instance of the configuration class with properties bound from the configInfo map.
*/
public static <T> T bindProperties(Map<Object, Object> configInfo, String prefix, Class<T> clazz) {
try {
// Create an instance of the target class
T instance = clazz.getDeclaredConstructor().newInstance();
BeanWrapper beanWrapper = new BeanWrapperImpl(instance);
// Register custom editor for specific type conversions (if needed)
beanWrapper.registerCustomEditor(ConfigFileTypeEnum.class, new ConfigFileTypeEnumEditor());
// Iterate over all property keys that match the given prefix in the configInfo map
for (Map.Entry<Object, Object> entry : configInfo.entrySet()) {
String key = entry.getKey().toString();
if (key.startsWith(prefix)) {
String propertyName = key.substring(prefix.length() + 1); // Remove prefix from the property key
String[] tokens = propertyName.split("\\."); // Split the property name by dot for nested properties
setPropertyValue(tokens, beanWrapper, entry.getValue().toString()); // Set the property value recursively
}
}
return instance;
} catch (Exception e) {
throw new RuntimeException("Unable to bind properties to " + clazz.getName(), e);
}
}
/**
* Recursively sets property values on the target object, handling nested properties and collections.
*
* @param tokens an array of property path tokens (e.g., ["nested", "property", "name"]).
* @param beanWrapper the BeanWrapper instance used to manipulate the target object.
* @param value the value to set on the target property.
*/
private static void setPropertyValue(String[] tokens, BeanWrapper beanWrapper, String value) {
for (int i = 0; i < tokens.length - 1; i++) {
String token = tokens[i];
if (token.matches(".*\\[\\d+\\]$")) { // Handle array/list property
token = token.substring(0, token.indexOf('['));
int index = Integer.parseInt(tokens[i].substring(token.length() + 1, tokens[i].length() - 1));
token = convertToCamelCase(token); // Convert token to camelCase if necessary
List<Object> list = (List<Object>) beanWrapper.getPropertyValue(token);
if (list == null) {
list = new ArrayList<>();
beanWrapper.setPropertyValue(convertToCamelCase(token), list); // Initialize the list if it's null
}
// Ensure the list has enough size to accommodate the index
if (list.size() <= index) {
try {
// Instantiate the list element if it does not exist
list.add(index, beanWrapper.getPropertyTypeDescriptor(token)
.getElementTypeDescriptor().getType().newInstance());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
// Move the beanWrapper context to the current list element
beanWrapper = new BeanWrapperImpl(list.get(index));
} else { // Handle simple or nested property
Object nestedObject = beanWrapper.getPropertyValue(token);
if (nestedObject == null) {
Class<?> nestedClass = beanWrapper.getPropertyType(token);
if (Map.class.isAssignableFrom(nestedClass)) {
nestedObject = new HashMap<>(); // Initialize nested Map if necessary
} else {
try {
nestedObject = nestedClass.getDeclaredConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
beanWrapper.setPropertyValue(convertToCamelCase(token), nestedObject);
}
// Move the beanWrapper context to the nested object
beanWrapper = new BeanWrapperImpl(nestedObject);
}
}
// Finally, set the actual property value on the resolved object
String finalPropertyName = tokens[tokens.length - 1];
Object currentObject = beanWrapper.getWrappedInstance();
if (currentObject instanceof Map) {
// If the current object is a Map, set the value as a key-value pair
((Map<String, Object>) currentObject).put(finalPropertyName, value);
} else {
// Otherwise, set it as a simple property
beanWrapper.setPropertyValue(convertToCamelCase(finalPropertyName), value);
}
}
/**
* Retrieves all property keys from the environment that start with the given prefix.
*
* @param environment the Spring Environment containing property sources.
* @param prefix the prefix to filter property keys.
* @return a set of property keys that match the prefix.
*/
private static Set<String> getAllPropertyKeys(Environment environment, String prefix) {
Set<String> keys = new HashSet<>();
// Iterate through all property sources in the environment
for (PropertySource<?> propertySource : ((ConfigurableEnvironment) environment).getPropertySources()) {
if (propertySource instanceof MapPropertySource) {
Map<String, Object> source = ((MapPropertySource) propertySource).getSource();
// Collect keys that start with the specified prefix
for (String key : source.keySet()) {
if (key.startsWith(prefix)) {
keys.add(key);
}
}
}
}
return keys;
}
/**
* Converts a dashed-separated string to camelCase.
* <p>
* For example, "my-property-name" -> "myPropertyName".
*
* @param dashed the dashed-separated string to be converted.
* @return the camelCase representation of the input string.
*/
private static String convertToCamelCase(String dashed) {
String[] parts = dashed.split("-");
return Arrays.stream(parts)
.map(part -> part.substring(0, 1).toUpperCase() + part.substring(1)) // Capitalize each part
.reduce((first, second) -> first + second) // Concatenate all parts together
.map(result -> result.substring(0, 1).toLowerCase() + result.substring(1)) // Lowercase the first letter
.orElse(dashed);
}
/**
* ConfigFileTypeEnumEditor is a custom property editor for converting string representations
* of {@link ConfigFileTypeEnum} into the corresponding enum instances.
* <p>
* This editor is useful in scenarios where properties are read as strings but need to be
* converted to enum types for further processing.
*/
public static class ConfigFileTypeEnumEditor extends PropertyEditorSupport {
/**
* Converts the given text value to the corresponding {@link ConfigFileTypeEnum} instance.
* <p>
* This method overrides the default implementation to parse the input string and convert
* it into a {@link ConfigFileTypeEnum}. If the input string does not match any known enum
* value, an {@link IllegalArgumentException} will be thrown.
*
* @param text the string representation of the enum to be converted.
* @throws IllegalArgumentException if the text does not match any known enum value.
*/
@Override
public void setAsText(String text) throws IllegalArgumentException {
setValue(ConfigFileTypeEnum.of(text));
}
}
}

@ -16,7 +16,6 @@
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>

@ -52,7 +52,16 @@ public class ThreadPoolExecutorConstructorMethodInterceptor implements InstanceC
}
StackTraceElement declaredClassStackTraceElement = stackTraceElements.get(0);
String declaredClassName = declaredClassStackTraceElement.getClassName();
Class<?> declaredClass = Thread.currentThread().getContextClassLoader().loadClass(declaredClassName);
Class<?> declaredClass = null;
try {
declaredClass = Thread.currentThread().getContextClassLoader().loadClass(declaredClassName);
} catch (ClassNotFoundException e) {
// The thread pool in the Agent plug-in is loaded by AgentclassLodaer.
// Due to the delegation model, it can only be searched upwards, so searching here will result in ClassNotFount.
// Because the parent of AgentClassLoader is AppclassLoder, it is ignored here ,skip the enhancement logic
LOGGER.debug("searching {} result in ClassNotFount , so skip the enhancement logic", declaredClassName);
return;
}
ThreadPoolExecutorRegistry.REFERENCED_CLASS_MAP.put((ThreadPoolExecutor) objInst, declaredClass);
}

@ -140,7 +140,6 @@
<groupId>org.objenesis</groupId>
<artifactId>objenesis</artifactId>
<version>${objenesis.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.tomakehurst</groupId>

@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-agent-example</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>hippo4j-agent-example-core</artifactId>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-message</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>1.23</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>1.23</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.example.agent.config.apollo;
package cn.hippo4j.example.agent.core.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@ -46,7 +46,7 @@ public class ThreadPoolConfiguration {
// 演示 Agent 模式修改线程池
// -------------------------------------------------------------------------
public static final ThreadPoolExecutor RUN_MESSAGE_SEND_TASK_EXECUTOR = new ThreadPoolExecutor(
public static final ThreadPoolExecutor AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR = new ThreadPoolExecutor(
1,
10,
1024,

@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.agent.core.inittest;
import cn.hippo4j.common.executor.ThreadPoolExecutorHolder;
import cn.hippo4j.common.executor.ThreadPoolExecutorRegistry;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Test alarm send message.
*/
@Slf4j
@Component
public class AlarmSendMessageTest {
private static final int SLEEP_TIME = 10240124;
private static final int INITIAL_DELAY = 3;
private static final String RUN_MESSAGE_SEND_TASK_EXECUTOR = "runMessageSendTaskExecutor";
private static final String AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR = "cn.hippo4j.example.agent.core.config.ThreadPoolConfiguration#AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR";
/**
* Test alarm notification.
* If you need to run this single test, add @PostConstruct to the method.
*/
@SuppressWarnings("all")
// @PostConstruct
public void alarmSendMessageTest() {
ScheduledExecutorService scheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
scheduledThreadPool.scheduleWithFixedDelay(() -> {
ThreadPoolExecutorHolder executorHolder = ThreadPoolExecutorRegistry.getHolder(AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR);
ThreadPoolExecutor poolExecutor = executorHolder.getExecutor();
try {
poolExecutor.execute(() -> {
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
} catch (Exception ex) {
log.error("Throw reject policy.", ex.getMessage());
}
}, INITIAL_DELAY, 2, TimeUnit.SECONDS);
scheduledThreadPool.scheduleWithFixedDelay(() -> {
ThreadPoolExecutorHolder executorHolder = ThreadPoolExecutorRegistry.getHolder(RUN_MESSAGE_SEND_TASK_EXECUTOR);
ThreadPoolExecutor poolExecutor = executorHolder.getExecutor();
try {
poolExecutor.execute(() -> {
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
} catch (Exception ex) {
log.error("Throw reject policy.", ex.getMessage());
}
}, INITIAL_DELAY, 2, TimeUnit.SECONDS);
}
}

@ -0,0 +1,83 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-agent-example</artifactId>
<version>2.0.0-SNAPSHOT</version>
</parent>
<artifactId>hippo4j-threadpool-agent-config-apollo-spring-boot-1x</artifactId>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
<spring-boot.version>1.5.22.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-example-core</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>1.3.6.RELEASE</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.agent.config.apollo.v1;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Agent config Nacos example application.
*/
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.agent.core")
public class AgentConfigApolloSpringBoot1xExampleApplication {
public static void main(String[] args) {
SpringApplication.run(AgentConfigApolloSpringBoot1xExampleApplication.class, args);
}
}

@ -0,0 +1,65 @@
server.port=8092
server.servlet.context-path=/example
app.id=dynamic-threadpool-example
apollo.meta=http://127.0.0.1:8080
apollo.autoUpdateInjectedSpringProperties=true
apollo.bootstrap.enabled=true
apollo.bootstrap.namespaces=application
apollo.bootstrap.eagerLoad.enabled=true
# The following parameters are used for testing
env=dev
apollo.configService=http://127.0.0.1:8080
spring.profiles.active=dev
spring.application.name=hippo4j-config-apollo-spring-boot-starter-example
spring.dynamic.thread-pool.enable=true
spring.dynamic.thread-pool.banner=true
spring.dynamic.thread-pool.check-state-interval=10
spring.dynamic.thread-pool.monitor.enable=true
spring.dynamic.thread-pool.monitor.collect-types=micrometer
spring.dynamic.thread-pool.monitor.thread-pool-types=dynamic
spring.dynamic.thread-pool.monitor.initial-delay=3000
spring.dynamic.thread-pool.monitor.collect-interval=3000
spring.dynamic.thread-pool.monitor.agent-micrometer-port=29999
spring.dynamic.thread-pool.notify-platforms[0].platform=LARK
spring.dynamic.thread-pool.notify-platforms[0].token=6de41bdc-0799-45be-b128-7cddb9e777f0
#spring.dynamic.thread-pool.notify-platforms[1].platform=WECHAT
#spring.dynamic.thread-pool.notify-platforms[1].token=ac0426a5-c712-474c-9bff-72b8b8f5caff
#spring.dynamic.thread-pool.notify-platforms[2].platform=DING
#spring.dynamic.thread-pool.notify-platforms[2].token=56417ebba6a27ca352f0de77a2ae9da66d01f39610b5ee8a6033c60ef9071c55
spring.dynamic.thread-pool.apollo.namespace=application
spring.dynamic.thread-pool.config-file-type=properties
spring.dynamic.thread-pool.executors[0].thread-name-prefix = DynamicThreadPoolConfig#FIELD1
spring.dynamic.thread-pool.executors[0].core-pool-size = 2
spring.dynamic.thread-pool.executors[0].thread-pool-id = cn.hippo4j.example.agent.core.config.ThreadPoolConfiguration#AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR
spring.dynamic.thread-pool.executors[0].maximum-pool-size = 20
spring.dynamic.thread-pool.executors[0].queue-capacity = 1024
spring.dynamic.thread-pool.executors[0].blocking-queue = ResizableCapacityLinkedBlockingQueue
spring.dynamic.thread-pool.executors[0].execute-time-out = 800
spring.dynamic.thread-pool.executors[0].rejected-handler = AbortPolicy
spring.dynamic.thread-pool.executors[0].keep-alive-time = 6691
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out = true
spring.dynamic.thread-pool.executors[0].alarm = true
spring.dynamic.thread-pool.executors[0].active-alarm = 80
spring.dynamic.thread-pool.executors[0].capacity-alarm = 80
spring.dynamic.thread-pool.executors[0].notify.interval = 8
spring.dynamic.thread-pool.executors[0].notify.receives = nobodyiam
spring.dynamic.thread-pool.executors[1].thread-pool-id = runMessageSendTaskExecutor
spring.dynamic.thread-pool.executors[1].thread-name-prefix = runMessageSendTaskExecutor
spring.dynamic.thread-pool.executors[1].core-pool-size = 3
spring.dynamic.thread-pool.executors[1].maximum-pool-size = 4
spring.dynamic.thread-pool.executors[1].queue-capacity = 1024
spring.dynamic.thread-pool.executors[1].blocking-queue = ResizableCapacityLinkedBlockingQueue
spring.dynamic.thread-pool.executors[1].execute-time-out = 800
spring.dynamic.thread-pool.executors[1].rejected-handler = AbortPolicy
spring.dynamic.thread-pool.executors[1].keep-alive-time = 6691
spring.dynamic.thread-pool.executors[1].allow-core-thread-time-out = true
spring.dynamic.thread-pool.executors[1].active-alarm = 80
spring.dynamic.thread-pool.executors[1].capacity-alarm = 80
spring.dynamic.thread-pool.executors[1].notify.interval = 8
spring.dynamic.thread-pool.executors[1].notify.receives = nobodyiam

@ -16,6 +16,12 @@
</properties>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-example-core</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>

@ -23,7 +23,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Agent config apollo example application.
*/
@SpringBootApplication
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.agent.core")
public class AgentConfigApolloExampleApplication {
public static void main(String[] args) {

@ -1,39 +1,42 @@
server.port=8092
server.servlet.context-path=/example
app.id=dynamic-threadpool-example
apollo.meta=http://127.0.0.1:8080
apollo.autoUpdateInjectedSpringProperties=true
apollo.bootstrap.enabled=true
apollo.bootstrap.namespaces=application
apollo.bootstrap.eagerLoad.enabled=true
# The following parameters are used for testing
env=dev
apollo.configService=http://127.0.0.1:8080
spring.profiles.active=dev
spring.application.name=hippo4j-config-apollo-spring-boot-starter-example
management.metrics.export.prometheus.enabled=true
management.server.port=29998
management.endpoints.web.exposure.include=*
spring.dynamic.thread-pool.enable=true
spring.dynamic.thread-pool.banner=true
spring.dynamic.thread-pool.check-state-interval=3
#spring.dynamic.thread-pool.monitor.enable=true
#spring.dynamic.thread-pool.monitor.collect-types=micrometer
#spring.dynamic.thread-pool.monitor.thread-pool-types=dynamic,web
#spring.dynamic.thread-pool.monitor.initial-delay=10000
#spring.dynamic.thread-pool.monitor.collect-interval=5000
#spring.dynamic.thread-pool.notify-platforms[0].platform=WECHAT
#spring.dynamic.thread-pool.notify-platforms[0].token=ac0426a5-c712-474c-9bff-72b8b8f5caff
#spring.dynamic.thread-pool.notify-platforms[1].platform=DING
#spring.dynamic.thread-pool.notify-platforms[1].token=56417ebba6a27ca352f0de77a2ae9da66d01f39610b5ee8a6033c60ef9071c55
#spring.dynamic.thread-pool.notify-platforms[2].platform=LARK
#spring.dynamic.thread-pool.notify-platforms[2].token=2cbf2808-3839-4c26-a04d-fd201dd51f9e
spring.dynamic.thread-pool.check-state-interval=10
spring.dynamic.thread-pool.monitor.enable=true
spring.dynamic.thread-pool.monitor.collect-types=micrometer
spring.dynamic.thread-pool.monitor.thread-pool-types=dynamic
spring.dynamic.thread-pool.monitor.initial-delay=3000
spring.dynamic.thread-pool.monitor.collect-interval=3000
spring.dynamic.thread-pool.monitor.agent-micrometer-port=29999
spring.dynamic.thread-pool.notify-platforms[0].platform=LARK
spring.dynamic.thread-pool.notify-platforms[0].token=6de41bdc-0799-45be-b128-7cddb9e777f0
#spring.dynamic.thread-pool.notify-platforms[1].platform=WECHAT
#spring.dynamic.thread-pool.notify-platforms[1].token=ac0426a5-c712-474c-9bff-72b8b8f5caff
#spring.dynamic.thread-pool.notify-platforms[2].platform=DING
#spring.dynamic.thread-pool.notify-platforms[2].token=56417ebba6a27ca352f0de77a2ae9da66d01f39610b5ee8a6033c60ef9071c55
spring.dynamic.thread-pool.apollo.namespace=application
spring.dynamic.thread-pool.config-file-type=properties
spring.dynamic.thread-pool.executors[0].thread-name-prefix = DynamicThreadPoolConfig#FIELD1
spring.dynamic.thread-pool.executors[0].core-pool-size = 2
spring.dynamic.thread-pool.executors[0].thread-pool-id = cn.hippo4j.example.agent.config.apollo.ThreadPoolConfiguration#RUN_MESSAGE_SEND_TASK_EXECUTOR
spring.dynamic.thread-pool.executors[0].thread-pool-id = cn.hippo4j.example.agent.core.config.ThreadPoolConfiguration#AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR
spring.dynamic.thread-pool.executors[0].maximum-pool-size = 20
spring.dynamic.thread-pool.executors[0].queue-capacity = 1024
spring.dynamic.thread-pool.executors[0].blocking-queue = ResizableCapacityLinkedBlockingQueue
@ -46,3 +49,18 @@ spring.dynamic.thread-pool.executors[0].active-alarm = 80
spring.dynamic.thread-pool.executors[0].capacity-alarm = 80
spring.dynamic.thread-pool.executors[0].notify.interval = 8
spring.dynamic.thread-pool.executors[0].notify.receives = nobodyiam
spring.dynamic.thread-pool.executors[1].thread-pool-id = runMessageSendTaskExecutor
spring.dynamic.thread-pool.executors[1].thread-name-prefix = runMessageSendTaskExecutor
spring.dynamic.thread-pool.executors[1].core-pool-size = 3
spring.dynamic.thread-pool.executors[1].maximum-pool-size = 4
spring.dynamic.thread-pool.executors[1].queue-capacity = 1024
spring.dynamic.thread-pool.executors[1].blocking-queue = ResizableCapacityLinkedBlockingQueue
spring.dynamic.thread-pool.executors[1].execute-time-out = 800
spring.dynamic.thread-pool.executors[1].rejected-handler = AbortPolicy
spring.dynamic.thread-pool.executors[1].keep-alive-time = 6691
spring.dynamic.thread-pool.executors[1].allow-core-thread-time-out = true
spring.dynamic.thread-pool.executors[1].active-alarm = 80
spring.dynamic.thread-pool.executors[1].capacity-alarm = 80
spring.dynamic.thread-pool.executors[1].notify.interval = 8
spring.dynamic.thread-pool.executors[1].notify.receives = nobodyiam

@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-agent-example</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-threadpool-agent-config-nacos-spring-boot-1x</artifactId>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
<spring-boot.version>1.5.22.RELEASE</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-example-core</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>1.5.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>1.3.0.RELEASE</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.agent.config.nacos.v1;
import com.alibaba.nacos.api.exception.NacosException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Agent config Nacos example application.
*/
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.agent.core")
public class AgentConfigNacosSpringBoot1xExampleApplication {
public static void main(String[] args) throws NacosException {
SpringApplication.run(AgentConfigNacosSpringBoot1xExampleApplication.class, args);
}
}

@ -0,0 +1,63 @@
server.port=8092
server.servlet.context-path=/example
spring.profiles.active=dev
spring.application.name=hippo4j-config-nacos-spring-boot-starter-example
# The following parameters are used for testing
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
spring.cloud.nacos.config.name=dynamic-threadpool-example-config
spring.cloud.nacos.config.file-extension=properties
spring.cloud.nacos.config.refresh.enabled=true
spring.dynamic.thread-pool.enable=true
spring.dynamic.thread-pool.banner=true
spring.dynamic.thread-pool.check-state-interval=3
spring.dynamic.thread-pool.monitor.enable=true
spring.dynamic.thread-pool.monitor.collect-types=micrometer
spring.dynamic.thread-pool.monitor.thread-pool-types=dynamic
spring.dynamic.thread-pool.monitor.agent-micrometer-port=29999
spring.dynamic.thread-pool.monitor.initial-delay=3000
spring.dynamic.thread-pool.monitor.collect-interval=3000
spring.dynamic.thread-pool.notify-platforms[0].platform=LARK
spring.dynamic.thread-pool.notify-platforms[0].token=6de41bdc-0799-45be-b128-7cddb9e777f0
#spring.dynamic.thread-pool.notify-platforms[1].platform=WECHAT
#spring.dynamic.thread-pool.notify-platforms[1].token=ac0426a5-c712-474c-9bff-72b8b8f5caff
#spring.dynamic.thread-pool.notify-platforms[2].platform=DING
#spring.dynamic.thread-pool.notify-platforms[2].token=56417ebba6a27ca352f0de77a2ae9da66d01f39610b5ee8a6033c60ef9071c55
spring.dynamic.thread-pool.nacos.data-id=dynamic-threadpool-example-config
spring.dynamic.thread-pool.nacos.group=DEFAULT_GROUP
spring.dynamic.thread-pool.nacos.namespace=public
spring.dynamic.thread-pool.config-file-type=properties
spring.dynamic.thread-pool.executors[0].thread-name-prefix = DynamicThreadPoolConfig#FIELD1
spring.dynamic.thread-pool.executors[0].core-pool-size = 2
spring.dynamic.thread-pool.executors[0].thread-pool-id = cn.hippo4j.example.agent.core.config.ThreadPoolConfiguration#AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR
spring.dynamic.thread-pool.executors[0].maximum-pool-size = 20
spring.dynamic.thread-pool.executors[0].queue-capacity = 1024
spring.dynamic.thread-pool.executors[0].blocking-queue = ResizableCapacityLinkedBlockingQueue
spring.dynamic.thread-pool.executors[0].execute-time-out = 800
spring.dynamic.thread-pool.executors[0].rejected-handler = AbortPolicy
spring.dynamic.thread-pool.executors[0].keep-alive-time = 6691
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out = true
spring.dynamic.thread-pool.executors[0].alarm = true
spring.dynamic.thread-pool.executors[0].active-alarm = 80
spring.dynamic.thread-pool.executors[0].capacity-alarm = 80
spring.dynamic.thread-pool.executors[0].notify.interval = 8
spring.dynamic.thread-pool.executors[0].notify.receives = nobodyiam
spring.dynamic.thread-pool.executors[1].thread-pool-id = runMessageSendTaskExecutor
spring.dynamic.thread-pool.executors[1].thread-name-prefix = runMessageSendTaskExecutor
spring.dynamic.thread-pool.executors[1].core-pool-size = 3
spring.dynamic.thread-pool.executors[1].maximum-pool-size = 4
spring.dynamic.thread-pool.executors[1].queue-capacity = 1024
spring.dynamic.thread-pool.executors[1].blocking-queue = ResizableCapacityLinkedBlockingQueue
spring.dynamic.thread-pool.executors[1].execute-time-out = 800
spring.dynamic.thread-pool.executors[1].rejected-handler = AbortPolicy
spring.dynamic.thread-pool.executors[1].keep-alive-time = 6691
spring.dynamic.thread-pool.executors[1].allow-core-thread-time-out = true
spring.dynamic.thread-pool.executors[1].active-alarm = 80
spring.dynamic.thread-pool.executors[1].capacity-alarm = 80
spring.dynamic.thread-pool.executors[1].notify.interval = 8
spring.dynamic.thread-pool.executors[1].notify.receives = nobodyiam

@ -0,0 +1,70 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-agent-example</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-threadpool-agent-config-nacos-example</artifactId>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
</properties>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-example-core</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- <dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>0.2.12</version>
</dependency>-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -15,17 +15,18 @@
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.apollo.interceptor;
package cn.hippo4j.example.agent.config.nacos;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.EnhancedInstance;
import cn.hippo4j.agent.core.plugin.interceptor.enhance.InstanceConstructorInterceptor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Default config constructor interceptor
* Agent config Nacos example application.
*/
public class DefaultConfigConstructorInterceptor implements InstanceConstructorInterceptor {
@SpringBootApplication(scanBasePackages = "cn.hippo4j.example.agent.core")
public class AgentConfigNacosExampleApplication {
@Override
public void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {
public static void main(String[] args) {
SpringApplication.run(AgentConfigNacosExampleApplication.class, args);
}
}

@ -0,0 +1,70 @@
server.port=8092
server.servlet.context-path=/example
nacos.config.auto-refresh=true
nacos.config.bootstrap.enable=true
# The following parameters are used for testing
spring.cloud.nacos.config.server-addr=127.0.0.1:8848
spring.cloud.nacos.config.extension-configs[0].data-id=dynamic-threadpool-example-config
spring.cloud.nacos.config.extension-configs[0].group=DEFAULT_GROUP
spring.cloud.nacos.config.extension-configs[0].refresh=true
spring.profiles.active=dev
spring.application.name=hippo4j-config-nacos-spring-boot-starter-example
management.metrics.export.prometheus.enabled=true
management.server.port=29998
management.endpoints.web.exposure.include=*
spring.dynamic.thread-pool.enable=true
spring.dynamic.thread-pool.banner=true
spring.dynamic.thread-pool.check-state-interval=10
#spring.dynamic.thread-pool.monitor.enable=true
#spring.dynamic.thread-pool.monitor.collect-types=micrometer
#spring.dynamic.thread-pool.monitor.thread-pool-types=dynamic,web
spring.dynamic.thread-pool.monitor.agent-micrometer-port=29999
spring.dynamic.thread-pool.monitor.initial-delay=3000
spring.dynamic.thread-pool.monitor.collect-interval=3000
spring.dynamic.thread-pool.notify-platforms[0].platform=LARK
spring.dynamic.thread-pool.notify-platforms[0].token=6de41bdc-0799-45be-b128-7cddb9e777f0
#spring.dynamic.thread-pool.notify-platforms[1].platform=WECHAT
#spring.dynamic.thread-pool.notify-platforms[1].token=ac0426a5-c712-474c-9bff-72b8b8f5caff
#spring.dynamic.thread-pool.notify-platforms[2].platform=DING
#spring.dynamic.thread-pool.notify-platforms[2].token=56417ebba6a27ca352f0de77a2ae9da66d01f39610b5ee8a6033c60ef9071c55
spring.dynamic.thread-pool.nacos.data-id=dynamic-threadpool-example-config
spring.dynamic.thread-pool.nacos.group=DEFAULT_GROUP
spring.dynamic.thread-pool.nacos.namespace=public
spring.dynamic.thread-pool.config-file-type=properties
spring.dynamic.thread-pool.executors[0].thread-name-prefix = DynamicThreadPoolConfig#FIELD1
spring.dynamic.thread-pool.executors[0].core-pool-size = 2
spring.dynamic.thread-pool.executors[0].thread-pool-id = cn.hippo4j.example.agent.core.config.ThreadPoolConfiguration#AGENT_RUN_MESSAGE_SEND_TASK_EXECUTOR
spring.dynamic.thread-pool.executors[0].maximum-pool-size = 20
spring.dynamic.thread-pool.executors[0].queue-capacity = 1024
spring.dynamic.thread-pool.executors[0].blocking-queue = ResizableCapacityLinkedBlockingQueue
spring.dynamic.thread-pool.executors[0].execute-time-out = 800
spring.dynamic.thread-pool.executors[0].rejected-handler = AbortPolicy
spring.dynamic.thread-pool.executors[0].keep-alive-time = 6691
spring.dynamic.thread-pool.executors[0].allow-core-thread-time-out = true
spring.dynamic.thread-pool.executors[0].alarm = true
spring.dynamic.thread-pool.executors[0].active-alarm = 80
spring.dynamic.thread-pool.executors[0].capacity-alarm = 80
spring.dynamic.thread-pool.executors[0].notify.interval = 8
spring.dynamic.thread-pool.executors[0].notify.receives = nobodyiam
spring.dynamic.thread-pool.executors[1].thread-pool-id = runMessageSendTaskExecutor
spring.dynamic.thread-pool.executors[1].thread-name-prefix = runMessageSendTaskExecutor
spring.dynamic.thread-pool.executors[1].core-pool-size = 3
spring.dynamic.thread-pool.executors[1].maximum-pool-size = 4
spring.dynamic.thread-pool.executors[1].queue-capacity = 1024
spring.dynamic.thread-pool.executors[1].blocking-queue = ResizableCapacityLinkedBlockingQueue
spring.dynamic.thread-pool.executors[1].execute-time-out = 800
spring.dynamic.thread-pool.executors[1].rejected-handler = AbortPolicy
spring.dynamic.thread-pool.executors[1].keep-alive-time = 6691
spring.dynamic.thread-pool.executors[1].allow-core-thread-time-out = true
spring.dynamic.thread-pool.executors[1].active-alarm = 80
spring.dynamic.thread-pool.executors[1].capacity-alarm = 80
spring.dynamic.thread-pool.executors[1].notify.interval = 8
spring.dynamic.thread-pool.executors[1].notify.receives = nobodyiam

@ -18,5 +18,9 @@
<modules>
<module>config-apollo</module>
<module>config-nacos</module>
<module>agent-example-core</module>
<module>config-nacos-spring-boot-1x</module>
<module>config-apollo-spring-boot-1x</module>
</modules>
</project>

@ -15,7 +15,7 @@
* limitations under the License.
*/
package cn.hippo4j.example.config.nacos;
package cn.hippo4j.example.agent.config.nacos;
import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;

@ -56,6 +56,8 @@ public class Constants {
public static final String GENERAL_SPLIT_SYMBOL = ",";
public static final String DOT_SPLIT_SYMBOL = ".";
public static final String IDENTIFY_SLICER_SYMBOL = "_";
public static final String LONG_POLLING_LINE_SEPARATOR = "\r\n";
@ -128,5 +130,12 @@ public class Constants {
public static final String CONFIGURATION_PROPERTIES_PREFIX = "spring.dynamic.thread-pool";
public static final String EXECUTORS = "executors";
public static final long NO_REJECT_COUNT_NUM = -1L;
public static final String DYNAMIC_THREAD_POOL_EXECUTOR = "cn.hippo4j.core.executor.DynamicThreadPoolExecutor";
public static final int DEFAULT_INTERVAL = 5;
}

@ -96,6 +96,15 @@ public class AbstractSubjectCenter {
observers.remove(observer);
}
/**
* get observer by subject.
*
* @param subject
*/
public static List<Observer> get(SubjectType subjectType) {
return OBSERVERS_MAP.get(subjectType.name());
}
/**
* Notify.
*
@ -145,6 +154,11 @@ public class AbstractSubjectCenter {
/**
* Thread-pool dynamic refresh.
*/
THREAD_POOL_DYNAMIC_REFRESH
THREAD_POOL_DYNAMIC_REFRESH,
/**
* Agent Spring Properties Loader Completed.
*/
AGENT_SPRING_PROPERTIES_LOADER_COMPLETED
}
}

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.monitor;
/**
* MonitorCollect type enum.
*/
public enum MonitorCollectTypeEnum {
/**
* Micrometer
*/
MICROMETER("micrometer"),
/**
* ELASTICSEARCH
*/
ELASTICSEARCH("elasticsearch"),
/**
* LOG
*/
LOG("log");
private final String value;
MonitorCollectTypeEnum(String value) {
this.value = value;
}
public String getValue() {
return value;
}
}

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.common.monitor;
/**
* MonitorHandler type enum.
*/
public enum MonitorHandlerTypeEnum {
/**
* DYNAMIC
*/
DYNAMIC,
/**
* WEB
*/
WEB,
/**
* ADAPTER
*/
ADAPTER
}

@ -31,10 +31,13 @@ import lombok.NoArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedWriter;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
@ -198,6 +201,18 @@ public class HttpUtil {
return executeJson(url, HttpMethod.POST, json, null);
}
/**
* Send a post network request.
*
* @param url target url
* @param json json data
* @param headers headers
* @return
*/
public static String postJson(String url, String json, Map<String, String> headers) {
return executeJson(url, HttpMethod.POST, json, headers);
}
/**
* Send a put network request.
*
@ -310,8 +325,10 @@ public class HttpUtil {
byte[] b = bodyString.getBytes();
connection.setRequestProperty(CONTENT_LENGTH, String.valueOf(b.length));
OutputStream outputStream = connection.getOutputStream();
outputStream.write(b, 0, b.length);
outputStream.flush();
OutputStreamWriter osw = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8);
BufferedWriter writer = new BufferedWriter(osw);
writer.write(bodyString);
writer.flush();
IoUtil.closeQuietly(outputStream);
}
connection.connect();

@ -235,4 +235,14 @@ public class DefaultThreadPoolCheckAlarmHandler implements Runnable, ThreadPoolC
.rejectCountNum(rejectCount)
.build();
}
/**
* Terminates the scheduled tasks and asynchronous alarm notifications by
* forcefully shutting down the respective thread pools.
*/
public void destroyScheduleExecute() {
alarmNotifyExecutor.shutdownNow();
asyncAlarmNotifyExecutor.shutdownNow();
}
}

@ -34,6 +34,7 @@ import java.util.List;
* Json config parser.
*/
public class JsonConfigParser extends AbstractConfigParser {
private static final ObjectMapper MAPPER;
private static final String DOT = ".";
private static final String LEFT_BRACE = "{";
@ -91,7 +92,7 @@ public class JsonConfigParser extends AbstractConfigParser {
return new HashMap<>(1);
}
return doParse(content,"");
return doParse(content, "");
}
@Override

@ -51,4 +51,9 @@ public class MonitorProperties {
* Collect interval. unit: ms
*/
private Long collectInterval = 5000L;
/**
* Agent micrometer exposed port
*/
private Integer agentMicrometerPort;
}

@ -43,6 +43,8 @@ import java.security.NoSuchAlgorithmException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@ -154,7 +156,9 @@ public class LarkSendMessageHandler implements SendMessageHandler {
private void execute(String secretKey, String text) {
String serverUrl = LarkAlarmConstants.LARK_BOT_URL + secretKey;
try {
String responseBody = HttpUtil.postJson(serverUrl, text);
Map<String, String> headers = new HashMap<>();
headers.put("Content-Type", "application/json; charset=UTF-8");
String responseBody = HttpUtil.postJson(serverUrl, text, headers);
LarkRobotResponse response = JSONUtil.parseObject(responseBody, LarkRobotResponse.class);
Assert.isTrue(response != null, "Response is null.");
if (response.getCode() != 0) {

@ -17,12 +17,11 @@
package cn.hippo4j.threadpool.message.core.service;
import cn.hippo4j.threadpool.message.api.NotifyConfigDTO;
import cn.hippo4j.threadpool.message.core.request.AlarmNotifyRequest;
import cn.hippo4j.threadpool.message.core.request.ChangeParameterNotifyRequest;
import cn.hippo4j.threadpool.message.core.request.WebChangeParameterNotifyRequest;
import cn.hippo4j.threadpool.message.api.NotifyConfigDTO;
import lombok.SneakyThrows;
import org.springframework.core.io.ClassPathResource;
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
@ -70,9 +69,9 @@ public interface SendMessageHandler {
default String readUtf8String(String path) {
int endFlagCode = -1;
String resultReadStr;
ClassPathResource classPathResource = new ClassPathResource(path);
ClassLoader classLoader = this.getClass().getClassLoader();
try (
InputStream inputStream = classPathResource.getInputStream();
InputStream inputStream = classLoader.getResourceAsStream(path);
BufferedInputStream bis = new BufferedInputStream(inputStream);
ByteArrayOutputStream buf = new ByteArrayOutputStream()) {
int result = bis.read();

@ -68,7 +68,7 @@ public class ZipkinExecutorAdapterTest {
@Test
public void testReplace() {
Object executor = new CustomWrappingExecutorService(Executors.newCachedThreadPool());
CustomWrappingExecutorService executorChange = (CustomWrappingExecutorService)executor;
CustomWrappingExecutorService executorChange = (CustomWrappingExecutorService) executor;
ExecutorService beforeReplace = executorChange.delegate();
zipkinExecutorAdapter.replace(executor, dynamicThreadPool);
ExecutorService afterReplace = executorChange.delegate();

Loading…
Cancel
Save