Development Agent mode dynamic change (#1325)

pull/1314/head
magestack 2 years ago committed by GitHub
parent 1b81e00b56
commit 618b382401
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -77,7 +77,11 @@
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-infra-common</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-mode-config</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

@ -22,14 +22,4 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<sdk.plugin.related.dir>/..</sdk.plugin.related.dir>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-spring-plugin-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

@ -33,6 +33,7 @@
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-spring-plugin-common</artifactId>
<scope>provided</scope>
<version>${project.version}</version>
</dependency>
<dependency>

@ -21,6 +21,7 @@
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-agent-spring-plugin-common</artifactId>
<scope>provided</scope>
<version>${project.version}</version>
</dependency>
<dependency>
@ -36,5 +37,11 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

@ -17,20 +17,57 @@
package cn.hippo4j.agent.plugin.spring.boot.v2;
import cn.hippo4j.agent.core.logging.api.ILog;
import cn.hippo4j.agent.core.logging.api.LogManager;
import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.AbstractConfigThreadPoolDynamicRefresh;
import com.ctrip.framework.apollo.Config;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigFile;
import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.core.enums.ConfigFileFormat;
import com.ctrip.framework.apollo.model.ConfigChange;
import org.springframework.boot.context.properties.bind.Bindable;
import org.springframework.boot.context.properties.bind.Binder;
import org.springframework.boot.context.properties.source.ConfigurationPropertySource;
import org.springframework.boot.context.properties.source.MapConfigurationPropertySource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static cn.hippo4j.agent.core.conf.Constants.SPRING_BOOT_CONFIG_PREFIX;
/**
* Dynamic thread pool change handler spring 2x
*/
public class DynamicThreadPoolChangeHandlerSpring2x extends AbstractConfigThreadPoolDynamicRefresh {
private static ILog LOGGER = LogManager.getLogger(DynamicThreadPoolChangeHandlerSpring2x.class);
@Override
public void registerListener() {
List<String> apolloNamespaces = SpringBootConfig.Spring.Dynamic.Thread_Pool.Apollo.NAMESPACE;
String namespace = apolloNamespaces.get(0);
String configFileType = SpringBootConfig.Spring.Dynamic.Thread_Pool.CONFIG_FILE_TYPE;
Config config = ConfigService.getConfig(String.format("%s.%s", namespace, configFileType));
ConfigChangeListener configChangeListener = configChangeEvent -> {
String replacedNamespace = namespace.replaceAll("." + configFileType, "");
ConfigFileFormat configFileFormat = ConfigFileFormat.fromString(configFileType);
ConfigFile configFile = ConfigService.getConfigFile(replacedNamespace, configFileFormat);
Map<String, Object> newChangeValueMap = new HashMap<>();
configChangeEvent.changedKeys().stream().filter(each -> each.contains(SPRING_BOOT_CONFIG_PREFIX)).forEach(each -> {
ConfigChange change = configChangeEvent.getChange(each);
String newValue = change.getNewValue();
newChangeValueMap.put(each, newValue);
});
dynamicRefresh(configFileType, configFile.getContent(), newChangeValueMap);
};
config.addChangeListener(configChangeListener);
LOGGER.info("[Hippo4j-Agent] Dynamic thread pool refresher, add apollo listener success. namespace: {}", namespace);
}
@Override
public BootstrapConfigProperties buildBootstrapProperties(Map<Object, Object> configInfo) {
BootstrapConfigProperties bindableBootstrapConfigProperties = new BootstrapConfigProperties();

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.boot.v2.boot;
import cn.hippo4j.agent.core.boot.BootService;
import cn.hippo4j.agent.core.boot.DefaultImplementor;
import cn.hippo4j.agent.core.logging.api.ILog;
import cn.hippo4j.agent.core.logging.api.LogManager;
/**
* SpringBoot v1 plugin boot service
*/
@DefaultImplementor
public class SpringBootV2PluginBootService implements BootService {
private static final ILog LOGGER = LogManager.getLogger(SpringBootV2PluginBootService.class);
@Override
public void prepare() throws Throwable {
}
@Override
public void boot() throws Throwable {
LOGGER.info("Loader SpringBootV2PluginBootService...");
}
@Override
public void onComplete() throws Throwable {
}
@Override
public void shutdown() throws Throwable {
}
}

@ -25,7 +25,9 @@ import cn.hippo4j.agent.core.plugin.interceptor.enhance.MethodInterceptResult;
import cn.hippo4j.agent.plugin.spring.boot.v2.DynamicThreadPoolChangeHandlerSpring2x;
import cn.hippo4j.agent.plugin.spring.common.support.SpringPropertiesLoader;
import cn.hippo4j.agent.plugin.spring.common.support.SpringThreadPoolRegisterSupport;
import cn.hippo4j.common.extension.design.AbstractSubjectCenter;
import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.threadpool.dynamic.mode.config.refresher.event.DynamicThreadPoolRefreshListener;
import org.springframework.context.ConfigurableApplicationContext;
import java.lang.reflect.Method;
@ -53,6 +55,8 @@ public class EventPublishingStartedInterceptor implements InstanceMethodsAroundI
SpringPropertiesLoader.loadSpringProperties(context.getEnvironment());
ThreadPoolDynamicRefresh dynamicRefresh = new DynamicThreadPoolChangeHandlerSpring2x();
dynamicRefresh.registerListener();
AbstractSubjectCenter.register(AbstractSubjectCenter.SubjectType.THREAD_POOL_DYNAMIC_REFRESH,
new DynamicThreadPoolRefreshListener());
return ret;
}

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cn.hippo4j.agent.plugin.spring.boot.v2.boot.SpringBootV2PluginBootService

@ -31,11 +31,6 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-dynamic-api</artifactId>

@ -1,104 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.agent.plugin.spring.common.support;
import cn.hippo4j.agent.plugin.spring.common.conf.SpringBootConfig;
import cn.hippo4j.threadpool.dynamic.api.ThreadPoolDynamicRefresh;
import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigFile;
import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.core.enums.ConfigFileFormat;
import com.ctrip.framework.apollo.model.ConfigChange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static cn.hippo4j.agent.core.conf.Constants.SPRING_BOOT_CONFIG_PREFIX;
/**
* Abstract dynamic thread poo change handler spring
*/
public abstract class AbstractDynamicThreadPoolChangeHandlerSpring implements ThreadPoolDynamicRefresh {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDynamicThreadPoolChangeHandlerSpring.class);
public void registerListener() {
List<String> apolloNamespaces = SpringBootConfig.Spring.Dynamic.Thread_Pool.Apollo.NAMESPACE;
String namespace = apolloNamespaces.get(0);
String configFileType = SpringBootConfig.Spring.Dynamic.Thread_Pool.CONFIG_FILE_TYPE;
com.ctrip.framework.apollo.Config config = ConfigService.getConfig(String.format("%s.%s", namespace, configFileType));
ConfigChangeListener configChangeListener = configChangeEvent -> {
String replacedNamespace = namespace.replaceAll("." + configFileType, "");
ConfigFileFormat configFileFormat = ConfigFileFormat.fromString(configFileType);
ConfigFile configFile = ConfigService.getConfigFile(replacedNamespace, configFileFormat);
Map<String, Object> newChangeValueMap = new HashMap<>();
configChangeEvent.changedKeys().stream().filter(each -> each.contains(SPRING_BOOT_CONFIG_PREFIX)).forEach(each -> {
ConfigChange change = configChangeEvent.getChange(each);
String newValue = change.getNewValue();
newChangeValueMap.put(each, newValue);
});
dynamicRefresh(configFile.getContent(), newChangeValueMap);
};
config.addChangeListener(configChangeListener);
LOGGER.info("[Hippo4j-Agent] Dynamic thread pool refresher, add apollo listener success. namespace: {}", namespace);
}
public void dynamicRefresh(String configContent, Map<String, Object> newValueChangeMap) {
try {
// String configFileType = SpringBootConfig.Spring.Dynamic.Thread_Pool.CONFIG_FILE_TYPE;
//
// Map<Object, Object> afterConfigMap = ConfigParserHandler.getInstance().parseConfig(configContent,
// ConfigFileTypeEnum.of(configFileType));
// if (CollectionUtil.isNotEmpty(newValueChangeMap)) {
// Optional.ofNullable(afterConfigMap).ifPresent(each -> each.putAll(newValueChangeMap));
// }
// TODO
// BootstrapConfigProperties afterConfigProperties = bindProperties(afterConfigMap, context);
//
// List<ExecutorProperties> executors = afterConfigProperties.getExecutors();
// for (ExecutorProperties afterProperties : executors) {
// String threadPoolId =
// afterProperties.getThreadPoolId();
// AgentThreadPoolExecutorHolder holder = AgentThreadPoolInstanceRegistry.getInstance().getHolder(threadPoolId);
// if (holder.isEmpty() ||
// holder.getExecutor() == null) {
// continue;
// }
// ExecutorProperties beforeProperties = convert(holder.getProperties());
// if (!checkConsistency(threadPoolId, beforeProperties, afterProperties)) {
// continue;
// }
// dynamicRefreshPool(beforeProperties, afterProperties);
// holder.setProperties(failDefaultExecutorProperties(beforeProperties, afterProperties)); // do refresh.
// ChangeParameterNotifyRequest changeRequest = buildChangeRequest(beforeProperties, afterProperties);
// LOGGER.info(CHANGE_THREAD_POOL_TEXT, threadPoolId, String.format(CHANGE_DELIMITER,
// beforeProperties.getCorePoolSize(), changeRequest.getNowCorePoolSize()), String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), changeRequest.getNowMaximumPoolSize()),
// String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), changeRequest.getNowQueueCapacity()), String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(),
// changeRequest.getNowKeepAliveTime()), String.format(CHANGE_DELIMITER, beforeProperties.getExecuteTimeOut(), changeRequest.getNowExecuteTimeOut()), String.format(CHANGE_DELIMITER,
// beforeProperties.getRejectedHandler(), changeRequest.getNowRejectedName()), String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(),
// changeRequest.getNowAllowsCoreThreadTimeOut()));
// }
} catch (Exception ex) {
LOGGER.error("[Hippo4j-Agent] config mode dynamic refresh failed.", ex);
}
}
}

@ -64,8 +64,8 @@ public class SpringThreadPoolRegisterSupport {
for (Map.Entry<String, Executor> entry : beansWithAnnotation.entrySet()) {
String beanName = entry.getKey();
Executor bean = entry.getValue();
ThreadPoolExecutor executor = null;
//
ThreadPoolExecutor executor = (ThreadPoolExecutor) bean;
// TODO
// if (DynamicThreadPoolAdapterChoose.match(bean)) {
// executor = DynamicThreadPoolAdapterChoose.unwrap(bean);
// } else {

@ -0,0 +1,80 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-agent-example</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-threadpool-agent-config-apollo-example</artifactId>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-context</artifactId>
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.agent.config.apollo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Agent config apollo example application.
*/
@SpringBootApplication
public class AgentConfigApolloExampleApplication {
public static void main(String[] args) {
SpringApplication.run(AgentConfigApolloExampleApplication.class, args);
}
}

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.example.agent.config.apollo;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Configuration
public class ThreadPoolConfiguration {
// -------------------------------------------------------------------------
// 未使用 Hippo4j原始定义线程池创建方式
// -------------------------------------------------------------------------
@Bean
public ThreadPoolExecutor runMessageSendTaskExecutor() {
LinkedBlockingQueue<Runnable> linkedBlockingQueue = new LinkedBlockingQueue<>(1024);
return new ThreadPoolExecutor(
1,
10,
1024,
TimeUnit.SECONDS,
linkedBlockingQueue);
}
// -------------------------------------------------------------------------
// 演示 Agent 模式修改线程池
// -------------------------------------------------------------------------
public static final ThreadPoolExecutor RUN_MESSAGE_SEND_TASK_EXECUTOR = new ThreadPoolExecutor(
1,
10,
1024,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1024));
}

@ -0,0 +1,33 @@
server.port=8092
server.servlet.context-path=/example
app.id=dynamic-threadpool-example
apollo.meta=http://127.0.0.1:8080
apollo.autoUpdateInjectedSpringProperties=true
apollo.bootstrap.enabled=true
apollo.bootstrap.namespaces=application
apollo.bootstrap.eagerLoad.enabled=true
# The following parameters are used for testing
env=dev
apollo.configService=http://127.0.0.1:8080
spring.profiles.active=dev
spring.application.name=hippo4j-config-apollo-spring-boot-starter-example
management.metrics.export.prometheus.enabled=true
management.server.port=29998
management.endpoints.web.exposure.include=*
spring.dynamic.thread-pool.enable=true
spring.dynamic.thread-pool.banner=true
spring.dynamic.thread-pool.check-state-interval=3
#spring.dynamic.thread-pool.monitor.enable=true
#spring.dynamic.thread-pool.monitor.collect-types=micrometer
#spring.dynamic.thread-pool.monitor.thread-pool-types=dynamic,web
#spring.dynamic.thread-pool.monitor.initial-delay=10000
#spring.dynamic.thread-pool.monitor.collect-interval=5000
#spring.dynamic.thread-pool.notify-platforms[0].platform=WECHAT
#spring.dynamic.thread-pool.notify-platforms[0].token=ac0426a5-c712-474c-9bff-72b8b8f5caff
#spring.dynamic.thread-pool.notify-platforms[1].platform=DING
#spring.dynamic.thread-pool.notify-platforms[1].token=56417ebba6a27ca352f0de77a2ae9da66d01f39610b5ee8a6033c60ef9071c55
#spring.dynamic.thread-pool.notify-platforms[2].platform=LARK
#spring.dynamic.thread-pool.notify-platforms[2].token=2cbf2808-3839-4c26-a04d-fd201dd51f9e
spring.dynamic.thread-pool.apollo.namespace=application
spring.dynamic.thread-pool.config-file-type=properties

@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-threadpool-example</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-threadpool-agent-example</artifactId>
<packaging>pom</packaging>
<properties>
<maven.deploy.skip>true</maven.deploy.skip>
</properties>
<modules>
<module>config-apollo</module>
</modules>
</project>

@ -18,5 +18,6 @@
<module>config</module>
<module>example-core</module>
<module>server</module>
<module>agent</module>
</modules>
</project>

@ -46,8 +46,7 @@ public abstract class AbstractConfigThreadPoolDynamicRefresh implements ThreadPo
Optional.ofNullable(configInfo).ifPresent(each -> each.putAll(newValueChangeMap));
}
BootstrapPropertiesInterface bootstrapProperties = buildBootstrapProperties(configInfo);
// publishDynamicThreadPoolEvent(binderCoreProperties);
AbstractSubjectCenter.notify("", null);
AbstractSubjectCenter.notify(AbstractSubjectCenter.SubjectType.THREAD_POOL_DYNAMIC_REFRESH, () -> bootstrapProperties);
} catch (Exception ex) {
log.error("Hippo4j config mode dynamic refresh failed.", ex);
}

