diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java index 61932f3a..7ef24b95 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/config/DynamicThreadPoolAutoConfiguration.java @@ -2,7 +2,10 @@ package io.dynamic.threadpool.starter.config; import io.dynamic.threadpool.common.config.ApplicationContextHolder; import io.dynamic.threadpool.starter.controller.PoolRunStateController; -import io.dynamic.threadpool.starter.core.*; +import io.dynamic.threadpool.starter.core.ConfigService; +import io.dynamic.threadpool.starter.core.ThreadPoolBeanPostProcessor; +import io.dynamic.threadpool.starter.core.ThreadPoolConfigService; +import io.dynamic.threadpool.starter.core.ThreadPoolOperation; import io.dynamic.threadpool.starter.enable.DynamicThreadPoolMarkerConfiguration; import io.dynamic.threadpool.starter.handler.ThreadPoolBannerHandler; import lombok.AllArgsConstructor; @@ -22,9 +25,9 @@ import org.springframework.context.annotation.Configuration; @Slf4j @Configuration @AllArgsConstructor +@ImportAutoConfiguration(OkHttpClientConfig.class) @EnableConfigurationProperties(DynamicThreadPoolProperties.class) @ConditionalOnBean(DynamicThreadPoolMarkerConfiguration.Marker.class) -@ImportAutoConfiguration(OkHttpClientConfig.class) public class DynamicThreadPoolAutoConfiguration { private final DynamicThreadPoolProperties properties; @@ -40,27 +43,23 @@ public class DynamicThreadPoolAutoConfiguration { } @Bean - public ConfigService configService() { + public ConfigService configService(ApplicationContextHolder holder) { return new ThreadPoolConfigService(properties); } @Bean - public ThreadPoolRunListener threadPoolRunListener() { - return new ThreadPoolRunListener(properties); - } - - @Bean - public ThreadPoolConfigAdapter threadPoolConfigAdapter() { - return new ThreadPoolConfigAdapter(); + public ThreadPoolOperation threadPoolOperation(ConfigService configService) { + return new ThreadPoolOperation(properties, configService); } @Bean - public ThreadPoolOperation threadPoolOperation() { - return new ThreadPoolOperation(properties); + public ThreadPoolBeanPostProcessor threadPoolBeanPostProcessor(ThreadPoolOperation threadPoolOperation) { + return new ThreadPoolBeanPostProcessor(properties, threadPoolOperation); } @Bean public PoolRunStateController poolRunStateController() { return new PoolRunStateController(); } + } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolBeanPostProcessor.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolBeanPostProcessor.java new file mode 100644 index 00000000..e84eec99 --- /dev/null +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolBeanPostProcessor.java @@ -0,0 +1,117 @@ +package io.dynamic.threadpool.starter.core; + +import com.alibaba.fastjson.JSON; +import io.dynamic.threadpool.common.constant.Constants; +import io.dynamic.threadpool.common.model.PoolParameterInfo; +import io.dynamic.threadpool.common.web.base.Result; +import io.dynamic.threadpool.starter.common.CommonThreadPool; +import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties; +import io.dynamic.threadpool.starter.remote.HttpAgent; +import io.dynamic.threadpool.starter.remote.ServerHttpAgent; +import io.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; +import io.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum; +import io.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; +import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * 线程池包装后置处理器 + * + * @author chen.ma + * @date 2021/8/2 20:40 + */ +@Slf4j +public final class ThreadPoolBeanPostProcessor implements BeanPostProcessor { + + private final DynamicThreadPoolProperties properties; + + private final ThreadPoolOperation threadPoolOperation; + + public ThreadPoolBeanPostProcessor(DynamicThreadPoolProperties properties, ThreadPoolOperation threadPoolOperation) { + this.properties = properties; + this.threadPoolOperation = threadPoolOperation; + } + + private final ExecutorService executorService = ThreadPoolBuilder.builder() + .poolThreadSize(2, 4) + .keepAliveTime(0L, TimeUnit.MILLISECONDS) + .workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE, 1) + .threadFactory("dynamic-threadPool-config") + .rejected(new ThreadPoolExecutor.DiscardOldestPolicy()) + .build(); + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + if (!(bean instanceof DynamicThreadPoolWrap)) { + return bean; + } + DynamicThreadPoolWrap dynamicThreadPoolWrap = (DynamicThreadPoolWrap) bean; + + /** + * 根据 TpId 向 Server 端发起请求,查询是否有远程配置 + * Server 端无配置使用默认 ${@link CommonThreadPool#getInstance(String)} + */ + fillPoolAndRegister(dynamicThreadPoolWrap); + + /** + * 订阅 Server 端配置 + */ + subscribeConfig(dynamicThreadPoolWrap); + + return bean; + } + + private void fillPoolAndRegister(DynamicThreadPoolWrap dynamicThreadPoolWrap) { + String tpId = dynamicThreadPoolWrap.getTpId(); + Map queryStrMap = new HashMap(3); + queryStrMap.put("tpId", tpId); + queryStrMap.put("itemId", properties.getItemId()); + queryStrMap.put("namespace", properties.getNamespace()); + + PoolParameterInfo ppi = new PoolParameterInfo(); + HttpAgent httpAgent = new ServerHttpAgent(properties); + ThreadPoolExecutor poolExecutor = null; + Result result = null; + + try { + result = httpAgent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L); + if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) { + // 使用相关参数创建线程池 + BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); + poolExecutor = ThreadPoolBuilder.builder() + .isCustomPool(true) + .poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize()) + .keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS) + .workQueue(workQueue) + .threadFactory(tpId) + .rejected(RejectedTypeEnum.createPolicy(ppi.getRejectedType())) + .build(); + + dynamicThreadPoolWrap.setPool(poolExecutor); + } else if (dynamicThreadPoolWrap.getPool() == null) { + dynamicThreadPoolWrap.setPool(CommonThreadPool.getInstance(tpId)); + } + } catch (Exception ex) { + poolExecutor = dynamicThreadPoolWrap.getPool() != null ? dynamicThreadPoolWrap.getPool() : CommonThreadPool.getInstance(tpId); + dynamicThreadPoolWrap.setPool(poolExecutor); + + log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage()); + } + + GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getTpId(), ppi, dynamicThreadPoolWrap); + } + + private void subscribeConfig(DynamicThreadPoolWrap dynamicThreadPoolWrap) { + threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> ThreadPoolDynamicRefresh.refreshDynamicPool(config)); + } + +} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolConfigAdapter.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolConfigAdapter.java deleted file mode 100644 index 559dd2fa..00000000 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolConfigAdapter.java +++ /dev/null @@ -1,50 +0,0 @@ -package io.dynamic.threadpool.starter.core; - -import io.dynamic.threadpool.common.config.ApplicationContextHolder; -import io.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; -import io.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; -import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.core.Ordered; -import org.springframework.core.annotation.Order; - -import javax.annotation.PostConstruct; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * 动态线程池配置适配器 - * - * @author chen.ma - * @date 2021/6/22 20:17 - */ -public class ThreadPoolConfigAdapter extends ConfigAdapter { - - @Autowired - private ThreadPoolOperation threadPoolOperation; - - private ExecutorService executorService = ThreadPoolBuilder.builder() - .poolThreadSize(2, 4) - .keepAliveTime(0L, TimeUnit.MILLISECONDS) - .workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE, 1) - .threadFactory("threadPool-config") - .rejected(new ThreadPoolExecutor.DiscardOldestPolicy()) - .build(); - - @PostConstruct - @Order(Ordered.LOWEST_PRECEDENCE - 2048) - public void subscribeConfig() { - Map executorMap = - ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class); - - List tpIdList = new ArrayList(); - executorMap.forEach((key, val) -> tpIdList.add(val.getTpId())); - - tpIdList.forEach(each -> threadPoolOperation.subscribeConfig(each, executorService, config -> callbackConfig(config))); - } - -} diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolOperation.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolOperation.java index 3a0ed269..54919724 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolOperation.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolOperation.java @@ -1,7 +1,6 @@ package io.dynamic.threadpool.starter.core; import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties; -import org.springframework.beans.factory.annotation.Autowired; import java.util.concurrent.Executor; @@ -13,13 +12,13 @@ import java.util.concurrent.Executor; */ public class ThreadPoolOperation { - @Autowired - private ConfigService configService; + private final ConfigService configService; private final DynamicThreadPoolProperties properties; - public ThreadPoolOperation(DynamicThreadPoolProperties properties) { + public ThreadPoolOperation(DynamicThreadPoolProperties properties, ConfigService configService) { this.properties = properties; + this.configService = configService; } public Listener subscribeConfig(String tpId, Executor executor, ThreadPoolSubscribeCallback threadPoolSubscribeCallback) { diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolRunListener.java b/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolRunListener.java deleted file mode 100644 index ec56f50d..00000000 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/io/dynamic/threadpool/starter/core/ThreadPoolRunListener.java +++ /dev/null @@ -1,86 +0,0 @@ -package io.dynamic.threadpool.starter.core; - -import com.alibaba.fastjson.JSON; -import io.dynamic.threadpool.common.config.ApplicationContextHolder; -import io.dynamic.threadpool.common.constant.Constants; -import io.dynamic.threadpool.common.model.PoolParameterInfo; -import io.dynamic.threadpool.common.web.base.Result; -import io.dynamic.threadpool.starter.common.CommonThreadPool; -import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties; -import io.dynamic.threadpool.starter.remote.HttpAgent; -import io.dynamic.threadpool.starter.remote.ServerHttpAgent; -import io.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum; -import io.dynamic.threadpool.starter.toolkit.thread.RejectedTypeEnum; -import io.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder; -import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap; -import lombok.extern.slf4j.Slf4j; -import org.springframework.core.Ordered; -import org.springframework.core.annotation.Order; - -import javax.annotation.PostConstruct; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -/** - * 线程池启动监听 - * - * @author chen.ma - * @date 2021/6/20 16:34 - */ -@Slf4j -public class ThreadPoolRunListener { - - private final DynamicThreadPoolProperties properties; - - public ThreadPoolRunListener(DynamicThreadPoolProperties properties) { - this.properties = properties; - } - - @PostConstruct - @Order(Ordered.LOWEST_PRECEDENCE - 1024) - public void run() { - Map executorMap = ApplicationContextHolder.getBeansOfType(DynamicThreadPoolWrap.class); - executorMap.forEach((key, val) -> { - String tpId = val.getTpId(); - Map queryStrMap = new HashMap(3); - queryStrMap.put("tpId", tpId); - queryStrMap.put("itemId", properties.getItemId()); - queryStrMap.put("namespace", properties.getNamespace()); - - PoolParameterInfo ppi = new PoolParameterInfo(); - HttpAgent httpAgent = new ServerHttpAgent(properties); - ThreadPoolExecutor poolExecutor = null; - Result result = null; - - try { - result = httpAgent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L); - if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) { - // 使用相关参数创建线程池 - BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); - poolExecutor = ThreadPoolBuilder.builder() - .isCustomPool(true) - .poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize()) - .keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS) - .workQueue(workQueue) - .threadFactory(tpId) - .rejected(RejectedTypeEnum.createPolicy(ppi.getRejectedType())) - .build(); - - val.setPool(poolExecutor); - } else if (val.getPool() == null) { - val.setPool(CommonThreadPool.getInstance(tpId)); - } - } catch (Exception ex) { - poolExecutor = val.getPool() != null ? val.getPool() : CommonThreadPool.getInstance(tpId); - val.setPool(poolExecutor); - - log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage()); - } - - GlobalThreadPoolManage.register(val.getTpId(), ppi, val); - }); - } -}