Merge branch 'mabaiwan:develop' into develop

pull/277/head
lucky 8 3 years ago committed by GitHub
commit 46a89b4aff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,6 +17,8 @@
package cn.hippo4j.common.api;
import java.util.Map;
/**
* Thread-pool dynamic refresh.
*/
@ -28,4 +30,13 @@ public interface ThreadPoolDynamicRefresh {
* @param content
*/
void dynamicRefresh(String content);
/**
* Dynamic refresh.
*
* @param content
* @param newValueChangeMap
*/
default void dynamicRefresh(String content, Map<String, Object> newValueChangeMap) {
}
}

@ -35,24 +35,28 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>${tomcat-embed-core.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jetty</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
<scope>compile</scope>
<optional>true</optional>
</dependency>
</dependencies>

@ -43,7 +43,7 @@ public abstract class AbstractThreadPoolRuntime {
* @param threadPoolRunStateInfo
* @return
*/
protected abstract ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo threadPoolRunStateInfo);
public abstract ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo threadPoolRunStateInfo);
/**
* Get pool run state.

@ -51,7 +51,7 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime {
private final ConfigurableEnvironment environment;
@Override
protected ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo poolRunStateInfo) {
public ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo poolRunStateInfo) {
RuntimeInfo runtimeInfo = new RuntimeInfo();
String memoryProportion = StrUtil.builder(
"已分配: ",

@ -22,11 +22,14 @@ import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.common.model.ThreadPoolRunStateInfo;
import cn.hippo4j.core.executor.state.AbstractThreadPoolRuntime;
import cn.hippo4j.core.toolkit.CalculateUtil;
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.web.embedded.tomcat.TomcatWebServer;
import org.springframework.boot.web.server.WebServer;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@ -68,36 +71,58 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService {
@Override
public ThreadPoolBaseInfo simpleInfo() {
ThreadPoolBaseInfo poolBaseInfo = new ThreadPoolBaseInfo();
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
int corePoolSize = threadPoolExecutor.getCorePoolSize();
int maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
long keepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
BlockingQueue<Runnable> queue = threadPoolExecutor.getQueue();
int queueSize = queue.size();
int remainingCapacity = queue.remainingCapacity();
int queueCapacity = queueSize + remainingCapacity;
int corePoolSize, maximumPoolSize, queueCapacity;
long keepAliveTime;
String rejectedExecutionHandlerName;
BlockingQueue<Runnable> blockingQueue;
if (executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
corePoolSize = threadPoolExecutor.getCorePoolSize();
maximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
keepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
blockingQueue = threadPoolExecutor.getQueue();
int queueSize = blockingQueue.size();
int remainingCapacity = blockingQueue.remainingCapacity();
queueCapacity = queueSize + remainingCapacity;
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
rejectedExecutionHandlerName = rejectedExecutionHandler.getClass().getSimpleName();
} else {
org.apache.tomcat.util.threads.ThreadPoolExecutor tomcatThreadPoolExecutor = (org.apache.tomcat.util.threads.ThreadPoolExecutor) executor;
corePoolSize = tomcatThreadPoolExecutor.getCorePoolSize();
maximumPoolSize = tomcatThreadPoolExecutor.getMaximumPoolSize();
keepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
blockingQueue = tomcatThreadPoolExecutor.getQueue();
int queueSize = blockingQueue.size();
int remainingCapacity = blockingQueue.remainingCapacity();
queueCapacity = queueSize + remainingCapacity;
rejectedExecutionHandlerName = tomcatThreadPoolExecutor.getRejectedExecutionHandler().getClass().getSimpleName();
}
poolBaseInfo.setCoreSize(corePoolSize);
poolBaseInfo.setMaximumSize(maximumPoolSize);
poolBaseInfo.setKeepAliveTime(keepAliveTime);
poolBaseInfo.setQueueType(queue.getClass().getSimpleName());
poolBaseInfo.setQueueType(blockingQueue.getClass().getSimpleName());
poolBaseInfo.setQueueCapacity(queueCapacity);
poolBaseInfo.setRejectedName(rejectedExecutionHandler.getClass().getSimpleName());
poolBaseInfo.setRejectedName(rejectedExecutionHandlerName);
return poolBaseInfo;
}
@Override
public ThreadPoolParameter getWebThreadPoolParameter() {
ThreadPoolParameterInfo parameterInfo = null;
ThreadPoolParameterInfo parameterInfo = new ThreadPoolParameterInfo();
int minThreads, maxThreads;
long keepAliveTime;
try {
parameterInfo = new ThreadPoolParameterInfo();
ThreadPoolExecutor tomcatExecutor = (ThreadPoolExecutor) executor;
int minThreads = tomcatExecutor.getCorePoolSize();
int maxThreads = tomcatExecutor.getMaximumPoolSize();
long keepAliveTime = tomcatExecutor.getKeepAliveTime(TimeUnit.SECONDS);
if (executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor tomcatExecutor = (ThreadPoolExecutor) executor;
minThreads = tomcatExecutor.getCorePoolSize();
maxThreads = tomcatExecutor.getMaximumPoolSize();
keepAliveTime = tomcatExecutor.getKeepAliveTime(TimeUnit.SECONDS);
} else {
org.apache.tomcat.util.threads.ThreadPoolExecutor tomcatThreadPoolExecutor = (org.apache.tomcat.util.threads.ThreadPoolExecutor) executor;
minThreads = tomcatThreadPoolExecutor.getCorePoolSize();
maxThreads = tomcatThreadPoolExecutor.getMaximumPoolSize();
keepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
}
parameterInfo.setCoreSize(minThreads);
parameterInfo.setMaxSize(maxThreads);
parameterInfo.setKeepAliveTime((int) keepAliveTime);
@ -109,19 +134,75 @@ public class TomcatWebThreadPoolHandler extends AbstractWebThreadPoolService {
@Override
public ThreadPoolRunStateInfo getWebRunStateInfo() {
return webThreadPoolRunStateHandler.getPoolRunState(null, executor);
if (executor instanceof ThreadPoolExecutor) {
return webThreadPoolRunStateHandler.getPoolRunState(null, executor);
}
ThreadPoolRunStateInfo runStateInfo = new ThreadPoolRunStateInfo();
org.apache.tomcat.util.threads.ThreadPoolExecutor tomcatThreadPoolExecutor = (org.apache.tomcat.util.threads.ThreadPoolExecutor) executor;
// 核心线程数
int corePoolSize = tomcatThreadPoolExecutor.getCorePoolSize();
// 最大线程数
int maximumPoolSize = tomcatThreadPoolExecutor.getMaximumPoolSize();
// 线程池当前线程数 (有锁)
int poolSize = tomcatThreadPoolExecutor.getPoolSize();
// 活跃线程数 (有锁)
int activeCount = tomcatThreadPoolExecutor.getActiveCount();
// 同时进入池中的最大线程数 (有锁)
int largestPoolSize = tomcatThreadPoolExecutor.getLargestPoolSize();
// 线程池中执行任务总数量 (有锁)
long completedTaskCount = tomcatThreadPoolExecutor.getCompletedTaskCount();
// 当前负载
String currentLoad = CalculateUtil.divide(activeCount, maximumPoolSize) + "";
// 峰值负载
String peakLoad = CalculateUtil.divide(largestPoolSize, maximumPoolSize) + "";
BlockingQueue<Runnable> queue = tomcatThreadPoolExecutor.getQueue();
// 队列元素个数
int queueSize = queue.size();
// 队列类型
String queueType = queue.getClass().getSimpleName();
// 队列剩余容量
int remainingCapacity = queue.remainingCapacity();
// 队列容量
int queueCapacity = queueSize + remainingCapacity;
runStateInfo.setCoreSize(corePoolSize);
runStateInfo.setPoolSize(poolSize);
runStateInfo.setMaximumSize(maximumPoolSize);
runStateInfo.setActiveSize(activeCount);
runStateInfo.setCurrentLoad(currentLoad);
runStateInfo.setPeakLoad(peakLoad);
runStateInfo.setQueueType(queueType);
runStateInfo.setQueueSize(queueSize);
runStateInfo.setQueueCapacity(queueCapacity);
runStateInfo.setQueueRemainingCapacity(remainingCapacity);
runStateInfo.setLargestPoolSize(largestPoolSize);
runStateInfo.setCompletedTaskCount(completedTaskCount);
runStateInfo.setClientLastRefreshTime(DateUtil.formatDateTime(new Date()));
runStateInfo.setTimestamp(System.currentTimeMillis());
return webThreadPoolRunStateHandler.supplement(runStateInfo);
}
@Override
public void updateWebThreadPool(ThreadPoolParameterInfo threadPoolParameterInfo) {
int originalCoreSize, originalMaximumPoolSize;
long originalKeepAliveTime;
try {
ThreadPoolExecutor tomcatExecutor = (ThreadPoolExecutor) executor;
int originalCoreSize = tomcatExecutor.getCorePoolSize();
int originalMaximumPoolSize = tomcatExecutor.getMaximumPoolSize();
long originalKeepAliveTime = tomcatExecutor.getKeepAliveTime(TimeUnit.SECONDS);
tomcatExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
tomcatExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
tomcatExecutor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS);
if (executor instanceof ThreadPoolExecutor) {
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
originalCoreSize = threadPoolExecutor.getCorePoolSize();
originalMaximumPoolSize = threadPoolExecutor.getMaximumPoolSize();
originalKeepAliveTime = threadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
threadPoolExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
threadPoolExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
threadPoolExecutor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS);
} else {
org.apache.tomcat.util.threads.ThreadPoolExecutor tomcatThreadPoolExecutor = (org.apache.tomcat.util.threads.ThreadPoolExecutor) executor;
originalCoreSize = tomcatThreadPoolExecutor.getCorePoolSize();
originalMaximumPoolSize = tomcatThreadPoolExecutor.getMaximumPoolSize();
originalKeepAliveTime = tomcatThreadPoolExecutor.getKeepAliveTime(TimeUnit.SECONDS);
tomcatThreadPoolExecutor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
tomcatThreadPoolExecutor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
tomcatThreadPoolExecutor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS);
}
log.info("[TOMCAT] Changed web thread pool. corePoolSize :: [{}], maximumPoolSize :: [{}], keepAliveTime :: [{}]",
String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolParameterInfo.corePoolSizeAdapt()),
String.format(CHANGE_DELIMITER, originalMaximumPoolSize, threadPoolParameterInfo.maximumPoolSizeAdapt()),

@ -34,7 +34,7 @@ import lombok.extern.slf4j.Slf4j;
public class WebThreadPoolRunStateHandler extends AbstractThreadPoolRuntime {
@Override
protected ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo poolRunStateInfo) {
public ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo poolRunStateInfo) {
RuntimeInfo runtimeInfo = new RuntimeInfo();
String memoryProportion = StrUtil.builder(
"已分配: ",

@ -47,7 +47,7 @@
<dependency>
<groupId>com.ctrip.framework.apollo</groupId>
<artifactId>apollo-client</artifactId>
<version>1.3.0</version>
<version>${apollo.version}</version>
</dependency>
<dependency>
@ -56,6 +56,12 @@
<version>2.2.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>

@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-example</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-spring-boot-starter-adapter-kafka-example</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-spring-boot-starter-adapter-kafka</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-spring-boot-starter</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>cn.hippo4j</groupId>
<artifactId>hippo4j-example-core</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
</project>

@ -0,0 +1,14 @@
package cn.hippo4j.springboot.starter.adapter.kafka.example;
import cn.hippo4j.core.enable.EnableDynamicThreadPool;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableDynamicThreadPool
public class Hippo4jAdapterKafkaExampleApplication {
public static void main(String[] args) {
SpringApplication.run(Hippo4jAdapterKafkaExampleApplication.class, args);
}
}

@ -0,0 +1,26 @@
package cn.hippo4j.springboot.starter.adapter.kafka.example.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* Kafka message consumer.
*/
@Slf4j
@Component
public class KafkaMessageConsumer {
@KafkaListener(topics = "kafka_message_hippo4j", groupId = "hippo4j")
public void onMessage(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
Optional message = Optional.ofNullable(record.value());
message.ifPresent(each -> log.info(each.toString()));
ack.acknowledge();
}
}