@ -17,6 +17,8 @@
package cn.hippo4j.threadpool.dynamic.mode.config.refresher.event;
import cn.hippo4j.common.executor.ThreadPoolExecutorHolder;
import cn.hippo4j.common.executor.ThreadPoolInstanceRegistry;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
@ -24,11 +26,10 @@ import cn.hippo4j.common.extension.design.Observer;
import cn.hippo4j.common.extension.design.ObserverMessage;
import cn.hippo4j.common.model.executor.ExecutorProperties;
import cn.hippo4j.common.toolkit.ThreadPoolExecutorUtil;
import cn.hippo4j.threadpool.dynamic.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.threadpool.dynamic.mode.config.properties.BootstrapConfigProperties;
import cn.hippo4j.threadpool.dynamic.core.executor.manage.GlobalConfigThreadPoolManage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Objects;
@ -42,38 +43,58 @@ import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD
/**
* Dynamic thread-pool refresh listener.
*/
@Slf4j
@RequiredArgsConstructor
public class DynamicThreadPoolRefreshListener implements Observer<BootstrapConfigProperties> {
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicThreadPoolRefreshListener.class);
@Override
public void accept(ObserverMessage<BootstrapConfigProperties> observerMessage) {
BootstrapConfigProperties bindableConfigProperties = observerMessage.message();
List<ExecutorProperties> executors = bindableConfigProperties.getExecutors();
for (ExecutorProperties properties : executors) {
String threadPoolId = properties.getThreadPoolId();
dynamicRefreshPool(threadPoolId, properties);
ExecutorProperties beforeProperties = GlobalConfigThreadPoolManage.getProperties(properties.getThreadPoolId());
log.info(CHANGE_THREAD_POOL_TEXT,
threadPoolId,
String.format(CHANGE_DELIMITER, beforeProperties.getCorePoolSize(), properties.getCorePoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()),
String.format(CHANGE_DELIMITER, beforeProperties.getQueueCapacity(), properties.getQueueCapacity()),
String.format(CHANGE_DELIMITER, beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()),
String.format(CHANGE_DELIMITER, beforeProperties.getRejectedHandler(), properties.getRejectedHandler()),
String.format(CHANGE_DELIMITER, beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()));
// Check whether the thread pool configuration is empty and whether the parameters have changed
ThreadPoolExecutorHolder executorHolder = ThreadPoolInstanceRegistry.getInstance().getHolder(threadPoolId);
if (executorHolder.isEmpty() || !checkPropertiesConsistency(executorHolder, properties)) {
continue;
}
dynamicRefreshThreadPool(executorHolder, properties);
sendChangeNotificationMessage(executorHolder, properties);
executorHolder.setExecutorProperties(properties);
}
}
/**
* Dynamic refresh pool.
* Check consistency.
*
* @param threadPoolId
* @param properties
* @param executorHolder executor holder
* @param properties properties after dynamic thread pool change
*/
private void dynamicRefreshPool(String threadPoolId, ExecutorProperties properties) {
ExecutorProperties beforeProperties = GlobalConfigThreadPoolManage.getProperties(properties.getThreadPoolId());
ThreadPoolExecutor executor = GlobalThreadPoolManage.getExecutorService(threadPoolId);
private boolean checkPropertiesConsistency(ThreadPoolExecutorHolder executorHolder, ExecutorProperties properties) {
ExecutorProperties beforeProperties = executorHolder.getExecutorProperties();
ThreadPoolExecutor executor = executorHolder.getExecutor();
boolean result = (properties.getCorePoolSize() != null && !Objects.equals(beforeProperties.getCorePoolSize(), properties.getCorePoolSize()))
|| (properties.getMaximumPoolSize() != null && !Objects.equals(beforeProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()))
|| (properties.getAllowCoreThreadTimeOut() != null && !Objects.equals(beforeProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()))
|| (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut()))
|| (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime()))
|| (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler()))
||
((properties.getQueueCapacity() != null && !Objects.equals(beforeProperties.getQueueCapacity(), properties.getQueueCapacity())
&& Objects.equals(BlockingQueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.getName(), executor.getQueue().getClass().getSimpleName())));
return result;
}
/**
* Dynamic refresh thread-pool.
*
* @param executorHolder executor holder
* @param properties properties after dynamic thread pool change
*/
private void dynamicRefreshThreadPool(ThreadPoolExecutorHolder executorHolder, ExecutorProperties properties) {
ExecutorProperties beforeProperties = executorHolder.getExecutorProperties();
ThreadPoolExecutor executor = executorHolder.getExecutor();
if (properties.getMaximumPoolSize() != null && properties.getCorePoolSize() != null) {
ThreadPoolExecutorUtil.safeSetPoolSize(executor, properties.getCorePoolSize(), properties.getMaximumPoolSize());
} else {
@ -106,8 +127,21 @@ public class DynamicThreadPoolRefreshListener implements Observer<BootstrapConfi
ResizableCapacityLinkedBlockingQueue<?> queue = (ResizableCapacityLinkedBlockingQueue<?>) executor.getQueue();
queue.setCapacity(properties.getQueueCapacity());
} else {
log.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName());
LOGGER.warn("The queue length cannot be modified. Queue type mismatch. Current queue type: {}", executor.getQueue().getClass().getSimpleName());
}
}
}
private void sendChangeNotificationMessage(ThreadPoolExecutorHolder executorHolder, ExecutorProperties properties) {
ExecutorProperties executorProperties = executorHolder.getExecutorProperties();
// TODO log cannot be printed
LOGGER.info(CHANGE_THREAD_POOL_TEXT,
executorHolder.getThreadPoolId(),
String.format(CHANGE_DELIMITER, executorProperties.getCorePoolSize(), properties.getCorePoolSize()),
String.format(CHANGE_DELIMITER, executorProperties.getMaximumPoolSize(), properties.getMaximumPoolSize()),
String.format(CHANGE_DELIMITER, executorProperties.getQueueCapacity(), properties.getQueueCapacity()),
String.format(CHANGE_DELIMITER, executorProperties.getKeepAliveTime(), properties.getKeepAliveTime()),
String.format(CHANGE_DELIMITER, executorProperties.getRejectedHandler(), properties.getRejectedHandler()),
String.format(CHANGE_DELIMITER, executorProperties.getAllowCoreThreadTimeOut(), properties.getAllowCoreThreadTimeOut()));
}
}

Loading…
Cancel
Save