From 3ed35efee1350377ccc8b622353aba298ba36de1 Mon Sep 17 00:00:00 2001 From: shiming-stars-lk <1031900093@qq.com> Date: Fri, 15 Jul 2022 10:45:18 +0800 Subject: [PATCH 01/10] Add monitoring of Hystrix thread pools --- .../hippo4j-adapter-hystrix/pom.xml | 55 ++++++++ .../hystrix/HystrixThreadPoolAdapter.java | 118 ++++++++++++++++++ hippo4j-adapter/pom.xml | 1 + .../pom.xml | 6 + .../pom.xml | 50 ++++++++ .../HystrixAdapterAutoConfiguration.java | 28 +++++ .../main/resources/META-INF/spring.factories | 1 + .../pom.xml | 1 + pom.xml | 7 ++ 9 files changed, 267 insertions(+) create mode 100644 hippo4j-adapter/hippo4j-adapter-hystrix/pom.xml create mode 100644 hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java create mode 100644 hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/pom.xml create mode 100644 hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java create mode 100644 hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/resources/META-INF/spring.factories diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/pom.xml b/hippo4j-adapter/hippo4j-adapter-hystrix/pom.xml new file mode 100644 index 00000000..83e1dba1 --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/pom.xml @@ -0,0 +1,55 @@ + + + 4.0.0 + + cn.hippo4j + hippo4j-adapter + ${revision} + + hippo4j-adapter-hystrix + + + + cn.hippo4j + hippo4j-adapter-base + + + org.springframework.cloud + spring-cloud-starter-netflix-hystrix + ${spring-cloud-starter-netflix-hystrix.version} + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + ${project.artifactId} + ${project.version} + ${maven.build.timestamp} + chen.ma + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.10.3 + + + + jar + + + + + + + diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java new file mode 100644 index 00000000..1fc1e336 --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.adapter.hystrix; + +import cn.hippo4j.adapter.base.ThreadPoolAdapter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterState; +import cn.hippo4j.common.toolkit.CollectionUtil; +import com.google.common.collect.Maps; +import com.netflix.hystrix.HystrixThreadPool; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.context.event.ApplicationStartedEvent; +import org.springframework.context.ApplicationListener; + +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; + +import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; + +/** + * hystrix thread-pool adapter. + */ +@Slf4j +public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener { + + private static final String THREAD_POOL_FIELD = "threadPool"; + + private static final String THREAD_POOLS_FIELD = "threadPools"; + + private final Map HYSTRIX_CONSUME_EXECUTOR = Maps.newHashMap(); + + @Override + public String mark() { + return "hystrix"; + } + + @Override + public ThreadPoolAdapterState getThreadPoolState(String identify) { + ThreadPoolAdapterState result = new ThreadPoolAdapterState(); + ThreadPoolExecutor rocketMQConsumeExecutor = HYSTRIX_CONSUME_EXECUTOR.get(identify); + if (rocketMQConsumeExecutor != null) { + result.setThreadPoolKey(identify); + result.setCoreSize(rocketMQConsumeExecutor.getCorePoolSize()); + result.setMaximumSize(rocketMQConsumeExecutor.getMaximumPoolSize()); + return result; + } + log.warn("[{}] hystrix thread pool not found.", identify); + return result; + } + + @Override + public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { + String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); + ThreadPoolExecutor threadPoolExecutor = HYSTRIX_CONSUME_EXECUTOR.get(threadPoolKey); + if (threadPoolExecutor == null) { + log.warn("[{}] hystrix thread pool not found.", threadPoolKey); + return false; + } + int originalCoreSize = threadPoolExecutor.getCorePoolSize(); + int originalMaximumPoolSize = threadPoolExecutor.getMaximumPoolSize(); + threadPoolExecutor.setCorePoolSize(threadPoolAdapterParameter.getCorePoolSize()); + threadPoolExecutor.setMaximumPoolSize(threadPoolAdapterParameter.getMaximumPoolSize()); + log.info("[{}] hystrix thread pool parameter change. coreSize :: {}, maximumSize :: {}", + threadPoolKey, + String.format(CHANGE_DELIMITER, originalCoreSize, threadPoolExecutor.getCorePoolSize()), + String.format(CHANGE_DELIMITER, originalMaximumPoolSize, threadPoolExecutor.getMaximumPoolSize())); + return true; + } + + @Override + public void onApplicationEvent(ApplicationStartedEvent event) { + try { + Class factoryClass = HystrixThreadPool.Factory.class; + Field threadPoolsField = factoryClass.getDeclaredField(THREAD_POOLS_FIELD); + threadPoolsField.setAccessible(true); + ConcurrentHashMap threadPools = + (ConcurrentHashMap)threadPoolsField.get(factoryClass); + if (CollectionUtil.isNotEmpty(threadPools)) { + for (Map.Entry stringHystrixThreadPoolEntry : threadPools.entrySet()) { + String key = stringHystrixThreadPoolEntry.getKey(); + HystrixThreadPool value = stringHystrixThreadPoolEntry.getValue(); + if (value instanceof HystrixThreadPool.HystrixThreadPoolDefault) { + HystrixThreadPool.HystrixThreadPoolDefault hystrixThreadPoolDefault = + (HystrixThreadPool.HystrixThreadPoolDefault)value; + Class hystrixThreadPoolDefaultClass = hystrixThreadPoolDefault.getClass(); + Field threadPoolField = hystrixThreadPoolDefaultClass.getDeclaredField(THREAD_POOL_FIELD); + threadPoolField.setAccessible(true); + ThreadPoolExecutor threadPoolExecutor = + (ThreadPoolExecutor)threadPoolField.get(hystrixThreadPoolDefault); + if (threadPoolExecutor != null) { + HYSTRIX_CONSUME_EXECUTOR.put(key,threadPoolExecutor); + } + } + } + } + }catch (Exception e) { + log.error("Failed to get Hystrix thread pool.", e); + } + } +} diff --git a/hippo4j-adapter/pom.xml b/hippo4j-adapter/pom.xml index 6eef9cd9..2d03f504 100644 --- a/hippo4j-adapter/pom.xml +++ b/hippo4j-adapter/pom.xml @@ -16,6 +16,7 @@ hippo4j-adapter-kafka hippo4j-adapter-rabbitmq hippo4j-adapter-rocketmq + hippo4j-adapter-hystrix hippo4j-adapter-spring-cloud-stream-rocketmq hippo4j-adapter-spring-cloud-stream-kafka diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-all/pom.xml b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-all/pom.xml index 926a59e3..012fb69b 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-all/pom.xml +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-all/pom.xml @@ -33,6 +33,12 @@ hippo4j-spring-boot-starter-adapter-dubbo ${revision} + + + cn.hippo4j + hippo4j-spring-boot-starter-adapter-hystrix + ${revision} + diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/pom.xml b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/pom.xml new file mode 100644 index 00000000..cf5db2f1 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/pom.xml @@ -0,0 +1,50 @@ + + + 4.0.0 + + cn.hippo4j + hippo4j-spring-boot-starter-adapter + ${revision} + + hippo4j-spring-boot-starter-adapter-hystrix + + + + cn.hippo4j + hippo4j-adapter-hystrix + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + ${project.artifactId} + ${project.version} + ${maven.build.timestamp} + chen.ma + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.10.3 + + + + jar + + + + + + + diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java new file mode 100644 index 00000000..9e2d1ac2 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java @@ -0,0 +1,28 @@ +package cn.hippo4j.springboot.starter.adapter.hystrix; + +import cn.hippo4j.adapter.hystrix.HystrixThreadPoolAdapter; +import cn.hippo4j.common.config.ApplicationContextHolder; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * @program: hippo4j + * @description: + * @author: lk + * @create: 2022-07-15 + **/ +@Configuration(proxyBeanMethods = false) +public class HystrixAdapterAutoConfiguration { + + @Bean + @ConditionalOnMissingBean + public ApplicationContextHolder simpleApplicationContextHolder() { + return new ApplicationContextHolder(); + } + + @Bean + public HystrixThreadPoolAdapter hystrixThreadPoolAdapter(){ + return new HystrixThreadPoolAdapter(); + } +} diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/resources/META-INF/spring.factories b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/resources/META-INF/spring.factories new file mode 100644 index 00000000..e00410f3 --- /dev/null +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/resources/META-INF/spring.factories @@ -0,0 +1 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.hippo4j.springboot.starter.adapter.hystrix.HystrixAdapterAutoConfiguration \ No newline at end of file diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/pom.xml b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/pom.xml index 7caf36aa..647602f7 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/pom.xml +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/pom.xml @@ -18,6 +18,7 @@ hippo4j-spring-boot-starter-adapter-kafka hippo4j-spring-boot-starter-adapter-rabbitmq hippo4j-spring-boot-starter-adapter-rocketmq + hippo4j-spring-boot-starter-adapter-hystrix hippo4j-spring-boot-starter-adapter-spring-cloud-stream-kafka hippo4j-spring-boot-starter-adapter-spring-cloud-stream-rocketmq diff --git a/pom.xml b/pom.xml index ad28f873..2cca8e09 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,7 @@ 9.0.55 2.2.6.RELEASE 4.1.10.Final + 2.2.9.RELEASE false UTF-8 @@ -172,6 +173,12 @@ ${revision} + + cn.hippo4j + hippo4j-adapter-hystrix + ${revision} + + cn.hippo4j hippo4j-adapter-spring-cloud-stream-kafka From 26dea183c6fd1927c522e64098b18a9ca7b4f134 Mon Sep 17 00:00:00 2001 From: shiming-stars-lk <1031900093@qq.com> Date: Fri, 15 Jul 2022 16:44:41 +0800 Subject: [PATCH 02/10] Add monitoring of Hystrix thread pools --- .../adapter/base/ThreadPoolAdapterExtra.java | 57 ++++++++++++ ...readPoolAdapterExtraAutoConfiguration.java | 34 ++++++++ .../base/ThreadPoolAdapterExtraHandle.java | 29 +++++++ .../main/resources/META-INF/spring.factories | 1 + .../hystrix/HystrixThreadPoolAdapter.java | 86 ++++++++++++++++--- .../config/config/NettyServerConfig.java | 27 ++++-- .../config/netty/MonitorNettyServer.java | 34 ++++++-- .../hippo4j/config/netty/ServerHandler.java | 17 ++++ ...Hippo4jAdapterKafkaExampleApplication.java | 17 ++++ .../consumer/KafkaMessageConsumer.java | 17 ++++ .../example/produce/KafkaMessageProduce.java | 17 ++++ .../HystrixAdapterAutoConfiguration.java | 25 +++++- .../DynamicThreadPoolAutoConfiguration.java | 9 +- .../config/NettyClientConfiguration.java | 19 +++- .../starter/core/DiscoveryClient.java | 5 +- .../core/ThreadPoolAdapterRegister.java | 11 ++- .../send/netty/NettyConnectSender.java | 28 ++++-- .../monitor/send/netty/SenderHandler.java | 17 ++++ .../starter/remote/ServerListManager.java | 2 +- .../starter/remote/ServerNettyAgent.java | 25 +++++- 20 files changed, 432 insertions(+), 45 deletions(-) create mode 100644 hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java create mode 100644 hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java create mode 100644 hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraHandle.java create mode 100644 hippo4j-adapter/hippo4j-adapter-base/src/main/resources/META-INF/spring.factories diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java new file mode 100644 index 00000000..695f2402 --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.adapter.base; + +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * thread pool adapter extra. + */ +@Slf4j +public class ThreadPoolAdapterExtra { + + private static final int BLOCKING_QUEUE_CAPACITY = 100; + + private BlockingQueue> blockingQueue; + + public ThreadPoolAdapterExtra() { + blockingQueue = new ArrayBlockingQueue(BLOCKING_QUEUE_CAPACITY); + } + + public void offerQueue(Map map) throws InterruptedException { + blockingQueue.offer(map, 5, TimeUnit.SECONDS); + } + + public void extraStart(ThreadPoolAdapterExtraHandle threadPoolAdapterExtraHandle) { + new Thread(() -> { + try { + for (;;) { + Map map = blockingQueue.take(); + threadPoolAdapterExtraHandle.execute(map); + } + } catch (InterruptedException e) { + log.error("extraStart error", e); + } + }, "threadPoolAdapterExtra").start(); + } +} diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java new file mode 100644 index 00000000..a856a52b --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.adapter.base; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * thread pool adapter extra auto configuration. + */ +@Configuration(proxyBeanMethods = false) +public class ThreadPoolAdapterExtraAutoConfiguration { + + @Bean + public ThreadPoolAdapterExtra threadPoolAdapterExtra() { + return new ThreadPoolAdapterExtra(); + } + +} diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraHandle.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraHandle.java new file mode 100644 index 00000000..a6cde2e7 --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraHandle.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.adapter.base; + +import java.util.Map; + +/** + * Thread Pool Adapter Extra Handle + **/ +@FunctionalInterface +public interface ThreadPoolAdapterExtraHandle { + + void execute(Map map); +} diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/resources/META-INF/spring.factories b/hippo4j-adapter/hippo4j-adapter-base/src/main/resources/META-INF/spring.factories new file mode 100644 index 00000000..4236a045 --- /dev/null +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/resources/META-INF/spring.factories @@ -0,0 +1 @@ +org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.hippo4j.adapter.base.ThreadPoolAdapterExtraAutoConfiguration \ No newline at end of file diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java index 1fc1e336..fd2bd316 100644 --- a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java @@ -18,20 +18,29 @@ package cn.hippo4j.adapter.hystrix; import cn.hippo4j.adapter.base.ThreadPoolAdapter; +import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra; import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; +import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.toolkit.CollectionUtil; import com.google.common.collect.Maps; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.netflix.hystrix.HystrixThreadPool; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.ApplicationListener; import java.lang.reflect.Field; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; @@ -45,8 +54,25 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL private static final String THREAD_POOLS_FIELD = "threadPools"; + private static final int TASK_INTERVAL_SECONDS = 2; + private final Map HYSTRIX_CONSUME_EXECUTOR = Maps.newHashMap(); + private final ScheduledExecutorService scheduler; + + private ThreadPoolAdapterExtra threadPoolAdapterExtra; + + public HystrixThreadPoolAdapter(ThreadPoolAdapterExtra threadPoolAdapterExtra) { + + this.threadPoolAdapterExtra = threadPoolAdapterExtra; + + scheduler = new ScheduledThreadPoolExecutor(2, + new ThreadFactoryBuilder() + .setNameFormat("hystrixThreadPoolAdapter") + .setDaemon(true) + .build()); + } + @Override public String mark() { return "hystrix"; @@ -55,17 +81,24 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL @Override public ThreadPoolAdapterState getThreadPoolState(String identify) { ThreadPoolAdapterState result = new ThreadPoolAdapterState(); - ThreadPoolExecutor rocketMQConsumeExecutor = HYSTRIX_CONSUME_EXECUTOR.get(identify); - if (rocketMQConsumeExecutor != null) { + ThreadPoolExecutor threadPoolExecutor = HYSTRIX_CONSUME_EXECUTOR.get(identify); + if (threadPoolExecutor != null) { result.setThreadPoolKey(identify); - result.setCoreSize(rocketMQConsumeExecutor.getCorePoolSize()); - result.setMaximumSize(rocketMQConsumeExecutor.getMaximumPoolSize()); + result.setCoreSize(threadPoolExecutor.getCorePoolSize()); + result.setMaximumSize(threadPoolExecutor.getMaximumPoolSize()); return result; } log.warn("[{}] hystrix thread pool not found.", identify); return result; } + @Override + public List getThreadPoolStates() { + List threadPoolAdapterStates = new ArrayList<>(); + HYSTRIX_CONSUME_EXECUTOR.forEach((kel, val) -> threadPoolAdapterStates.add(getThreadPoolState(String.valueOf(val)))); + return threadPoolAdapterStates; + } + @Override public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) { String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey(); @@ -87,32 +120,65 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL @Override public void onApplicationEvent(ApplicationStartedEvent event) { + HystrixThreadPoolRefreshTask hystrixThreadPoolRefreshTask = new HystrixThreadPoolRefreshTask(scheduler); + scheduler.schedule(hystrixThreadPoolRefreshTask, TASK_INTERVAL_SECONDS, TimeUnit.SECONDS); + } + + public void hystrixThreadPoolRefresh() { try { + boolean addExtraFlag = false; Class factoryClass = HystrixThreadPool.Factory.class; Field threadPoolsField = factoryClass.getDeclaredField(THREAD_POOLS_FIELD); threadPoolsField.setAccessible(true); ConcurrentHashMap threadPools = - (ConcurrentHashMap)threadPoolsField.get(factoryClass); + (ConcurrentHashMap) threadPoolsField.get(factoryClass); if (CollectionUtil.isNotEmpty(threadPools)) { for (Map.Entry stringHystrixThreadPoolEntry : threadPools.entrySet()) { String key = stringHystrixThreadPoolEntry.getKey(); HystrixThreadPool value = stringHystrixThreadPoolEntry.getValue(); if (value instanceof HystrixThreadPool.HystrixThreadPoolDefault) { HystrixThreadPool.HystrixThreadPoolDefault hystrixThreadPoolDefault = - (HystrixThreadPool.HystrixThreadPoolDefault)value; + (HystrixThreadPool.HystrixThreadPoolDefault) value; Class hystrixThreadPoolDefaultClass = hystrixThreadPoolDefault.getClass(); Field threadPoolField = hystrixThreadPoolDefaultClass.getDeclaredField(THREAD_POOL_FIELD); threadPoolField.setAccessible(true); ThreadPoolExecutor threadPoolExecutor = - (ThreadPoolExecutor)threadPoolField.get(hystrixThreadPoolDefault); - if (threadPoolExecutor != null) { - HYSTRIX_CONSUME_EXECUTOR.put(key,threadPoolExecutor); + (ThreadPoolExecutor) threadPoolField.get(hystrixThreadPoolDefault); + if (threadPoolExecutor != null && HYSTRIX_CONSUME_EXECUTOR.get(key) == null) { + HYSTRIX_CONSUME_EXECUTOR.put(key, threadPoolExecutor); + addExtraFlag = true; } } } } - }catch (Exception e) { + if (addExtraFlag) { + Map map = Maps.newHashMap(); + map.putAll(ApplicationContextHolder.getBeansOfType(HystrixThreadPoolAdapter.class)); + threadPoolAdapterExtra.offerQueue(map); + } + } catch (Exception e) { log.error("Failed to get Hystrix thread pool.", e); } + + } + + class HystrixThreadPoolRefreshTask implements Runnable { + + private ScheduledExecutorService scheduler; + + public HystrixThreadPoolRefreshTask(ScheduledExecutorService scheduler) { + this.scheduler = scheduler; + } + + @Override + public void run() { + try { + hystrixThreadPoolRefresh(); + } finally { + if (!scheduler.isShutdown()) { + scheduler.schedule(this, TASK_INTERVAL_SECONDS, TimeUnit.MILLISECONDS); + } + } + } } } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/config/NettyServerConfig.java b/hippo4j-config/src/main/java/cn/hippo4j/config/config/NettyServerConfig.java index 57373cd4..d326dff1 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/config/NettyServerConfig.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/config/NettyServerConfig.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.config.config; import cn.hippo4j.config.netty.MonitorNettyServer; @@ -11,12 +28,12 @@ import org.springframework.context.annotation.Configuration; public class NettyServerConfig { @Bean - public EventLoopGroup bossGroup(){ + public EventLoopGroup bossGroup() { return new NioEventLoopGroup(); } @Bean - public EventLoopGroup workGroup(){ + public EventLoopGroup workGroup() { return new NioEventLoopGroup(); } @@ -24,9 +41,7 @@ public class NettyServerConfig { public MonitorNettyServer monitorNettyServer(ServerBootstrapProperties serverBootstrapProperties, HisRunDataService hisRunDataService, EventLoopGroup bossGroup, - EventLoopGroup workGroup){ - return new MonitorNettyServer(serverBootstrapProperties,hisRunDataService,bossGroup,workGroup); + EventLoopGroup workGroup) { + return new MonitorNettyServer(serverBootstrapProperties, hisRunDataService, bossGroup, workGroup); } } - - diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/netty/MonitorNettyServer.java b/hippo4j-config/src/main/java/cn/hippo4j/config/netty/MonitorNettyServer.java index b39e04e5..1a2d1a49 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/netty/MonitorNettyServer.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/netty/MonitorNettyServer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.config.netty; import cn.hippo4j.config.config.ServerBootstrapProperties; @@ -44,16 +61,17 @@ public class MonitorNettyServer { private EventLoopGroup workGroup; @PostConstruct - public void nettyServerInit(){ + public void nettyServerInit() { new Thread(() -> { try { ServerBootstrap serverBootstrap = new ServerBootstrap(); - serverBootstrap.group(bossGroup,workGroup) + serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) - //childHandler的任务由workGroup来执行 - //如果是handler,则由bossGroup来执行 - .childHandler(new ChannelInitializer(){ + // childHandler的任务由workGroup来执行 + // 如果是handler,则由bossGroup来执行 + .childHandler(new ChannelInitializer() { + @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); @@ -66,13 +84,13 @@ public class MonitorNettyServer { ChannelFuture channelFuture = serverBootstrap.bind(Integer.parseInt(serverBootstrapProperties.getNettyServerPort())).sync(); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { - log.error("nettyServerInit error",e); + log.error("nettyServerInit error", e); } - },"nettyServerInit thread").start(); + }, "nettyServerInit thread").start(); } @PreDestroy - public void destroy(){ + public void destroy() { bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } diff --git a/hippo4j-config/src/main/java/cn/hippo4j/config/netty/ServerHandler.java b/hippo4j-config/src/main/java/cn/hippo4j/config/netty/ServerHandler.java index c8bce8a6..3f21fe5e 100644 --- a/hippo4j-config/src/main/java/cn/hippo4j/config/netty/ServerHandler.java +++ b/hippo4j-config/src/main/java/cn/hippo4j/config/netty/ServerHandler.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.config.netty; import cn.hippo4j.common.monitor.Message; diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/Hippo4jAdapterKafkaExampleApplication.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/Hippo4jAdapterKafkaExampleApplication.java index 0cf6e45c..e4c0e80a 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/Hippo4jAdapterKafkaExampleApplication.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/Hippo4jAdapterKafkaExampleApplication.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.adapter.kafka.example; import cn.hippo4j.core.enable.EnableDynamicThreadPool; diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/consumer/KafkaMessageConsumer.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/consumer/KafkaMessageConsumer.java index 8f6a5821..43087cc9 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/consumer/KafkaMessageConsumer.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/consumer/KafkaMessageConsumer.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.adapter.kafka.example.consumer; import lombok.extern.slf4j.Slf4j; diff --git a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/produce/KafkaMessageProduce.java b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/produce/KafkaMessageProduce.java index 119fb92c..04d01c71 100644 --- a/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/produce/KafkaMessageProduce.java +++ b/hippo4j-example/hippo4j-spring-boot-starter-adapter-kafka-example/src/main/java/cn/hippo4j/springboot/starter/adapter/kafka/example/produce/KafkaMessageProduce.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.adapter.kafka.example.produce; import cn.hippo4j.common.toolkit.JSONUtil; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java index 9e2d1ac2..5d8a9243 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java @@ -1,7 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.adapter.hystrix; +import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra; +import cn.hippo4j.adapter.base.ThreadPoolAdapterExtraAutoConfiguration; import cn.hippo4j.adapter.hystrix.HystrixThreadPoolAdapter; import cn.hippo4j.common.config.ApplicationContextHolder; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -13,6 +33,7 @@ import org.springframework.context.annotation.Configuration; * @create: 2022-07-15 **/ @Configuration(proxyBeanMethods = false) +@AutoConfigureAfter(ThreadPoolAdapterExtraAutoConfiguration.class) public class HystrixAdapterAutoConfiguration { @Bean @@ -22,7 +43,7 @@ public class HystrixAdapterAutoConfiguration { } @Bean - public HystrixThreadPoolAdapter hystrixThreadPoolAdapter(){ - return new HystrixThreadPoolAdapter(); + public HystrixThreadPoolAdapter hystrixThreadPoolAdapter(ThreadPoolAdapterExtra threadPoolAdapterExtra) { + return new HystrixThreadPoolAdapter(threadPoolAdapterExtra); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index dd8d8c93..d218a7e4 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -18,6 +18,7 @@ package cn.hippo4j.springboot.starter.config; import cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer; +import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra; import cn.hippo4j.common.api.ThreadDetailState; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.core.config.UtilAutoConfiguration; @@ -64,7 +65,8 @@ import org.springframework.core.env.ConfigurableEnvironment; @ConditionalOnBean(MarkerConfiguration.Marker.class) @EnableConfigurationProperties(BootstrapProperties.class) @ConditionalOnProperty(prefix = BootstrapProperties.PREFIX, value = "enable", matchIfMissing = true, havingValue = "true") -@ImportAutoConfiguration({HttpClientConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageNotifyConfiguration.class, UtilAutoConfiguration.class, WebThreadPoolConfiguration.class}) +@ImportAutoConfiguration({HttpClientConfiguration.class, NettyClientConfiguration.class, DiscoveryConfiguration.class, MessageNotifyConfiguration.class, UtilAutoConfiguration.class, + WebThreadPoolConfiguration.class}) public class DynamicThreadPoolAutoConfiguration { private final BootstrapProperties properties; @@ -116,7 +118,6 @@ public class DynamicThreadPoolAutoConfiguration { return new WebThreadPoolRunStateController(threadPoolRunStateHandler, threadDetailState); } - @Bean @ConditionalOnMissingBean @SuppressWarnings("all") @@ -165,7 +166,7 @@ public class DynamicThreadPoolAutoConfiguration { @Bean @SuppressWarnings("all") - public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, InetUtils hippo4JInetUtils) { - return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils); + public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, InetUtils hippo4JInetUtils, ThreadPoolAdapterExtra threadPoolAdapterExtra) { + return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils, threadPoolAdapterExtra); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/NettyClientConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/NettyClientConfiguration.java index 85363404..1ac2a0fa 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/NettyClientConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/NettyClientConfiguration.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.config; import cn.hippo4j.springboot.starter.monitor.send.netty.NettyConnectSender; @@ -22,7 +39,7 @@ public class NettyClientConfiguration { } @Bean - public MessageSender messageSender(ServerNettyAgent serverNettyAgent){ + public MessageSender messageSender(ServerNettyAgent serverNettyAgent) { return new NettyConnectSender(serverNettyAgent); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java index d081f1a1..2beea738 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java @@ -17,6 +17,7 @@ package cn.hippo4j.springboot.starter.core; +import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.common.api.ClientCloseHookExecute; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.constant.Constants; @@ -31,6 +32,7 @@ import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; +import java.util.Map; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -141,7 +143,8 @@ public class DiscoveryClient implements DisposableBean { boolean success = register(); // TODO Abstract server registration logic ThreadPoolAdapterRegister adapterRegister = ApplicationContextHolder.getBean(ThreadPoolAdapterRegister.class); - adapterRegister.register(); + Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); + adapterRegister.register(threadPoolAdapterMap); if (success) { instanceInfo.unsetIsDirty(timestamp); } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java index a51c06ab..6a1e3b1a 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java @@ -19,6 +19,7 @@ package cn.hippo4j.springboot.starter.core; import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; +import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.toolkit.CollectionUtil; @@ -56,13 +57,17 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { private final InetUtils hippo4JInetUtils; + private final ThreadPoolAdapterExtra threadPoolAdapterExtra; + @Override public void run(ApplicationArguments args) throws Exception { - register(); + Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); + register(threadPoolAdapterMap); + threadPoolAdapterExtra.extraStart(map -> register(map)); } - public void register() { - Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); + public void register(Map threadPoolAdapterMap) { + List cacheConfigList = Lists.newArrayList(); threadPoolAdapterMap.forEach((key, val) -> { List threadPoolStates = val.getThreadPoolStates(); diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/NettyConnectSender.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/NettyConnectSender.java index c30cfa6b..370aaf6b 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/NettyConnectSender.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/NettyConnectSender.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.monitor.send.netty; import cn.hippo4j.common.monitor.Message; @@ -39,7 +56,8 @@ public class NettyConnectSender implements MessageSender { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(eventLoopGroup) .channel(NioSocketChannel.class) - .handler(new ChannelInitializer(){ + .handler(new ChannelInitializer() { + @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); @@ -52,9 +70,9 @@ public class NettyConnectSender implements MessageSender { bootstrap.connect(serverNettyAgent.getNettyServerAddress(), serverNettyAgent.getNettyServerPort()).sync(); } catch (Exception e) { - log.error("netty send error ",e); - } /*finally { - eventLoopGroup.shutdownGracefully(); - }*/ + log.error("netty send error ", e); + } /* + * finally { eventLoopGroup.shutdownGracefully(); } + */ } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/SenderHandler.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/SenderHandler.java index 72832b52..7cf68542 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/SenderHandler.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/monitor/send/netty/SenderHandler.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.monitor.send.netty; import cn.hippo4j.common.monitor.MessageWrapper; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java index 34f6cfc2..c384e2eb 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerListManager.java @@ -79,7 +79,7 @@ public class ServerListManager { return currentServerAddr; } - public String getNettyServerPort(){ + public String getNettyServerPort() { return nettyServerPort; } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerNettyAgent.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerNettyAgent.java index 52bf884f..a2958173 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerNettyAgent.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/remote/ServerNettyAgent.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package cn.hippo4j.springboot.starter.remote; import cn.hippo4j.springboot.starter.config.BootstrapProperties; @@ -18,21 +35,21 @@ public class ServerNettyAgent { private EventLoopGroup eventLoopGroup; - public ServerNettyAgent(BootstrapProperties properties){ + public ServerNettyAgent(BootstrapProperties properties) { this.dynamicThreadPoolProperties = properties; this.serverListManager = new ServerListManager(dynamicThreadPoolProperties); this.eventLoopGroup = new NioEventLoopGroup(); } - public EventLoopGroup getEventLoopGroup(){ + public EventLoopGroup getEventLoopGroup() { return eventLoopGroup; } public String getNettyServerAddress() { - return serverListManager.getCurrentServerAddr().split(":")[1].replace("//",""); + return serverListManager.getCurrentServerAddr().split(":")[1].replace("//", ""); } - public Integer getNettyServerPort(){ + public Integer getNettyServerPort() { return Integer.parseInt(serverListManager.getNettyServerPort()); } } From 9f00d5ee411c8a16e8fdab321602a4961c64cb2f Mon Sep 17 00:00:00 2001 From: shiming-stars-lk <1031900093@qq.com> Date: Fri, 15 Jul 2022 17:36:34 +0800 Subject: [PATCH 03/10] Hystrix thread pool monitoring optimization --- .../cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java index fd2bd316..48fc670b 100644 --- a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java @@ -75,7 +75,7 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL @Override public String mark() { - return "hystrix"; + return "Hystrix"; } @Override From 694fa24dc5f8220de673546794c8c1b9fb4e42b1 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Fri, 15 Jul 2022 09:09:57 +0800 Subject: [PATCH 04/10] Reconstitute the post processor of the thread pool --- .../core/DynamicThreadPoolPostProcessor.java | 79 +++++++++---------- 1 file changed, 37 insertions(+), 42 deletions(-) diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java index 344b809d..effeb8c9 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DynamicThreadPoolPostProcessor.java @@ -51,10 +51,7 @@ import java.util.concurrent.TimeUnit; import static cn.hippo4j.common.constant.Constants.*; /** - * Dynamic threadPool post processor. - * - * @author chen.ma - * @date 2021/8/2 20:40 + * Dynamic thread-pool post processor. */ @Slf4j @AllArgsConstructor @@ -68,18 +65,6 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { private final ServerThreadPoolDynamicRefresh threadPoolDynamicRefresh; - private final ExecutorService executorService = ThreadPoolBuilder.builder() - .corePoolSize(2) - .maxPoolNum(4) - .keepAliveTime(2000) - .timeUnit(TimeUnit.MILLISECONDS) - .workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE) - .capacity(1024) - .allowCoreThreadTimeOut(true) - .threadFactory("client.dynamic.threadPool.change.config") - .rejected(new ThreadPoolExecutor.AbortPolicy()) - .build(); - @Override public Object postProcessBeforeInitialization(Object bean, String beanName) { return bean; @@ -102,8 +87,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { log.error("Failed to create dynamic thread pool in annotation mode.", ex); return bean; } - DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) bean; - DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicExecutor.getThreadPoolId(), dynamicExecutor); + DynamicThreadPoolExecutor dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; + DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); ThreadPoolExecutor remoteExecutor = fillPoolAndRegister(wrap); subscribeConfig(wrap); return remoteExecutor; @@ -136,49 +121,47 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { queryStrMap.put(TP_ID, tpId); queryStrMap.put(ITEM_ID, properties.getItemId()); queryStrMap.put(NAMESPACE, properties.getNamespace()); - - Result result; boolean isSubscribe = false; - ThreadPoolExecutor newDynamicPoolExecutor = null; - ThreadPoolParameterInfo ppi = new ThreadPoolParameterInfo(); + ThreadPoolExecutor newDynamicThreadPoolExecutor = null; + ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo(); try { - result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L); + Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L); if (result.isSuccess() && result.getData() != null) { String resultJsonStr = JSONUtil.toJSONString(result.getData()); - if ((ppi = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) { + if ((threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) { // Create a thread pool with relevant parameters. - BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); - newDynamicPoolExecutor = ThreadPoolBuilder.builder() + BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity()); + newDynamicThreadPoolExecutor = ThreadPoolBuilder.builder() .dynamicPool() .workQueue(workQueue) .threadFactory(tpId) - .poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize()) - .keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS) - .rejected(RejectedTypeEnum.createPolicy(ppi.getRejectedType())) - .allowCoreThreadTimeOut(EnableEnum.getBool(ppi.getAllowCoreThreadTimeOut())) + .poolThreadSize(threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt()) + .keepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS) + .rejected(RejectedTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType())) + .allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut())) .build(); // Set dynamic thread pool enhancement parameters. if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) { ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm( - BooleanUtil.toBoolean(ppi.getIsAlarm().toString()), - ppi.getCapacityAlarm(), - ppi.getLivenessAlarm()); + BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()), + threadPoolParameterInfo.getCapacityAlarm(), + threadPoolParameterInfo.getLivenessAlarm()); GlobalNotifyAlarmManage.put(tpId, threadPoolNotifyAlarm); TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator(); - ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); + ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setTaskDecorator(taskDecorator); long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis; boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown; - ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); + ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); long executeTimeOut = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getExecuteTimeOut(); - ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setExecuteTimeOut(executeTimeOut); + ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setExecuteTimeOut(executeTimeOut); } - dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor); + dynamicThreadPoolWrap.setExecutor(newDynamicThreadPoolExecutor); isSubscribe = true; } } } catch (Exception ex) { - newDynamicPoolExecutor = dynamicThreadPoolWrap.getExecutor() != null ? dynamicThreadPoolWrap.getExecutor() : CommonDynamicThreadPool.getInstance(tpId); - dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor); + newDynamicThreadPoolExecutor = dynamicThreadPoolWrap.getExecutor() != null ? dynamicThreadPoolWrap.getExecutor() : CommonDynamicThreadPool.getInstance(tpId); + dynamicThreadPoolWrap.setExecutor(newDynamicThreadPoolExecutor); log.error("Failed to initialize thread pool configuration. error message :: {}", ex.getMessage()); } finally { if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) { @@ -187,10 +170,22 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { // Set whether to subscribe to the remote thread pool configuration. dynamicThreadPoolWrap.setSubscribeFlag(isSubscribe); } - GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getThreadPoolId(), ppi, dynamicThreadPoolWrap); - return newDynamicPoolExecutor; + GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrap); + return newDynamicThreadPoolExecutor; } + private final ExecutorService configRefreshExecutorService = ThreadPoolBuilder.builder() + .corePoolSize(2) + .maxPoolNum(4) + .keepAliveTime(2000) + .timeUnit(TimeUnit.MILLISECONDS) + .workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE) + .capacity(1024) + .allowCoreThreadTimeOut(true) + .threadFactory("client.dynamic.threadPool.change.config") + .rejected(new ThreadPoolExecutor.AbortPolicy()) + .build(); + /** * Client dynamic thread pool subscription server configuration. * @@ -198,7 +193,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { */ protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrap) { if (dynamicThreadPoolWrap.isSubscribeFlag()) { - threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getThreadPoolId(), executorService, config -> threadPoolDynamicRefresh.dynamicRefresh(config)); + threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getThreadPoolId(), configRefreshExecutorService, config -> threadPoolDynamicRefresh.dynamicRefresh(config)); } } } From 241de5bccf0a9e58a2de039ab5738e76a9d86ef5 Mon Sep 17 00:00:00 2001 From: shiming-stars-lk <1031900093@qq.com> Date: Fri, 15 Jul 2022 19:22:51 +0800 Subject: [PATCH 05/10] Hystrix thread pool monitoring optimization --- .../adapter/base/ThreadPoolAdapterExtra.java | 29 +++---- .../hystrix/HystrixThreadPoolAdapter.java | 19 +---- .../starter/core/DiscoveryClient.java | 3 +- .../core/ThreadPoolAdapterRegister.java | 84 +++++++++++++++++-- 4 files changed, 93 insertions(+), 42 deletions(-) diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java index 695f2402..e12fbeaf 100644 --- a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java @@ -17,11 +17,14 @@ package cn.hippo4j.adapter.base; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -30,28 +33,18 @@ import java.util.concurrent.TimeUnit; @Slf4j public class ThreadPoolAdapterExtra { - private static final int BLOCKING_QUEUE_CAPACITY = 100; + private final ScheduledExecutorService scheduler; - private BlockingQueue> blockingQueue; public ThreadPoolAdapterExtra() { - blockingQueue = new ArrayBlockingQueue(BLOCKING_QUEUE_CAPACITY); + scheduler = new ScheduledThreadPoolExecutor(2, + new ThreadFactoryBuilder() + .setNameFormat("threadPoolAdapter") + .setDaemon(true) + .build()); } - public void offerQueue(Map map) throws InterruptedException { - blockingQueue.offer(map, 5, TimeUnit.SECONDS); - } - - public void extraStart(ThreadPoolAdapterExtraHandle threadPoolAdapterExtraHandle) { - new Thread(() -> { - try { - for (;;) { - Map map = blockingQueue.take(); - threadPoolAdapterExtraHandle.execute(map); - } - } catch (InterruptedException e) { - log.error("extraStart error", e); - } - }, "threadPoolAdapterExtra").start(); + public ScheduledExecutorService getScheduler() { + return scheduler; } } diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java index 48fc670b..e0f53040 100644 --- a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java @@ -58,19 +58,12 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL private final Map HYSTRIX_CONSUME_EXECUTOR = Maps.newHashMap(); - private final ScheduledExecutorService scheduler; - private ThreadPoolAdapterExtra threadPoolAdapterExtra; public HystrixThreadPoolAdapter(ThreadPoolAdapterExtra threadPoolAdapterExtra) { this.threadPoolAdapterExtra = threadPoolAdapterExtra; - scheduler = new ScheduledThreadPoolExecutor(2, - new ThreadFactoryBuilder() - .setNameFormat("hystrixThreadPoolAdapter") - .setDaemon(true) - .build()); } @Override @@ -120,13 +113,13 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL @Override public void onApplicationEvent(ApplicationStartedEvent event) { + ScheduledExecutorService scheduler = threadPoolAdapterExtra.getScheduler(); HystrixThreadPoolRefreshTask hystrixThreadPoolRefreshTask = new HystrixThreadPoolRefreshTask(scheduler); scheduler.schedule(hystrixThreadPoolRefreshTask, TASK_INTERVAL_SECONDS, TimeUnit.SECONDS); } public void hystrixThreadPoolRefresh() { try { - boolean addExtraFlag = false; Class factoryClass = HystrixThreadPool.Factory.class; Field threadPoolsField = factoryClass.getDeclaredField(THREAD_POOLS_FIELD); threadPoolsField.setAccessible(true); @@ -144,18 +137,10 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL threadPoolField.setAccessible(true); ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) threadPoolField.get(hystrixThreadPoolDefault); - if (threadPoolExecutor != null && HYSTRIX_CONSUME_EXECUTOR.get(key) == null) { - HYSTRIX_CONSUME_EXECUTOR.put(key, threadPoolExecutor); - addExtraFlag = true; - } + HYSTRIX_CONSUME_EXECUTOR.put(key, threadPoolExecutor); } } } - if (addExtraFlag) { - Map map = Maps.newHashMap(); - map.putAll(ApplicationContextHolder.getBeansOfType(HystrixThreadPoolAdapter.class)); - threadPoolAdapterExtra.offerQueue(map); - } } catch (Exception e) { log.error("Failed to get Hystrix thread pool.", e); } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java index 2beea738..247ca276 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/DiscoveryClient.java @@ -143,8 +143,7 @@ public class DiscoveryClient implements DisposableBean { boolean success = register(); // TODO Abstract server registration logic ThreadPoolAdapterRegister adapterRegister = ApplicationContextHolder.getBean(ThreadPoolAdapterRegister.class); - Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); - adapterRegister.register(threadPoolAdapterMap); + adapterRegister.register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java index 6a1e3b1a..b2c9b8fd 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java @@ -38,6 +38,9 @@ import org.springframework.core.env.ConfigurableEnvironment; import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; import static cn.hippo4j.common.constant.Constants.REGISTER_ADAPTER_PATH; @@ -59,15 +62,24 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { private final ThreadPoolAdapterExtra threadPoolAdapterExtra; + private List cacheConfigList = Lists.newArrayList(); + + private static final int TASK_INTERVAL_SECONDS = 2; + + + @Override public void run(ApplicationArguments args) throws Exception { - Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); - register(threadPoolAdapterMap); - threadPoolAdapterExtra.extraStart(map -> register(map)); - } - public void register(Map threadPoolAdapterMap) { + ScheduledExecutorService scheduler = threadPoolAdapterExtra.getScheduler(); + + ThreadPoolAdapterRegisterTask threadPoolAdapterRegisterTask = new ThreadPoolAdapterRegisterTask(scheduler); + + scheduler.schedule(threadPoolAdapterRegisterTask, TASK_INTERVAL_SECONDS, TimeUnit.SECONDS); + } + public List getThreadPoolAdapterCacheConfigs(){ + Map threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); List cacheConfigList = Lists.newArrayList(); threadPoolAdapterMap.forEach((key, val) -> { List threadPoolStates = val.getThreadPoolStates(); @@ -84,6 +96,10 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { cacheConfig.setThreadPoolAdapterStates(threadPoolStates); cacheConfigList.add(cacheConfig); }); + return cacheConfigList; + } + + public void doRegister(List cacheConfigList){ if (CollectionUtil.isNotEmpty(cacheConfigList)) { try { Result result = httpAgent.httpPost(REGISTER_ADAPTER_PATH, cacheConfigList); @@ -95,4 +111,62 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { } } } + + public void register() { + List threadPoolAdapterCacheConfigs = getThreadPoolAdapterCacheConfigs(); + doRegister(threadPoolAdapterCacheConfigs); + } + + class ThreadPoolAdapterRegisterTask implements Runnable{ + + private ScheduledExecutorService scheduler; + + public ThreadPoolAdapterRegisterTask(ScheduledExecutorService scheduler){ + this.scheduler = scheduler; + } + + @Override + public void run() { + try { + boolean registerFlag = false; + + List newThreadPoolAdapterCacheConfigs = getThreadPoolAdapterCacheConfigs(); + + Map> newThreadPoolAdapterCacheConfigMap = + newThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap( + ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); + + Map> oldThreadPoolAdapterCacheConfigMap = + cacheConfigList.stream().collect(Collectors.toMap( + ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); + + for (Map.Entry> entry : newThreadPoolAdapterCacheConfigMap.entrySet()) { + String key = entry.getKey(); + List newValue = entry.getValue(); + List oldValue = oldThreadPoolAdapterCacheConfigMap.get(key); + if (oldValue == null) { + registerFlag = true; + break; + }else { + if (newValue.size() != oldValue.size()) { + registerFlag = true; + break; + } + } + } + + cacheConfigList = newThreadPoolAdapterCacheConfigs; + + if (registerFlag) { + doRegister(cacheConfigList); + } + }catch (Exception e){ + log.error("Register Task Error",e); + }finally { + if (!scheduler.isShutdown()) { + scheduler.schedule(this, TASK_INTERVAL_SECONDS, TimeUnit.MILLISECONDS); + } + } + } + } } From 36e7f649265a0900a2e5c951fca507f4bdfa312e Mon Sep 17 00:00:00 2001 From: shiming-stars-lk <1031900093@qq.com> Date: Fri, 15 Jul 2022 19:45:51 +0800 Subject: [PATCH 06/10] Hystrix thread pool monitoring optimization --- .../springboot/starter/core/ThreadPoolAdapterRegister.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java index b2c9b8fd..f855dc9f 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java @@ -31,6 +31,7 @@ import cn.hippo4j.springboot.starter.remote.HttpAgent; import cn.hippo4j.springboot.starter.toolkit.CloudCommonIdUtil; import com.google.common.collect.Lists; import lombok.AllArgsConstructor; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; @@ -50,6 +51,7 @@ import static cn.hippo4j.common.constant.Constants.REGISTER_ADAPTER_PATH; */ @Slf4j @AllArgsConstructor +@RequiredArgsConstructor public class ThreadPoolAdapterRegister implements ApplicationRunner { private final HttpAgent httpAgent; From c1e5b4a6178b0e9c0baa199eded448cb8e4fed26 Mon Sep 17 00:00:00 2001 From: shiming-stars-lk <1031900093@qq.com> Date: Fri, 15 Jul 2022 19:50:44 +0800 Subject: [PATCH 07/10] Hystrix thread pool monitoring optimization --- ...readPoolAdapterExtraAutoConfiguration.java | 4 +-- .../base/ThreadPoolAdapterExtraHandle.java | 29 ------------------- ...a.java => ThreadPoolAdapterScheduler.java} | 17 ++++++----- .../hystrix/HystrixThreadPoolAdapter.java | 29 +++++++++---------- .../HystrixAdapterAutoConfiguration.java | 6 ++-- .../DynamicThreadPoolAutoConfiguration.java | 6 ++-- .../core/ThreadPoolAdapterRegister.java | 6 ++-- 7 files changed, 33 insertions(+), 64 deletions(-) delete mode 100644 hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraHandle.java rename hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/{ThreadPoolAdapterExtra.java => ThreadPoolAdapterScheduler.java} (84%) diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java index a856a52b..750b4879 100644 --- a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java @@ -27,8 +27,8 @@ import org.springframework.context.annotation.Configuration; public class ThreadPoolAdapterExtraAutoConfiguration { @Bean - public ThreadPoolAdapterExtra threadPoolAdapterExtra() { - return new ThreadPoolAdapterExtra(); + public ThreadPoolAdapterScheduler threadPoolAdapterExtra() { + return new ThreadPoolAdapterScheduler(); } } diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraHandle.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraHandle.java deleted file mode 100644 index a6cde2e7..00000000 --- a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraHandle.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cn.hippo4j.adapter.base; - -import java.util.Map; - -/** - * Thread Pool Adapter Extra Handle - **/ -@FunctionalInterface -public interface ThreadPoolAdapterExtraHandle { - - void execute(Map map); -} diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterScheduler.java similarity index 84% rename from hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java rename to hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterScheduler.java index e12fbeaf..2ce8a5d6 100644 --- a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtra.java +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterScheduler.java @@ -19,24 +19,21 @@ package cn.hippo4j.adapter.base; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; - -import java.util.Map; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; /** - * thread pool adapter extra. + * thread pool adapter schedule. */ @Slf4j -public class ThreadPoolAdapterExtra { +public class ThreadPoolAdapterScheduler { + + private static final int TASK_INTERVAL_SECONDS = 2; private final ScheduledExecutorService scheduler; - public ThreadPoolAdapterExtra() { + public ThreadPoolAdapterScheduler() { scheduler = new ScheduledThreadPoolExecutor(2, new ThreadFactoryBuilder() .setNameFormat("threadPoolAdapter") @@ -47,4 +44,8 @@ public class ThreadPoolAdapterExtra { public ScheduledExecutorService getScheduler() { return scheduler; } + + public int getTaskIntervalSeconds(){ + return TASK_INTERVAL_SECONDS; + } } diff --git a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java index e0f53040..2f2e2b1f 100644 --- a/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java +++ b/hippo4j-adapter/hippo4j-adapter-hystrix/src/main/java/cn/hippo4j/adapter/hystrix/HystrixThreadPoolAdapter.java @@ -18,13 +18,11 @@ package cn.hippo4j.adapter.hystrix; import cn.hippo4j.adapter.base.ThreadPoolAdapter; -import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra; +import cn.hippo4j.adapter.base.ThreadPoolAdapterScheduler; import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; -import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.toolkit.CollectionUtil; import com.google.common.collect.Maps; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.netflix.hystrix.HystrixThreadPool; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.context.event.ApplicationStartedEvent; @@ -32,13 +30,10 @@ import org.springframework.context.ApplicationListener; import java.lang.reflect.Field; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -54,15 +49,13 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL private static final String THREAD_POOLS_FIELD = "threadPools"; - private static final int TASK_INTERVAL_SECONDS = 2; - private final Map HYSTRIX_CONSUME_EXECUTOR = Maps.newHashMap(); - private ThreadPoolAdapterExtra threadPoolAdapterExtra; + private ThreadPoolAdapterScheduler threadPoolAdapterScheduler; - public HystrixThreadPoolAdapter(ThreadPoolAdapterExtra threadPoolAdapterExtra) { + public HystrixThreadPoolAdapter(ThreadPoolAdapterScheduler threadPoolAdapterScheduler) { - this.threadPoolAdapterExtra = threadPoolAdapterExtra; + this.threadPoolAdapterScheduler = threadPoolAdapterScheduler; } @@ -113,9 +106,10 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL @Override public void onApplicationEvent(ApplicationStartedEvent event) { - ScheduledExecutorService scheduler = threadPoolAdapterExtra.getScheduler(); - HystrixThreadPoolRefreshTask hystrixThreadPoolRefreshTask = new HystrixThreadPoolRefreshTask(scheduler); - scheduler.schedule(hystrixThreadPoolRefreshTask, TASK_INTERVAL_SECONDS, TimeUnit.SECONDS); + ScheduledExecutorService scheduler = threadPoolAdapterScheduler.getScheduler(); + int taskIntervalSeconds = threadPoolAdapterScheduler.getTaskIntervalSeconds(); + HystrixThreadPoolRefreshTask hystrixThreadPoolRefreshTask = new HystrixThreadPoolRefreshTask(scheduler,taskIntervalSeconds); + scheduler.schedule(hystrixThreadPoolRefreshTask, taskIntervalSeconds, TimeUnit.SECONDS); } public void hystrixThreadPoolRefresh() { @@ -151,8 +145,11 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL private ScheduledExecutorService scheduler; - public HystrixThreadPoolRefreshTask(ScheduledExecutorService scheduler) { + private int taskIntervalSeconds; + + public HystrixThreadPoolRefreshTask(ScheduledExecutorService scheduler, int taskIntervalSeconds) { this.scheduler = scheduler; + this.taskIntervalSeconds = taskIntervalSeconds; } @Override @@ -161,7 +158,7 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL hystrixThreadPoolRefresh(); } finally { if (!scheduler.isShutdown()) { - scheduler.schedule(this, TASK_INTERVAL_SECONDS, TimeUnit.MILLISECONDS); + scheduler.schedule(this, taskIntervalSeconds, TimeUnit.MILLISECONDS); } } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java index 5d8a9243..2873f04d 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java @@ -17,7 +17,7 @@ package cn.hippo4j.springboot.starter.adapter.hystrix; -import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra; +import cn.hippo4j.adapter.base.ThreadPoolAdapterScheduler; import cn.hippo4j.adapter.base.ThreadPoolAdapterExtraAutoConfiguration; import cn.hippo4j.adapter.hystrix.HystrixThreadPoolAdapter; import cn.hippo4j.common.config.ApplicationContextHolder; @@ -43,7 +43,7 @@ public class HystrixAdapterAutoConfiguration { } @Bean - public HystrixThreadPoolAdapter hystrixThreadPoolAdapter(ThreadPoolAdapterExtra threadPoolAdapterExtra) { - return new HystrixThreadPoolAdapter(threadPoolAdapterExtra); + public HystrixThreadPoolAdapter hystrixThreadPoolAdapter(ThreadPoolAdapterScheduler threadPoolAdapterScheduler) { + return new HystrixThreadPoolAdapter(threadPoolAdapterScheduler); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java index d218a7e4..f8b512ba 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -18,7 +18,7 @@ package cn.hippo4j.springboot.starter.config; import cn.hippo4j.adapter.base.ThreadPoolAdapterBeanContainer; -import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra; +import cn.hippo4j.adapter.base.ThreadPoolAdapterScheduler; import cn.hippo4j.common.api.ThreadDetailState; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.core.config.UtilAutoConfiguration; @@ -166,7 +166,7 @@ public class DynamicThreadPoolAutoConfiguration { @Bean @SuppressWarnings("all") - public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, InetUtils hippo4JInetUtils, ThreadPoolAdapterExtra threadPoolAdapterExtra) { - return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils, threadPoolAdapterExtra); + public ThreadPoolAdapterRegister threadPoolAdapterRegister(HttpAgent httpAgent, InetUtils hippo4JInetUtils, ThreadPoolAdapterScheduler threadPoolAdapterScheduler) { + return new ThreadPoolAdapterRegister(httpAgent, properties, environment, hippo4JInetUtils, threadPoolAdapterScheduler); } } diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java index f855dc9f..bb39ea71 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java @@ -19,7 +19,7 @@ package cn.hippo4j.springboot.starter.core; import cn.hippo4j.adapter.base.ThreadPoolAdapter; import cn.hippo4j.adapter.base.ThreadPoolAdapterCacheConfig; -import cn.hippo4j.adapter.base.ThreadPoolAdapterExtra; +import cn.hippo4j.adapter.base.ThreadPoolAdapterScheduler; import cn.hippo4j.adapter.base.ThreadPoolAdapterState; import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.toolkit.CollectionUtil; @@ -62,7 +62,7 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { private final InetUtils hippo4JInetUtils; - private final ThreadPoolAdapterExtra threadPoolAdapterExtra; + private final ThreadPoolAdapterScheduler threadPoolAdapterScheduler; private List cacheConfigList = Lists.newArrayList(); @@ -73,7 +73,7 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { - ScheduledExecutorService scheduler = threadPoolAdapterExtra.getScheduler(); + ScheduledExecutorService scheduler = threadPoolAdapterScheduler.getScheduler(); ThreadPoolAdapterRegisterTask threadPoolAdapterRegisterTask = new ThreadPoolAdapterRegisterTask(scheduler); From 45fd3d846e553e9049adce93581a48b2f4184c9d Mon Sep 17 00:00:00 2001 From: shiming-stars-lk <1031900093@qq.com> Date: Fri, 15 Jul 2022 19:53:09 +0800 Subject: [PATCH 08/10] Hystrix thread pool monitoring optimization --- ...n.java => ThreadPoolAdapterScheduleAutoConfiguration.java} | 2 +- .../src/main/resources/META-INF/spring.factories | 2 +- .../adapter/hystrix/HystrixAdapterAutoConfiguration.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) rename hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/{ThreadPoolAdapterExtraAutoConfiguration.java => ThreadPoolAdapterScheduleAutoConfiguration.java} (95%) diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterScheduleAutoConfiguration.java similarity index 95% rename from hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java rename to hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterScheduleAutoConfiguration.java index 750b4879..1ca62b4d 100644 --- a/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterExtraAutoConfiguration.java +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/java/cn/hippo4j/adapter/base/ThreadPoolAdapterScheduleAutoConfiguration.java @@ -24,7 +24,7 @@ import org.springframework.context.annotation.Configuration; * thread pool adapter extra auto configuration. */ @Configuration(proxyBeanMethods = false) -public class ThreadPoolAdapterExtraAutoConfiguration { +public class ThreadPoolAdapterScheduleAutoConfiguration { @Bean public ThreadPoolAdapterScheduler threadPoolAdapterExtra() { diff --git a/hippo4j-adapter/hippo4j-adapter-base/src/main/resources/META-INF/spring.factories b/hippo4j-adapter/hippo4j-adapter-base/src/main/resources/META-INF/spring.factories index 4236a045..ff8c4884 100644 --- a/hippo4j-adapter/hippo4j-adapter-base/src/main/resources/META-INF/spring.factories +++ b/hippo4j-adapter/hippo4j-adapter-base/src/main/resources/META-INF/spring.factories @@ -1 +1 @@ -org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.hippo4j.adapter.base.ThreadPoolAdapterExtraAutoConfiguration \ No newline at end of file +org.springframework.boot.autoconfigure.EnableAutoConfiguration=cn.hippo4j.adapter.base.ThreadPoolAdapterScheduleAutoConfiguration \ No newline at end of file diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java index 2873f04d..26800ee6 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter-adapter/hippo4j-spring-boot-starter-adapter-hystrix/src/main/java/cn/hippo4j/springboot/starter/adapter/hystrix/HystrixAdapterAutoConfiguration.java @@ -18,7 +18,7 @@ package cn.hippo4j.springboot.starter.adapter.hystrix; import cn.hippo4j.adapter.base.ThreadPoolAdapterScheduler; -import cn.hippo4j.adapter.base.ThreadPoolAdapterExtraAutoConfiguration; +import cn.hippo4j.adapter.base.ThreadPoolAdapterScheduleAutoConfiguration; import cn.hippo4j.adapter.hystrix.HystrixThreadPoolAdapter; import cn.hippo4j.common.config.ApplicationContextHolder; import org.springframework.boot.autoconfigure.AutoConfigureAfter; @@ -33,7 +33,7 @@ import org.springframework.context.annotation.Configuration; * @create: 2022-07-15 **/ @Configuration(proxyBeanMethods = false) -@AutoConfigureAfter(ThreadPoolAdapterExtraAutoConfiguration.class) +@AutoConfigureAfter(ThreadPoolAdapterScheduleAutoConfiguration.class) public class HystrixAdapterAutoConfiguration { @Bean From 23daaa426f38243e7f5e0d0eb153b675a234b0eb Mon Sep 17 00:00:00 2001 From: shiming-stars-lk <1031900093@qq.com> Date: Fri, 15 Jul 2022 19:57:09 +0800 Subject: [PATCH 09/10] Hystrix thread pool monitoring optimization --- .../starter/core/ThreadPoolAdapterRegister.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java index bb39ea71..77101f5c 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java @@ -66,18 +66,16 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { private List cacheConfigList = Lists.newArrayList(); - private static final int TASK_INTERVAL_SECONDS = 2; - @Override public void run(ApplicationArguments args) throws Exception { ScheduledExecutorService scheduler = threadPoolAdapterScheduler.getScheduler(); + int taskIntervalSeconds = threadPoolAdapterScheduler.getTaskIntervalSeconds(); + ThreadPoolAdapterRegisterTask threadPoolAdapterRegisterTask = new ThreadPoolAdapterRegisterTask(scheduler, taskIntervalSeconds); - ThreadPoolAdapterRegisterTask threadPoolAdapterRegisterTask = new ThreadPoolAdapterRegisterTask(scheduler); - - scheduler.schedule(threadPoolAdapterRegisterTask, TASK_INTERVAL_SECONDS, TimeUnit.SECONDS); + scheduler.schedule(threadPoolAdapterRegisterTask, threadPoolAdapterScheduler.getTaskIntervalSeconds(), TimeUnit.SECONDS); } public List getThreadPoolAdapterCacheConfigs(){ @@ -123,8 +121,11 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { private ScheduledExecutorService scheduler; - public ThreadPoolAdapterRegisterTask(ScheduledExecutorService scheduler){ + private int taskIntervalSeconds; + + public ThreadPoolAdapterRegisterTask(ScheduledExecutorService scheduler, int taskIntervalSeconds){ this.scheduler = scheduler; + this.taskIntervalSeconds = taskIntervalSeconds; } @Override @@ -166,7 +167,7 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { log.error("Register Task Error",e); }finally { if (!scheduler.isShutdown()) { - scheduler.schedule(this, TASK_INTERVAL_SECONDS, TimeUnit.MILLISECONDS); + scheduler.schedule(this, taskIntervalSeconds, TimeUnit.MILLISECONDS); } } } From 83969bcd786d38499dff66e619305c53f500756d Mon Sep 17 00:00:00 2001 From: shining-stars-lk <1031900093@qq.com> Date: Sat, 16 Jul 2022 10:13:42 +0800 Subject: [PATCH 10/10] Hystrix is optimized for adaptation --- .../core/ThreadPoolAdapterRegister.java | 55 ++++++++++--------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java index 77101f5c..eaf33f08 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/core/ThreadPoolAdapterRegister.java @@ -74,7 +74,6 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { ScheduledExecutorService scheduler = threadPoolAdapterScheduler.getScheduler(); int taskIntervalSeconds = threadPoolAdapterScheduler.getTaskIntervalSeconds(); ThreadPoolAdapterRegisterTask threadPoolAdapterRegisterTask = new ThreadPoolAdapterRegisterTask(scheduler, taskIntervalSeconds); - scheduler.schedule(threadPoolAdapterRegisterTask, threadPoolAdapterScheduler.getTaskIntervalSeconds(), TimeUnit.SECONDS); } @@ -131,32 +130,9 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { @Override public void run() { try { - boolean registerFlag = false; - List newThreadPoolAdapterCacheConfigs = getThreadPoolAdapterCacheConfigs(); - Map> newThreadPoolAdapterCacheConfigMap = - newThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap( - ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); - - Map> oldThreadPoolAdapterCacheConfigMap = - cacheConfigList.stream().collect(Collectors.toMap( - ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); - - for (Map.Entry> entry : newThreadPoolAdapterCacheConfigMap.entrySet()) { - String key = entry.getKey(); - List newValue = entry.getValue(); - List oldValue = oldThreadPoolAdapterCacheConfigMap.get(key); - if (oldValue == null) { - registerFlag = true; - break; - }else { - if (newValue.size() != oldValue.size()) { - registerFlag = true; - break; - } - } - } + boolean registerFlag = compareThreadPoolAdapterCacheConfigs(newThreadPoolAdapterCacheConfigs, cacheConfigList); cacheConfigList = newThreadPoolAdapterCacheConfigs; @@ -172,4 +148,33 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner { } } } + + private boolean compareThreadPoolAdapterCacheConfigs(List newThreadPoolAdapterCacheConfigs, + List oldThreadPoolAdapterCacheConfigs){ + boolean registerFlag = false; + + Map> newThreadPoolAdapterCacheConfigMap = + newThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap( + ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); + + Map> oldThreadPoolAdapterCacheConfigMap = + oldThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap( + ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2)); + + for (Map.Entry> entry : newThreadPoolAdapterCacheConfigMap.entrySet()) { + String key = entry.getKey(); + List newValue = entry.getValue(); + List oldValue = oldThreadPoolAdapterCacheConfigMap.get(key); + if (oldValue == null) { + registerFlag = true; + break; + }else { + if (newValue.size() != oldValue.size()) { + registerFlag = true; + break; + } + } + } + return registerFlag; + } }