Hystrix thread pool monitoring optimization

pull/284/head
shiming-stars-lk 3 years ago committed by shining-stars-lk
parent 694fa24dc5
commit 241de5bccf

@ -17,11 +17,14 @@
package cn.hippo4j.adapter.base; package cn.hippo4j.adapter.base;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -30,28 +33,18 @@ import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
public class ThreadPoolAdapterExtra { public class ThreadPoolAdapterExtra {
private static final int BLOCKING_QUEUE_CAPACITY = 100; private final ScheduledExecutorService scheduler;
private BlockingQueue<Map<String, ThreadPoolAdapter>> blockingQueue;
public ThreadPoolAdapterExtra() { public ThreadPoolAdapterExtra() {
blockingQueue = new ArrayBlockingQueue(BLOCKING_QUEUE_CAPACITY); scheduler = new ScheduledThreadPoolExecutor(2,
new ThreadFactoryBuilder()
.setNameFormat("threadPoolAdapter")
.setDaemon(true)
.build());
} }
public void offerQueue(Map<String, ThreadPoolAdapter> map) throws InterruptedException { public ScheduledExecutorService getScheduler() {
blockingQueue.offer(map, 5, TimeUnit.SECONDS); return scheduler;
}
public void extraStart(ThreadPoolAdapterExtraHandle threadPoolAdapterExtraHandle) {
new Thread(() -> {
try {
for (;;) {
Map<String, ThreadPoolAdapter> map = blockingQueue.take();
threadPoolAdapterExtraHandle.execute(map);
}
} catch (InterruptedException e) {
log.error("extraStart error", e);
}
}, "threadPoolAdapterExtra").start();
} }
} }

@ -58,19 +58,12 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL
private final Map<String, ThreadPoolExecutor> HYSTRIX_CONSUME_EXECUTOR = Maps.newHashMap(); private final Map<String, ThreadPoolExecutor> HYSTRIX_CONSUME_EXECUTOR = Maps.newHashMap();
private final ScheduledExecutorService scheduler;
private ThreadPoolAdapterExtra threadPoolAdapterExtra; private ThreadPoolAdapterExtra threadPoolAdapterExtra;
public HystrixThreadPoolAdapter(ThreadPoolAdapterExtra threadPoolAdapterExtra) { public HystrixThreadPoolAdapter(ThreadPoolAdapterExtra threadPoolAdapterExtra) {
this.threadPoolAdapterExtra = threadPoolAdapterExtra; this.threadPoolAdapterExtra = threadPoolAdapterExtra;
scheduler = new ScheduledThreadPoolExecutor(2,
new ThreadFactoryBuilder()
.setNameFormat("hystrixThreadPoolAdapter")
.setDaemon(true)
.build());
} }
@Override @Override
@ -120,13 +113,13 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL
@Override @Override
public void onApplicationEvent(ApplicationStartedEvent event) { public void onApplicationEvent(ApplicationStartedEvent event) {
ScheduledExecutorService scheduler = threadPoolAdapterExtra.getScheduler();
HystrixThreadPoolRefreshTask hystrixThreadPoolRefreshTask = new HystrixThreadPoolRefreshTask(scheduler); HystrixThreadPoolRefreshTask hystrixThreadPoolRefreshTask = new HystrixThreadPoolRefreshTask(scheduler);
scheduler.schedule(hystrixThreadPoolRefreshTask, TASK_INTERVAL_SECONDS, TimeUnit.SECONDS); scheduler.schedule(hystrixThreadPoolRefreshTask, TASK_INTERVAL_SECONDS, TimeUnit.SECONDS);
} }
public void hystrixThreadPoolRefresh() { public void hystrixThreadPoolRefresh() {
try { try {
boolean addExtraFlag = false;
Class<HystrixThreadPool.Factory> factoryClass = HystrixThreadPool.Factory.class; Class<HystrixThreadPool.Factory> factoryClass = HystrixThreadPool.Factory.class;
Field threadPoolsField = factoryClass.getDeclaredField(THREAD_POOLS_FIELD); Field threadPoolsField = factoryClass.getDeclaredField(THREAD_POOLS_FIELD);
threadPoolsField.setAccessible(true); threadPoolsField.setAccessible(true);
@ -144,18 +137,10 @@ public class HystrixThreadPoolAdapter implements ThreadPoolAdapter, ApplicationL
threadPoolField.setAccessible(true); threadPoolField.setAccessible(true);
ThreadPoolExecutor threadPoolExecutor = ThreadPoolExecutor threadPoolExecutor =
(ThreadPoolExecutor) threadPoolField.get(hystrixThreadPoolDefault); (ThreadPoolExecutor) threadPoolField.get(hystrixThreadPoolDefault);
if (threadPoolExecutor != null && HYSTRIX_CONSUME_EXECUTOR.get(key) == null) { HYSTRIX_CONSUME_EXECUTOR.put(key, threadPoolExecutor);
HYSTRIX_CONSUME_EXECUTOR.put(key, threadPoolExecutor);
addExtraFlag = true;
}
} }
} }
} }
if (addExtraFlag) {
Map<String, ThreadPoolAdapter> map = Maps.newHashMap();
map.putAll(ApplicationContextHolder.getBeansOfType(HystrixThreadPoolAdapter.class));
threadPoolAdapterExtra.offerQueue(map);
}
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to get Hystrix thread pool.", e); log.error("Failed to get Hystrix thread pool.", e);
} }

