From 3302a84d5eaf108432292240c10c5239857e30dc Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Tue, 12 Oct 2021 23:22:01 +0800 Subject: [PATCH] Fix local thread pool subscription deadlock. --- .../core/DynamicThreadPoolPostProcessor.java | 29 ++++++++++++++----- .../starter/wrap/DynamicThreadPoolWrap.java | 2 ++ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DynamicThreadPoolPostProcessor.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DynamicThreadPoolPostProcessor.java index e007ccb9..b9f30e7a 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DynamicThreadPoolPostProcessor.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/core/DynamicThreadPoolPostProcessor.java @@ -18,6 +18,7 @@ import org.springframework.beans.factory.config.BeanPostProcessor; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; @@ -61,19 +62,24 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { DynamicThreadPoolWrap dynamicThreadPoolWrap = (DynamicThreadPoolWrap) bean; /** - * 根据 TpId 向 Server 端发起请求,查询是否有远程配置 + * 根据 TpId 向 Server 端发起请求, 查询是否有远程配置 * Server 端无配置使用默认 ${@link CommonThreadPool#getInstance(String)} */ fillPoolAndRegister(dynamicThreadPoolWrap); /** - * 订阅 Server 端配置 + * 订阅 Server 端配置, 远端没有配置的线程池不参与订阅 */ subscribeConfig(dynamicThreadPoolWrap); return bean; } + /** + * Fill the thread pool and register. + * + * @param dynamicThreadPoolWrap + */ private void fillPoolAndRegister(DynamicThreadPoolWrap dynamicThreadPoolWrap) { String tpId = dynamicThreadPoolWrap.getTpId(); Map queryStrMap = new HashMap(3); @@ -81,9 +87,10 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { queryStrMap.put("itemId", properties.getItemId()); queryStrMap.put("namespace", properties.getNamespace()); - PoolParameterInfo ppi = new PoolParameterInfo(); - CustomThreadPoolExecutor poolExecutor = null; Result result = null; + boolean isSubscribe = false; + CustomThreadPoolExecutor poolExecutor = null; + PoolParameterInfo ppi = new PoolParameterInfo(); try { result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L); @@ -102,21 +109,29 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { .build(); dynamicThreadPoolWrap.setPool(poolExecutor); - } else if (dynamicThreadPoolWrap.getPool() == null) { - dynamicThreadPoolWrap.setPool(CommonThreadPool.getInstance(tpId)); + isSubscribe = true; } } catch (Exception ex) { poolExecutor = dynamicThreadPoolWrap.getPool() != null ? dynamicThreadPoolWrap.getPool() : CommonThreadPool.getInstance(tpId); dynamicThreadPoolWrap.setPool(poolExecutor); log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage()); + } finally { + if (Objects.isNull(dynamicThreadPoolWrap.getPool())) { + dynamicThreadPoolWrap.setPool(CommonThreadPool.getInstance(tpId)); + } + + // 设置是否订阅远端线程池配置 + dynamicThreadPoolWrap.setSubscribeFlag(isSubscribe); } GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getTpId(), ppi, dynamicThreadPoolWrap); } private void subscribeConfig(DynamicThreadPoolWrap dynamicThreadPoolWrap) { - threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> ThreadPoolDynamicRefresh.refreshDynamicPool(config)); + if (dynamicThreadPoolWrap.isSubscribeFlag()) { + threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> ThreadPoolDynamicRefresh.refreshDynamicPool(config)); + } } } diff --git a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java index a0c2cfb6..571b4679 100644 --- a/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java +++ b/dynamic-threadpool-spring-boot-starter/src/main/java/com/github/dynamic/threadpool/starter/wrap/DynamicThreadPoolWrap.java @@ -22,6 +22,8 @@ public class DynamicThreadPoolWrap { private String tpId; + private boolean subscribeFlag; + private CustomThreadPoolExecutor pool; /**