refactor: 动态线程池初始化填充重构.

pull/161/head
龙台 3 years ago
parent 9eb3d6702c
commit 50106d7c9e

@ -2,7 +2,10 @@ package io.dynamic.threadpool.starter.config;
import io.dynamic.threadpool.common.config.ApplicationContextHolder;
import io.dynamic.threadpool.starter.controller.PoolRunStateController;
import io.dynamic.threadpool.starter.core.*;
import io.dynamic.threadpool.starter.core.ConfigService;
import io.dynamic.threadpool.starter.core.ThreadPoolBeanPostProcessor;
import io.dynamic.threadpool.starter.core.ThreadPoolConfigService;
import io.dynamic.threadpool.starter.core.ThreadPoolOperation;
import io.dynamic.threadpool.starter.enable.DynamicThreadPoolMarkerConfiguration;
import io.dynamic.threadpool.starter.handler.ThreadPoolBannerHandler;
import lombok.AllArgsConstructor;
@ -22,9 +25,9 @@ import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
@AllArgsConstructor
@ImportAutoConfiguration(OkHttpClientConfig.class)
@EnableConfigurationProperties(DynamicThreadPoolProperties.class)
@ConditionalOnBean(DynamicThreadPoolMarkerConfiguration.Marker.class)
@ImportAutoConfiguration(OkHttpClientConfig.class)
public class DynamicThreadPoolAutoConfiguration {
private final DynamicThreadPoolProperties properties;
@ -40,27 +43,23 @@ public class DynamicThreadPoolAutoConfiguration {
}
@Bean
public ConfigService configService() {
public ConfigService configService(ApplicationContextHolder holder) {
return new ThreadPoolConfigService(properties);
}
@Bean
public ThreadPoolRunListener threadPoolRunListener() {
return new ThreadPoolRunListener(properties);
}
@Bean
public ThreadPoolConfigAdapter threadPoolConfigAdapter() {
return new ThreadPoolConfigAdapter();
public ThreadPoolOperation threadPoolOperation(ConfigService configService) {
return new ThreadPoolOperation(properties, configService);
}
@Bean
public ThreadPoolOperation threadPoolOperation() {
return new ThreadPoolOperation(properties);
public ThreadPoolBeanPostProcessor threadPoolBeanPostProcessor(ThreadPoolOperation threadPoolOperation) {
return new ThreadPoolBeanPostProcessor(properties, threadPoolOperation);
}
@Bean
public PoolRunStateController poolRunStateController() {
return new PoolRunStateController();
}
}

@ -0,0 +1,117 @@
package io.dynamic.threadpool.starter.core;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.model.PoolParameterInfo;
import io.dynamic.threadpool.common.web.base.Result;
import io.dynamic.threadpool.starter.common.CommonThreadPool;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import io.dynamic.threadpool.starter.remote.HttpAgent;
import io.dynamic.threadpool.starter.remote.ServerHttpAgent;
import io.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import io.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum;
import io.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线
*
* @author chen.ma
* @date 2021/8/2 20:40
*/
@Slf4j
public final class ThreadPoolBeanPostProcessor implements BeanPostProcessor {
private final DynamicThreadPoolProperties properties;
private final ThreadPoolOperation threadPoolOperation;
public ThreadPoolBeanPostProcessor(DynamicThreadPoolProperties properties, ThreadPoolOperation threadPoolOperation) {
this.properties = properties;
this.threadPoolOperation = threadPoolOperation;
}
private final ExecutorService executorService = ThreadPoolBuilder.builder()
.poolThreadSize(2, 4)
.keepAliveTime(0L, TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE, 1)
.threadFactory("dynamic-threadPool-config")
.rejected(new ThreadPoolExecutor.DiscardOldestPolicy())
.build();
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (!(bean instanceof DynamicThreadPoolWrap)) {
return bean;
}
DynamicThreadPoolWrap dynamicThreadPoolWrap = (DynamicThreadPoolWrap) bean;
/**
* TpId Server
* Server 使 ${@link CommonThreadPool#getInstance(String)}
*/
fillPoolAndRegister(dynamicThreadPoolWrap);
/**
* Server
*/
subscribeConfig(dynamicThreadPoolWrap);
return bean;
}
private void fillPoolAndRegister(DynamicThreadPoolWrap dynamicThreadPoolWrap) {
String tpId = dynamicThreadPoolWrap.getTpId();
Map<String, String> queryStrMap = new HashMap(3);
queryStrMap.put("tpId", tpId);
queryStrMap.put("itemId", properties.getItemId());
queryStrMap.put("namespace", properties.getNamespace());
PoolParameterInfo ppi = new PoolParameterInfo();
HttpAgent httpAgent = new ServerHttpAgent(properties);
ThreadPoolExecutor poolExecutor = null;
Result result = null;
try {
result = httpAgent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L);
if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) {
// 使用相关参数创建线程池
BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());
poolExecutor = ThreadPoolBuilder.builder()
.isCustomPool(true)
.poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize())
.keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS)
.workQueue(workQueue)
.threadFactory(tpId)
.rejected(RejectedTypeEnum.createPolicy(ppi.getRejectedType()))
.build();
dynamicThreadPoolWrap.setPool(poolExecutor);
} else if (dynamicThreadPoolWrap.getPool() == null) {
dynamicThreadPoolWrap.setPool(CommonThreadPool.getInstance(tpId));
}
} catch (Exception ex) {
poolExecutor = dynamicThreadPoolWrap.getPool() != null ? dynamicThreadPoolWrap.getPool() : CommonThreadPool.getInstance(tpId);
dynamicThreadPoolWrap.setPool(poolExecutor);
log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage());
}
GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getTpId(), ppi, dynamicThreadPoolWrap);
}
private void subscribeConfig(DynamicThreadPoolWrap dynamicThreadPoolWrap) {
threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> ThreadPoolDynamicRefresh.refreshDynamicPool(config));
}
}