@ -0,0 +1,39 @@
package cn.hippo4j.springboot.starter.adapter.kafka.example.produce;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.example.core.dto.SendMessageDTO;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
/**
* Kafka message produce.
*/
@Slf4j
@Component
@RestController
@AllArgsConstructor
public class KafkaMessageProduce {
private final KafkaTemplate kafkaTemplate;
private final String TOPIC = "kafka_message_hippo4j";
@GetMapping("/message/send")
public String sendMessage(Integer count) {
for (int i = 0; i < count; i++) {
String keys = UUID.randomUUID().toString();
SendMessageDTO payload = SendMessageDTO.builder()
.receiver("156011xxx91")
.uid(keys)
.build();
kafkaTemplate.send(TOPIC, JSONUtil.toJSONString(payload));
}
return "success";
}
}

@ -0,0 +1,23 @@
server.port=8092
spring.profiles.active=dev
spring.dynamic.thread-pool.server-addr=http://localhost:6691
spring.dynamic.thread-pool.namespace=prescription
spring.dynamic.thread-pool.item-id=dynamic-threadpool-example
spring.dynamic.thread-pool.username=admin
spring.dynamic.thread-pool.password=123456
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.acks=1
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.listener.concurrency=2
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false

@ -33,6 +33,12 @@
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-core</artifactId>
<version>${tomcat-embed-core.version}</version>
</dependency>
<!--<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>