@ -143,8 +143,7 @@ public class DiscoveryClient implements DisposableBean {
boolean success = register(); boolean success = register();
// TODO Abstract server registration logic // TODO Abstract server registration logic
ThreadPoolAdapterRegister adapterRegister = ApplicationContextHolder.getBean(ThreadPoolAdapterRegister.class); ThreadPoolAdapterRegister adapterRegister = ApplicationContextHolder.getBean(ThreadPoolAdapterRegister.class);
Map<String, ThreadPoolAdapter> threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class); adapterRegister.register();
adapterRegister.register(threadPoolAdapterMap);
if (success) { if (success) {
instanceInfo.unsetIsDirty(timestamp); instanceInfo.unsetIsDirty(timestamp);
} }

@ -38,6 +38,9 @@ import org.springframework.core.env.ConfigurableEnvironment;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL; import static cn.hippo4j.common.constant.Constants.IDENTIFY_SLICER_SYMBOL;
import static cn.hippo4j.common.constant.Constants.REGISTER_ADAPTER_PATH; import static cn.hippo4j.common.constant.Constants.REGISTER_ADAPTER_PATH;
@ -59,15 +62,24 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner {
private final ThreadPoolAdapterExtra threadPoolAdapterExtra; private final ThreadPoolAdapterExtra threadPoolAdapterExtra;
private List<ThreadPoolAdapterCacheConfig> cacheConfigList = Lists.newArrayList();
private static final int TASK_INTERVAL_SECONDS = 2;
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
Map<String, ThreadPoolAdapter> threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class);
register(threadPoolAdapterMap);
threadPoolAdapterExtra.extraStart(map -> register(map));
}
public void register(Map<String, ThreadPoolAdapter> threadPoolAdapterMap) { ScheduledExecutorService scheduler = threadPoolAdapterExtra.getScheduler();
ThreadPoolAdapterRegisterTask threadPoolAdapterRegisterTask = new ThreadPoolAdapterRegisterTask(scheduler);
scheduler.schedule(threadPoolAdapterRegisterTask, TASK_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
public List<ThreadPoolAdapterCacheConfig> getThreadPoolAdapterCacheConfigs(){
Map<String, ThreadPoolAdapter> threadPoolAdapterMap = ApplicationContextHolder.getBeansOfType(ThreadPoolAdapter.class);
List<ThreadPoolAdapterCacheConfig> cacheConfigList = Lists.newArrayList(); List<ThreadPoolAdapterCacheConfig> cacheConfigList = Lists.newArrayList();
threadPoolAdapterMap.forEach((key, val) -> { threadPoolAdapterMap.forEach((key, val) -> {
List<ThreadPoolAdapterState> threadPoolStates = val.getThreadPoolStates(); List<ThreadPoolAdapterState> threadPoolStates = val.getThreadPoolStates();
@ -84,6 +96,10 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner {
cacheConfig.setThreadPoolAdapterStates(threadPoolStates); cacheConfig.setThreadPoolAdapterStates(threadPoolStates);
cacheConfigList.add(cacheConfig); cacheConfigList.add(cacheConfig);
}); });
return cacheConfigList;
}
public void doRegister(List<ThreadPoolAdapterCacheConfig> cacheConfigList){
if (CollectionUtil.isNotEmpty(cacheConfigList)) { if (CollectionUtil.isNotEmpty(cacheConfigList)) {
try { try {
Result result = httpAgent.httpPost(REGISTER_ADAPTER_PATH, cacheConfigList); Result result = httpAgent.httpPost(REGISTER_ADAPTER_PATH, cacheConfigList);
@ -95,4 +111,62 @@ public class ThreadPoolAdapterRegister implements ApplicationRunner {
} }
} }
} }
public void register() {
List<ThreadPoolAdapterCacheConfig> threadPoolAdapterCacheConfigs = getThreadPoolAdapterCacheConfigs();
doRegister(threadPoolAdapterCacheConfigs);
}
class ThreadPoolAdapterRegisterTask implements Runnable{
private ScheduledExecutorService scheduler;
public ThreadPoolAdapterRegisterTask(ScheduledExecutorService scheduler){
this.scheduler = scheduler;
}
@Override
public void run() {
try {
boolean registerFlag = false;
List<ThreadPoolAdapterCacheConfig> newThreadPoolAdapterCacheConfigs = getThreadPoolAdapterCacheConfigs();
Map<String, List<ThreadPoolAdapterState>> newThreadPoolAdapterCacheConfigMap =
newThreadPoolAdapterCacheConfigs.stream().collect(Collectors.toMap(
ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2));
Map<String, List<ThreadPoolAdapterState>> oldThreadPoolAdapterCacheConfigMap =
cacheConfigList.stream().collect(Collectors.toMap(
ThreadPoolAdapterCacheConfig::getMark, ThreadPoolAdapterCacheConfig::getThreadPoolAdapterStates, (k1, k2) -> k2));
for (Map.Entry<String, List<ThreadPoolAdapterState>> entry : newThreadPoolAdapterCacheConfigMap.entrySet()) {
String key = entry.getKey();
List<ThreadPoolAdapterState> newValue = entry.getValue();
List<ThreadPoolAdapterState> oldValue = oldThreadPoolAdapterCacheConfigMap.get(key);
if (oldValue == null) {
registerFlag = true;
break;
}else {
if (newValue.size() != oldValue.size()) {
registerFlag = true;
break;
}
}
}
cacheConfigList = newThreadPoolAdapterCacheConfigs;
if (registerFlag) {
doRegister(cacheConfigList);
}
}catch (Exception e){
log.error("Register Task Error",e);
}finally {
if (!scheduler.isShutdown()) {
scheduler.schedule(this, TASK_INTERVAL_SECONDS, TimeUnit.MILLISECONDS);
}
}
}
}
} }

Loading…
Cancel
Save