@ -1,50 +0,0 @@
package io.dynamic.threadpool.starter.core;
import io.dynamic.threadpool.common.config.ApplicationContextHolder;
import io.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import io.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线
*
* @author chen.ma
* @date 2021/6/22 20:17
*/
public class ThreadPoolConfigAdapter extends ConfigAdapter {
@Autowired
private ThreadPoolOperation threadPoolOperation;
private ExecutorService executorService = ThreadPoolBuilder.builder()
.poolThreadSize(2, 4)
.keepAliveTime(0L, TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE, 1)
.threadFactory("threadPool-config")
.rejected(new ThreadPoolExecutor.DiscardOldestPolicy())
.build();
@PostConstruct
@Order(Ordered.LOWEST_PRECEDENCE - 2048)
public void subscribeConfig() {
Map<String, DynamicThreadPoolWrap> executorMap =
ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class);
List<String> tpIdList = new ArrayList();
executorMap.forEach((key, val) -> tpIdList.add(val.getTpId()));
tpIdList.forEach(each -> threadPoolOperation.subscribeConfig(each, executorService, config -> callbackConfig(config)));
}
}

@ -1,7 +1,6 @@
package io.dynamic.threadpool.starter.core;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.Executor;
@ -13,13 +12,13 @@ import java.util.concurrent.Executor;
*/
public class ThreadPoolOperation {
@Autowired
private ConfigService configService;
private final ConfigService configService;
private final DynamicThreadPoolProperties properties;
public ThreadPoolOperation(DynamicThreadPoolProperties properties) {
public ThreadPoolOperation(DynamicThreadPoolProperties properties, ConfigService configService) {
this.properties = properties;
this.configService = configService;
}
public Listener subscribeConfig(String tpId, Executor executor, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) {

@ -1,86 +0,0 @@
package io.dynamic.threadpool.starter.core;
import com.alibaba.fastjson.JSON;
import io.dynamic.threadpool.common.config.ApplicationContextHolder;
import io.dynamic.threadpool.common.constant.Constants;
import io.dynamic.threadpool.common.model.PoolParameterInfo;
import io.dynamic.threadpool.common.web.base.Result;
import io.dynamic.threadpool.starter.common.CommonThreadPool;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import io.dynamic.threadpool.starter.remote.HttpAgent;
import io.dynamic.threadpool.starter.remote.ServerHttpAgent;
import io.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import io.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum;
import io.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线
*
* @author chen.ma
* @date 2021/6/20 16:34
*/
@Slf4j
public class ThreadPoolRunListener {
private final DynamicThreadPoolProperties properties;
public ThreadPoolRunListener(DynamicThreadPoolProperties properties) {
this.properties = properties;
}
@PostConstruct
@Order(Ordered.LOWEST_PRECEDENCE - 1024)
public void run() {
Map<String, DynamicThreadPoolWrap> executorMap = ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class);
executorMap.forEach((key, val) -> {
String tpId = val.getTpId();
Map<String, String> queryStrMap = new HashMap(3);
queryStrMap.put("tpId", tpId);
queryStrMap.put("itemId", properties.getItemId());
queryStrMap.put("namespace", properties.getNamespace());
PoolParameterInfo ppi = new PoolParameterInfo();
HttpAgent httpAgent = new ServerHttpAgent(properties);
ThreadPoolExecutor poolExecutor = null;
Result result = null;
try {
result = httpAgent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L);
if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) {
// 使用相关参数创建线程池
BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());
poolExecutor = ThreadPoolBuilder.builder()
.isCustomPool(true)
.poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize())
.keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS)
.workQueue(workQueue)
.threadFactory(tpId)
.rejected(RejectedTypeEnum.createPolicy(ppi.getRejectedType()))
.build();
val.setPool(poolExecutor);
} else if (val.getPool() == null) {
val.setPool(CommonThreadPool.getInstance(tpId));
}
} catch (Exception ex) {
poolExecutor = val.getPool() != null ? val.getPool() : CommonThreadPool.getInstance(tpId);
val.setPool(poolExecutor);
log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage());
}
GlobalThreadPoolManage.register(val.getTpId(), ppi, val);
});
}
}
Loading…
Cancel
Save