Fix local thread pool subscription deadlock.

pull/161/head
chen.ma 3 years ago
parent cb18b37546
commit 3302a84d5e

@ -18,6 +18,7 @@ import org.springframework.beans.factory.config.BeanPostProcessor;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -61,19 +62,24 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
DynamicThreadPoolWrap dynamicThreadPoolWrap = (DynamicThreadPoolWrap) bean; DynamicThreadPoolWrap dynamicThreadPoolWrap = (DynamicThreadPoolWrap) bean;
/** /**
* TpId Server * TpId Server ,
* Server 使 ${@link CommonThreadPool#getInstance(String)} * Server 使 ${@link CommonThreadPool#getInstance(String)}
*/ */
fillPoolAndRegister(dynamicThreadPoolWrap); fillPoolAndRegister(dynamicThreadPoolWrap);
/** /**
* Server * Server , 线
*/ */
subscribeConfig(dynamicThreadPoolWrap); subscribeConfig(dynamicThreadPoolWrap);
return bean; return bean;
} }
/**
* Fill the thread pool and register.
*
* @param dynamicThreadPoolWrap
*/
private void fillPoolAndRegister(DynamicThreadPoolWrap dynamicThreadPoolWrap) { private void fillPoolAndRegister(DynamicThreadPoolWrap dynamicThreadPoolWrap) {
String tpId = dynamicThreadPoolWrap.getTpId(); String tpId = dynamicThreadPoolWrap.getTpId();
Map<String, String> queryStrMap = new HashMap(3); Map<String, String> queryStrMap = new HashMap(3);
@ -81,9 +87,10 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
queryStrMap.put("itemId", properties.getItemId()); queryStrMap.put("itemId", properties.getItemId());
queryStrMap.put("namespace", properties.getNamespace()); queryStrMap.put("namespace", properties.getNamespace());
PoolParameterInfo ppi = new PoolParameterInfo();
CustomThreadPoolExecutor poolExecutor = null;
Result result = null; Result result = null;
boolean isSubscribe = false;
CustomThreadPoolExecutor poolExecutor = null;
PoolParameterInfo ppi = new PoolParameterInfo();
try { try {
result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L); result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L);
@ -102,21 +109,29 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.build(); .build();
dynamicThreadPoolWrap.setPool(poolExecutor); dynamicThreadPoolWrap.setPool(poolExecutor);
} else if (dynamicThreadPoolWrap.getPool() == null) { isSubscribe = true;
dynamicThreadPoolWrap.setPool(CommonThreadPool.getInstance(tpId));
} }
} catch (Exception ex) { } catch (Exception ex) {
poolExecutor = dynamicThreadPoolWrap.getPool() != null ? dynamicThreadPoolWrap.getPool() : CommonThreadPool.getInstance(tpId); poolExecutor = dynamicThreadPoolWrap.getPool() != null ? dynamicThreadPoolWrap.getPool() : CommonThreadPool.getInstance(tpId);
dynamicThreadPoolWrap.setPool(poolExecutor); dynamicThreadPoolWrap.setPool(poolExecutor);
log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage()); log.error("[Init pool] Failed to initialize thread pool configuration. error message :: {}", ex.getMessage());
} finally {
if (Objects.isNull(dynamicThreadPoolWrap.getPool())) {
dynamicThreadPoolWrap.setPool(CommonThreadPool.getInstance(tpId));
}
// 设置是否订阅远端线程池配置
dynamicThreadPoolWrap.setSubscribeFlag(isSubscribe);
} }
GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getTpId(), ppi, dynamicThreadPoolWrap); GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getTpId(), ppi, dynamicThreadPoolWrap);
} }
private void subscribeConfig(DynamicThreadPoolWrap dynamicThreadPoolWrap) { private void subscribeConfig(DynamicThreadPoolWrap dynamicThreadPoolWrap) {
threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> ThreadPoolDynamicRefresh.refreshDynamicPool(config)); if (dynamicThreadPoolWrap.isSubscribeFlag()) {
threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getTpId(), executorService, config -> ThreadPoolDynamicRefresh.refreshDynamicPool(config));
}
} }
} }

@ -22,6 +22,8 @@ public class DynamicThreadPoolWrap {
private String tpId; private String tpId;
private boolean subscribeFlag;
private CustomThreadPoolExecutor pool; private CustomThreadPoolExecutor pool;
/** /**

Loading…
Cancel
Save