@ -7,19 +7,16 @@
<artifactId>hippo4j-all</artifactId>
<version>${revision}</version>
</parent>
<artifactId>hippo4j-example</artifactId>
<packaging>pom</packaging>
<name>${project.artifactId}</name>
<description>${project.artifactId}</description>
<modules>
<module>hippo4j-example-core</module>
<module>hippo4j-spring-boot-starter-example</module>
<module>hippo4j-core-nacos-spring-boot-starter-example</module>
<module>hippo4j-core-apollo-spring-boot-starter-example</module>
<module>hippo4j-core-zookeeper-spring-boot-starter-example</module>
<module>hippo4j-spring-boot-starter-adapter-kafka-example</module>
<module>hippo4j-spring-boot-starter-adapter-rabbitmq-example</module>
<module>hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq-example</module>
<module>hippo4j-spring-boot-starter-adapter-rocketmq-example</module>

@ -19,6 +19,7 @@ package cn.hippo4j.core.springboot.starter.refresher;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties;
import cn.hippo4j.core.springboot.starter.parser.ConfigParserHandler;
@ -28,6 +29,7 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
/**
@ -50,8 +52,15 @@ public abstract class AbstractCoreThreadPoolDynamicRefresh implements ThreadPool
@Override
public void dynamicRefresh(String configContent) {
dynamicRefresh(configContent, null);
}
public void dynamicRefresh(String configContent, Map<String, Object> newValueChangeMap) {
try {
Map<Object, Object> configInfo = ConfigParserHandler.getInstance().parseConfig(configContent, bootstrapCoreProperties.getConfigFileType());
if (CollectionUtil.isNotEmpty(newValueChangeMap)) {
Optional.ofNullable(configInfo).ifPresent(each -> each.putAll(newValueChangeMap));
}
BootstrapCoreProperties bindableCoreProperties = BootstrapCorePropertiesBinderAdapt.bootstrapCorePropertiesBinder(configInfo, bootstrapCoreProperties);
ApplicationContextHolder.getInstance().publishEvent(new Hippo4jCoreDynamicRefreshEvent(this, bindableCoreProperties));
} catch (Exception ex) {

@ -22,9 +22,15 @@ import com.ctrip.framework.apollo.ConfigChangeListener;
import com.ctrip.framework.apollo.ConfigFile;
import com.ctrip.framework.apollo.ConfigService;
import com.ctrip.framework.apollo.core.enums.ConfigFileFormat;
import com.ctrip.framework.apollo.model.ConfigChange;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import java.util.Map;
import static cn.hippo4j.core.springboot.starter.config.BootstrapCoreProperties.PREFIX;
/**
* @author : wh
* @date : 2022/2/28 21:32
@ -42,13 +48,18 @@ public class ApolloRefresherHandler extends AbstractCoreThreadPoolDynamicRefresh
public void afterPropertiesSet() {
String[] apolloNamespaces = this.namespace.split(",");
this.namespace = apolloNamespaces[0];
Config config = ConfigService.getConfig(namespace);
Config config = ConfigService.getConfig(String.format("%s.%s", namespace, bootstrapCoreProperties.getConfigFileType().getValue()));
ConfigChangeListener configChangeListener = configChangeEvent -> {
ConfigFile configFile = ConfigService.getConfigFile(
this.namespace.replaceAll("." + bootstrapCoreProperties.getConfigFileType().getValue(), ""),
ConfigFileFormat.fromString(bootstrapCoreProperties.getConfigFileType().getValue()));
String configInfo = configFile.getContent();
dynamicRefresh(configInfo);
String namespace = this.namespace.replaceAll("." + bootstrapCoreProperties.getConfigFileType().getValue(), "");
ConfigFileFormat configFileFormat = ConfigFileFormat.fromString(bootstrapCoreProperties.getConfigFileType().getValue());
ConfigFile configFile = ConfigService.getConfigFile(namespace, configFileFormat);
Map<String, Object> newChangeValueMap = Maps.newHashMap();
configChangeEvent.changedKeys().stream().filter(each -> each.contains(PREFIX)).forEach(each -> {
ConfigChange change = configChangeEvent.getChange(each);
String newValue = change.getNewValue();
newChangeValueMap.put(each, newValue);
});
dynamicRefresh(configFile.getContent(), newChangeValueMap);
};
config.addChangeListener(configChangeListener);
log.info("dynamic-thread-pool refresher, add apollo listener success, namespace: {}", namespace);

@ -61,7 +61,7 @@ public class RunTimeInfoCollector extends AbstractThreadPoolRuntime implements C
}
@Override
protected ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo threadPoolRunStateInfo) {
public ThreadPoolRunStateInfo supplement(ThreadPoolRunStateInfo threadPoolRunStateInfo) {
return threadPoolRunStateInfo;
}
}

@ -48,6 +48,7 @@
<spring-boot.version>2.3.2.RELEASE</spring-boot.version>
<apollo.version>1.9.1</apollo.version>
<rocketmq.version>2.2.2</rocketmq.version>
<tomcat-embed-core.version>9.0.55</tomcat-embed-core.version>
<spring-cloud-starter-stream-rocketmq.version>2.2.6.RELEASE</spring-cloud-starter-stream-rocketmq.version>
<netty.version>4.1.10.Final</netty.version>

Loading…
Cancel
Save