diff --git a/agent/hippo4j-agent-core/pom.xml b/agent/hippo4j-agent-core/pom.xml index e020e562..97412d51 100644 --- a/agent/hippo4j-agent-core/pom.xml +++ b/agent/hippo4j-agent-core/pom.xml @@ -77,7 +77,11 @@ cn.hippo4j hippo4j-threadpool-infra-common ${project.version} - provided + + + cn.hippo4j + hippo4j-threadpool-dynamic-mode-config + ${project.version} diff --git a/agent/hippo4j-agent-plugin/spring-plugins/pom.xml b/agent/hippo4j-agent-plugin/spring-plugins/pom.xml index d6855dbe..42b725b5 100644 --- a/agent/hippo4j-agent-plugin/spring-plugins/pom.xml +++ b/agent/hippo4j-agent-plugin/spring-plugins/pom.xml @@ -22,14 +22,4 @@ UTF-8 /.. - - - - - cn.hippo4j - hippo4j-agent-spring-plugin-common - ${project.version} - - - \ No newline at end of file diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-1x-plugin/pom.xml b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-1x-plugin/pom.xml index 1605aaf0..ccec3c7b 100644 --- a/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-1x-plugin/pom.xml +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-1x-plugin/pom.xml @@ -33,6 +33,7 @@ cn.hippo4j hippo4j-agent-spring-plugin-common provided + ${project.version} diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/pom.xml b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/pom.xml index 5eb55742..dc3d87cf 100644 --- a/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/pom.xml +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/pom.xml @@ -21,6 +21,7 @@ cn.hippo4j hippo4j-agent-spring-plugin-common provided + ${project.version} @@ -36,5 +37,11 @@ ${project.version} provided + + + com.ctrip.framework.apollo + apollo-client + provided + \ No newline at end of file diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/DynamicThreadPoolChangeHandlerSpring2x.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/DynamicThreadPoolChangeHandlerSpring2x.java index f490618c..bcda15e4 100644 --- a/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/DynamicThreadPoolChangeHandlerSpring2x.java +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/DynamicThreadPoolChangeHandlerSpring2x.java @@ -17,20 +17,57 @@ package cn.hippo4j.agent.plugin.spring.boot.v2; +import cn.hippo4j.agent.core.logging.api.ILog; +import cn.hippo4j.agent.core.logging.api.LogManager; +import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig; import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties; import cn.hippo4j.threadpool.dynamic.mode.config.refresher.AbstractConfigThreadPoolDynamicRefresh; +import com.ctrip.framework.apollo.Config; +import com.ctrip.framework.apollo.ConfigChangeListener; +import com.ctrip.framework.apollo.ConfigFile; +import com.ctrip.framework.apollo.ConfigService; +import com.ctrip.framework.apollo.core.enums.ConfigFileFormat; +import com.ctrip.framework.apollo.model.ConfigChange; import org.springframework.boot.context.properties.bind.Bindable; import org.springframework.boot.context.properties.bind.Binder; import org.springframework.boot.context.properties.source.ConfigurationPropertySource; import org.springframework.boot.context.properties.source.MapConfigurationPropertySource; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import static cn.hippo4j.agent.core.conf.Constants.SPRING_BOOT_CONFIG_PREFIX; + /** * Dynamic thread pool change handler spring 2x */ public class DynamicThreadPoolChangeHandlerSpring2x extends AbstractConfigThreadPoolDynamicRefresh { + private static ILog LOGGER = LogManager.getLogger(DynamicThreadPoolChangeHandlerSpring2x.class); + + @Override + public void registerListener() { + List apolloNamespaces = SpringBootConfig.Spring.Dynamic.Thread_Pool.Apollo.NAMESPACE; + String namespace = apolloNamespaces.get(0); + String configFileType = SpringBootConfig.Spring.Dynamic.Thread_Pool.CONFIG_FILE_TYPE; + Config config = ConfigService.getConfig(String.format("%s.%s", namespace, configFileType)); + ConfigChangeListener configChangeListener = configChangeEvent -> { + String replacedNamespace = namespace.replaceAll("." + configFileType, ""); + ConfigFileFormat configFileFormat = ConfigFileFormat.fromString(configFileType); + ConfigFile configFile = ConfigService.getConfigFile(replacedNamespace, configFileFormat); + Map newChangeValueMap = new HashMap<>(); + configChangeEvent.changedKeys().stream().filter(each -> each.contains(SPRING_BOOT_CONFIG_PREFIX)).forEach(each -> { + ConfigChange change = configChangeEvent.getChange(each); + String newValue = change.getNewValue(); + newChangeValueMap.put(each, newValue); + }); + dynamicRefresh(configFileType, configFile.getContent(), newChangeValueMap); + }; + config.addChangeListener(configChangeListener); + LOGGER.info("[Hippo4j-Agent] Dynamic thread pool refresher, add apollo listener success. namespace: {}", namespace); + } + @Override public BootstrapConfigProperties buildBootstrapProperties(Map configInfo) { BootstrapConfigProperties bindableBootstrapConfigProperties = new BootstrapConfigProperties(); diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/boot/SpringBootV2PluginBootService.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/boot/SpringBootV2PluginBootService.java new file mode 100644 index 00000000..c9b8c85b --- /dev/null +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/boot/SpringBootV2PluginBootService.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.agent.plugin.spring.boot.v2.boot; + +import cn.hippo4j.agent.core.boot.BootService; +import cn.hippo4j.agent.core.boot.DefaultImplementor; +import cn.hippo4j.agent.core.logging.api.ILog; +import cn.hippo4j.agent.core.logging.api.LogManager; + +/** + * SpringBoot v1 plugin boot service + */ +@DefaultImplementor +public class SpringBootV2PluginBootService implements BootService { + + private static final ILog LOGGER = LogManager.getLogger(SpringBootV2PluginBootService.class); + + @Override + public void prepare() throws Throwable { + + } + + @Override + public void boot() throws Throwable { + LOGGER.info("Loader SpringBootV2PluginBootService..."); + } + + @Override + public void onComplete() throws Throwable { + + } + + @Override + public void shutdown() throws Throwable { + + } +} diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/interceptor/EventPublishingStartedInterceptor.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/interceptor/EventPublishingStartedInterceptor.java index 0aa9cc1e..818efb62 100644 --- a/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/interceptor/EventPublishingStartedInterceptor.java +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/java/cn/hippo4j/agent/plugin/spring/boot/v2/interceptor/EventPublishingStartedInterceptor.java @@ -25,7 +25,9 @@ import cn.hippo4j.agent.core.plugin.interceptor.enhance.MethodInterceptResult; import cn.hippo4j.agent.plugin.spring.boot.v2.DynamicThreadPoolChangeHandlerSpring2x; import cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader; import cn.hippo4j.agent.plugin.spring.common.support.SpringThreadPoolRegisterSupport; +import cn.hippo4j.common.extension.design.AbstractSubjectCenter; import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh; +import cn.hippo4j.threadpool.dynamic.mode.config.refresher.event.DynamicThreadPoolRefreshListener; import org.springframework.context.ConfigurableApplicationContext; import java.lang.reflect.Method; @@ -53,6 +55,8 @@ public class EventPublishingStartedInterceptor implements InstanceMethodsAroundI SpringPropertiesLoader.loadSpringProperties(context.getEnvironment()); ThreadPoolDynamicRefresh dynamicRefresh = new DynamicThreadPoolChangeHandlerSpring2x(); dynamicRefresh.registerListener(); + AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.THREAD_POOL_DYNAMIC_REFRESH, + new DynamicThreadPoolRefreshListener()); return ret; } diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/resources/META-INF/services/cn.hippo4j.agent.core.boot.BootService b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/resources/META-INF/services/cn.hippo4j.agent.core.boot.BootService new file mode 100644 index 00000000..687ec9f8 --- /dev/null +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-boot-2x-plugin/src/main/resources/META-INF/services/cn.hippo4j.agent.core.boot.BootService @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cn.hippo4j.agent.plugin.spring.boot.v2.boot.SpringBootV2PluginBootService \ No newline at end of file diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/pom.xml b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/pom.xml index 7ac42387..f0336260 100644 --- a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/pom.xml +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/pom.xml @@ -31,11 +31,6 @@ org.slf4j slf4j-api - - com.ctrip.framework.apollo - apollo-client - provided - cn.hippo4j hippo4j-threadpool-dynamic-api diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/AbstractDynamicThreadPoolChangeHandlerSpring.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/AbstractDynamicThreadPoolChangeHandlerSpring.java deleted file mode 100644 index 9cf071a0..00000000 --- a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/AbstractDynamicThreadPoolChangeHandlerSpring.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.hippo4j.agent.plugin.spring.common.support; - -import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig; -import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh; -import com.ctrip.framework.apollo.ConfigChangeListener; -import com.ctrip.framework.apollo.ConfigFile; -import com.ctrip.framework.apollo.ConfigService; -import com.ctrip.framework.apollo.core.enums.ConfigFileFormat; -import com.ctrip.framework.apollo.model.ConfigChange; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static cn.hippo4j.agent.core.conf.Constants.SPRING_BOOT_CONFIG_PREFIX; - -/** - * Abstract dynamic thread poo change handler spring - */ -public abstract class AbstractDynamicThreadPoolChangeHandlerSpring implements ThreadPoolDynamicRefresh { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDynamicThreadPoolChangeHandlerSpring.class); - - public void registerListener() { - List apolloNamespaces = SpringBootConfig.Spring.Dynamic.Thread_Pool.Apollo.NAMESPACE; - String namespace = apolloNamespaces.get(0); - String configFileType = SpringBootConfig.Spring.Dynamic.Thread_Pool.CONFIG_FILE_TYPE; - - com.ctrip.framework.apollo.Config config = ConfigService.getConfig(String.format("%s.%s", namespace, configFileType)); - ConfigChangeListener configChangeListener = configChangeEvent -> { - String replacedNamespace = namespace.replaceAll("." + configFileType, ""); - ConfigFileFormat configFileFormat = ConfigFileFormat.fromString(configFileType); - ConfigFile configFile = ConfigService.getConfigFile(replacedNamespace, configFileFormat); - Map newChangeValueMap = new HashMap<>(); - configChangeEvent.changedKeys().stream().filter(each -> each.contains(SPRING_BOOT_CONFIG_PREFIX)).forEach(each -> { - ConfigChange change = configChangeEvent.getChange(each); - String newValue = change.getNewValue(); - newChangeValueMap.put(each, newValue); - }); - dynamicRefresh(configFile.getContent(), newChangeValueMap); - }; - config.addChangeListener(configChangeListener); - LOGGER.info("[Hippo4j-Agent] Dynamic thread pool refresher, add apollo listener success. namespace: {}", namespace); - } - - public void dynamicRefresh(String configContent, Map newValueChangeMap) { - try { - // String configFileType = SpringBootConfig.Spring.Dynamic.Thread_Pool.CONFIG_FILE_TYPE; - // - // Map afterConfigMap = ConfigParserHandler.getInstance().parseConfig(configContent, - // ConfigFileTypeEnum.of(configFileType)); - // if (CollectionUtil.isNotEmpty(newValueChangeMap)) { - // Optional.ofNullable(afterConfigMap).ifPresent(each -> each.putAll(newValueChangeMap)); - // } - // TODO - // BootstrapConfigProperties afterConfigProperties = bindProperties(afterConfigMap, context); - // - // List executors = afterConfigProperties.getExecutors(); - // for (ExecutorProperties afterProperties : executors) { - // String threadPoolId = - // afterProperties.getThreadPoolId(); - // AgentThreadPoolExecutorHolder holder = AgentThreadPoolInstanceRegistry.getInstance().getHolder(threadPoolId); - // if (holder.isEmpty() || - // holder.getExecutor() == null) { - // continue; - // } - // ExecutorProperties beforeProperties = convert(holder.getProperties()); - // if (!checkConsistency(threadPoolId, beforeProperties, afterProperties)) { - // continue; - // } - // dynamicRefreshPool(beforeProperties, afterProperties); - // holder.setProperties(failDefaultExecutorProperties(beforeProperties, afterProperties)); // do refresh. - // ChangeParameterNotifyRequest changeRequest = buildChangeRequest(beforeProperties, afterProperties); - // LOGGER.info(CHANGE_THREAD_POOL_TEXT, threadPoolId, String.format(CHANGE_DELIMITER, - // beforeProperties.getCorePoolSize(), changeRequest.getNowCorePoolSize()), String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), changeRequest.getNowMaximumPoolSize()), - // String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), changeRequest.getNowQueueCapacity()), String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), - // changeRequest.getNowKeepAliveTime()), String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), changeRequest.getNowExecuteTimeOut()), String.format(CHANGE_DELIMITER, - // beforeProperties.getRejectedHandler(), changeRequest.getNowRejectedName()), String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), - // changeRequest.getNowAllowsCoreThreadTimeOut())); - // } - } catch (Exception ex) { - LOGGER.error("[Hippo4j-Agent] config mode dynamic refresh failed.", ex); - } - } -} diff --git a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/SpringThreadPoolRegisterSupport.java b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/SpringThreadPoolRegisterSupport.java index 0fdb5704..cad4d1c0 100644 --- a/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/SpringThreadPoolRegisterSupport.java +++ b/agent/hippo4j-agent-plugin/spring-plugins/spring-plugin-common/src/main/java/cn/hippo4j/agent/plugin/spring/common/support/SpringThreadPoolRegisterSupport.java @@ -64,8 +64,8 @@ public class SpringThreadPoolRegisterSupport { for (Map.Entry entry : beansWithAnnotation.entrySet()) { String beanName = entry.getKey(); Executor bean = entry.getValue(); - ThreadPoolExecutor executor = null; - // + ThreadPoolExecutor executor = (ThreadPoolExecutor) bean; + // TODO // if (DynamicThreadPoolAdapterChoose.match(bean)) { // executor = DynamicThreadPoolAdapterChoose.unwrap(bean); // } else { diff --git a/examples/threadpool-example/agent/config-apollo/pom.xml b/examples/threadpool-example/agent/config-apollo/pom.xml new file mode 100644 index 00000000..7a7adfd4 --- /dev/null +++ b/examples/threadpool-example/agent/config-apollo/pom.xml @@ -0,0 +1,80 @@ + + + 4.0.0 + + cn.hippo4j + hippo4j-threadpool-agent-example + ${revision} + + + hippo4j-threadpool-agent-config-apollo-example + + + true + + + + + org.springframework.boot + spring-boot-starter + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-starter-test + test + + + + com.ctrip.framework.apollo + apollo-client + + + + org.springframework.cloud + spring-cloud-context + 2.2.5.RELEASE + + + + org.slf4j + slf4j-api + 1.7.21 + + + + io.micrometer + micrometer-registry-prometheus + + + + org.springframework.boot + spring-boot-starter-actuator + + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + + + + repackage + + + + + + + \ No newline at end of file diff --git a/examples/threadpool-example/agent/config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo/AgentConfigApolloExampleApplication.java b/examples/threadpool-example/agent/config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo/AgentConfigApolloExampleApplication.java new file mode 100644 index 00000000..33283990 --- /dev/null +++ b/examples/threadpool-example/agent/config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo/AgentConfigApolloExampleApplication.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.example.agent.config.apollo; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * Agent config apollo example application. + */ +@SpringBootApplication +public class AgentConfigApolloExampleApplication { + + public static void main(String[] args) { + SpringApplication.run(AgentConfigApolloExampleApplication.class, args); + } +} diff --git a/examples/threadpool-example/agent/config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo/ThreadPoolConfiguration.java b/examples/threadpool-example/agent/config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo/ThreadPoolConfiguration.java new file mode 100644 index 00000000..cec7c3bb --- /dev/null +++ b/examples/threadpool-example/agent/config-apollo/src/main/java/cn/hippo4j/example/agent/config/apollo/ThreadPoolConfiguration.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.example.agent.config.apollo; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@Configuration +public class ThreadPoolConfiguration { + + // ------------------------------------------------------------------------- + // 未使用 Hippo4j,原始定义线程池创建方式 + // ------------------------------------------------------------------------- + + @Bean + public ThreadPoolExecutor runMessageSendTaskExecutor() { + LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue<>(1024); + return new ThreadPoolExecutor( + 1, + 10, + 1024, + TimeUnit.SECONDS, + linkedBlockingQueue); + } + + // ------------------------------------------------------------------------- + // 演示 Agent 模式修改线程池 + // ------------------------------------------------------------------------- + + public static final ThreadPoolExecutor RUN_MESSAGE_SEND_TASK_EXECUTOR = new ThreadPoolExecutor( + 1, + 10, + 1024, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(1024)); +} diff --git a/examples/threadpool-example/agent/config-apollo/src/main/resources/bootstrap.properties b/examples/threadpool-example/agent/config-apollo/src/main/resources/bootstrap.properties new file mode 100644 index 00000000..88187c9c --- /dev/null +++ b/examples/threadpool-example/agent/config-apollo/src/main/resources/bootstrap.properties @@ -0,0 +1,33 @@ +server.port=8092 +server.servlet.context-path=/example +app.id=dynamic-threadpool-example +apollo.meta=http://127.0.0.1:8080 +apollo.autoUpdateInjectedSpringProperties=true +apollo.bootstrap.enabled=true +apollo.bootstrap.namespaces=application +apollo.bootstrap.eagerLoad.enabled=true +# The following parameters are used for testing +env=dev +apollo.configService=http://127.0.0.1:8080 +spring.profiles.active=dev +spring.application.name=hippo4j-config-apollo-spring-boot-starter-example +management.metrics.export.prometheus.enabled=true +management.server.port=29998 +management.endpoints.web.exposure.include=* +spring.dynamic.thread-pool.enable=true +spring.dynamic.thread-pool.banner=true +spring.dynamic.thread-pool.check-state-interval=3 +#spring.dynamic.thread-pool.monitor.enable=true +#spring.dynamic.thread-pool.monitor.collect-types=micrometer +#spring.dynamic.thread-pool.monitor.thread-pool-types=dynamic,web +#spring.dynamic.thread-pool.monitor.initial-delay=10000 +#spring.dynamic.thread-pool.monitor.collect-interval=5000 +#spring.dynamic.thread-pool.notify-platforms[0].platform=WECHAT +#spring.dynamic.thread-pool.notify-platforms[0].token=ac0426a5-c712-474c-9bff-72b8b8f5caff +#spring.dynamic.thread-pool.notify-platforms[1].platform=DING +#spring.dynamic.thread-pool.notify-platforms[1].token=56417ebba6a27ca352f0de77a2ae9da66d01f39610b5ee8a6033c60ef9071c55 +#spring.dynamic.thread-pool.notify-platforms[2].platform=LARK +#spring.dynamic.thread-pool.notify-platforms[2].token=2cbf2808-3839-4c26-a04d-fd201dd51f9e +spring.dynamic.thread-pool.apollo.namespace=application +spring.dynamic.thread-pool.config-file-type=properties + diff --git a/examples/threadpool-example/agent/pom.xml b/examples/threadpool-example/agent/pom.xml new file mode 100644 index 00000000..11fbc894 --- /dev/null +++ b/examples/threadpool-example/agent/pom.xml @@ -0,0 +1,22 @@ + + + 4.0.0 + + cn.hippo4j + hippo4j-threadpool-example + ${revision} + + + hippo4j-threadpool-agent-example + pom + + + true + + + + config-apollo + + \ No newline at end of file diff --git a/examples/threadpool-example/pom.xml b/examples/threadpool-example/pom.xml index 8f1f1d05..c25dd9b8 100644 --- a/examples/threadpool-example/pom.xml +++ b/examples/threadpool-example/pom.xml @@ -18,5 +18,6 @@ config example-core server + agent diff --git a/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/refresher/AbstractConfigThreadPoolDynamicRefresh.java b/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/refresher/AbstractConfigThreadPoolDynamicRefresh.java index aeef33d5..f27fc0a5 100644 --- a/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/refresher/AbstractConfigThreadPoolDynamicRefresh.java +++ b/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/refresher/AbstractConfigThreadPoolDynamicRefresh.java @@ -46,8 +46,7 @@ public abstract class AbstractConfigThreadPoolDynamicRefresh implements ThreadPo Optional.ofNullable(configInfo).ifPresent(each -> each.putAll(newValueChangeMap)); } BootstrapPropertiesInterface bootstrapProperties = buildBootstrapProperties(configInfo); - // publishDynamicThreadPoolEvent(binderCoreProperties); - AbstractSubjectCenter.notify("", null); + AbstractSubjectCenter.notify(AbstractSubjectCenter.SubjectType.THREAD_POOL_DYNAMIC_REFRESH, () -> bootstrapProperties); } catch (Exception ex) { log.error("Hippo4j config mode dynamic refresh failed.", ex); } diff --git a/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/refresher/event/DynamicThreadPoolRefreshListener.java b/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/refresher/event/DynamicThreadPoolRefreshListener.java index 41545e58..36584c56 100644 --- a/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/refresher/event/DynamicThreadPoolRefreshListener.java +++ b/kernel/dynamic/mode/config/src/main/java/cn/hippo4j/threadpool/dynamic/mode/config/refresher/event/DynamicThreadPoolRefreshListener.java @@ -17,6 +17,8 @@ package cn.hippo4j.threadpool.dynamic.mode.config.refresher.event; +import cn.hippo4j.common.executor.ThreadPoolExecutorHolder; +import cn.hippo4j.common.executor.ThreadPoolInstanceRegistry; import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum; import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum; import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue; @@ -24,11 +26,10 @@ import cn.hippo4j.common.extension.design.Observer; import cn.hippo4j.common.extension.design.ObserverMessage; import cn.hippo4j.common.model.executor.ExecutorProperties; import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil; -import cn.hippo4j.threadpool.dynamic.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties; -import cn.hippo4j.threadpool.dynamic.core.executor.manage.GlobalConfigThreadPoolManage; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Objects; @@ -42,38 +43,58 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD /** * Dynamic thread-pool refresh listener. */ -@Slf4j @RequiredArgsConstructor public class DynamicThreadPoolRefreshListener implements Observer { + private static final Logger LOGGER = LoggerFactory.getLogger(DynamicThreadPoolRefreshListener.class); + @Override public void accept(ObserverMessage observerMessage) { BootstrapConfigProperties bindableConfigProperties = observerMessage.message(); List executors = bindableConfigProperties.getExecutors(); for (ExecutorProperties properties : executors) { String threadPoolId = properties.getThreadPoolId(); - dynamicRefreshPool(threadPoolId, properties); - ExecutorProperties beforeProperties = GlobalConfigThreadPoolManage.getProperties(properties.getThreadPoolId()); - log.info(CHANGE_THREAD_POOL_TEXT, - threadPoolId, - String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), properties.getCorePoolSize()), - String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()), - String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), properties.getQueueCapacity()), - String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()), - String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), properties.getRejectedHandler()), - String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())); + // Check whether the thread pool configuration is empty and whether the parameters have changed + ThreadPoolExecutorHolder executorHolder = ThreadPoolInstanceRegistry.getInstance().getHolder(threadPoolId); + if (executorHolder.isEmpty() || !checkPropertiesConsistency(executorHolder, properties)) { + continue; + } + dynamicRefreshThreadPool(executorHolder, properties); + sendChangeNotificationMessage(executorHolder, properties); + executorHolder.setExecutorProperties(properties); } } /** - * Dynamic refresh pool. + * Check consistency. * - * @param threadPoolId - * @param properties + * @param executorHolder executor holder + * @param properties properties after dynamic thread pool change */ - private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properties) { - ExecutorProperties beforeProperties = GlobalConfigThreadPoolManage.getProperties(properties.getThreadPoolId()); - ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId); + private boolean checkPropertiesConsistency(ThreadPoolExecutorHolder executorHolder, ExecutorProperties properties) { + ExecutorProperties beforeProperties = executorHolder.getExecutorProperties(); + ThreadPoolExecutor executor = executorHolder.getExecutor(); + boolean result = (properties.getCorePoolSize() != null && !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize())) + || (properties.getMaximumPoolSize() != null && !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize())) + || (properties.getAllowCoreThreadTimeOut() != null && !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())) + || (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) + || (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) + || (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) + || + ((properties.getQueueCapacity() != null && !Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity()) + && Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName()))); + return result; + } + + /** + * Dynamic refresh thread-pool. + * + * @param executorHolder executor holder + * @param properties properties after dynamic thread pool change + */ + private void dynamicRefreshThreadPool(ThreadPoolExecutorHolder executorHolder, ExecutorProperties properties) { + ExecutorProperties beforeProperties = executorHolder.getExecutorProperties(); + ThreadPoolExecutor executor = executorHolder.getExecutor(); if (properties.getMaximumPoolSize() != null && properties.getCorePoolSize() != null) { ThreadPoolExecutorUtil.safeSetPoolSize(executor, properties.getCorePoolSize(), properties.getMaximumPoolSize()); } else { @@ -106,8 +127,21 @@ public class DynamicThreadPoolRefreshListener implements Observer queue = (ResizableCapacityLinkedBlockingQueue) executor.getQueue(); queue.setCapacity(properties.getQueueCapacity()); } else { - log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName()); + LOGGER.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName()); } } } + + private void sendChangeNotificationMessage(ThreadPoolExecutorHolder executorHolder, ExecutorProperties properties) { + ExecutorProperties executorProperties = executorHolder.getExecutorProperties(); + // TODO log cannot be printed + LOGGER.info(CHANGE_THREAD_POOL_TEXT, + executorHolder.getThreadPoolId(), + String.format(CHANGE_DELIMITER, executorProperties.getCorePoolSize(), properties.getCorePoolSize()), + String.format(CHANGE_DELIMITER, executorProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()), + String.format(CHANGE_DELIMITER, executorProperties.getQueueCapacity(), properties.getQueueCapacity()), + String.format(CHANGE_DELIMITER, executorProperties.getKeepAliveTime(), properties.getKeepAliveTime()), + String.format(CHANGE_DELIMITER, executorProperties.getRejectedHandler(), properties.getRejectedHandler()), + String.format(CHANGE_DELIMITER, executorProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut())); + } }