Reconstitute the post processor of the thread pool

1.3.1
chen.ma 2 years ago
parent ce53d6b36b
commit 8604a75c1f

@ -51,10 +51,7 @@ import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.*; import static cn.hippo4j.common.constant.Constants.*;
/** /**
* Dynamic threadPool post processor. * Dynamic thread-pool post processor.
*
* @author chen.ma
* @date 2021/8/2 20:40
*/ */
@Slf4j @Slf4j
@AllArgsConstructor @AllArgsConstructor
@ -68,18 +65,6 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
private final ServerThreadPoolDynamicRefresh threadPoolDynamicRefresh; private final ServerThreadPoolDynamicRefresh threadPoolDynamicRefresh;
private final ExecutorService executorService = ThreadPoolBuilder.builder()
.corePoolSize(2)
.maxPoolNum(4)
.keepAliveTime(2000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE)
.capacity(1024)
.allowCoreThreadTimeOut(true)
.threadFactory("client.dynamic.threadPool.change.config")
.rejected(new ThreadPoolExecutor.AbortPolicy())
.build();
@Override @Override
public Object postProcessBeforeInitialization(Object bean, String beanName) { public Object postProcessBeforeInitialization(Object bean, String beanName) {
return bean; return bean;
@ -102,8 +87,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
log.error("Failed to create dynamic thread pool in annotation mode.", ex); log.error("Failed to create dynamic thread pool in annotation mode.", ex);
return bean; return bean;
} }
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) bean; DynamicThreadPoolExecutor dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;
DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicExecutor.getThreadPoolId(), dynamicExecutor); DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor);
ThreadPoolExecutor remoteExecutor = fillPoolAndRegister(wrap); ThreadPoolExecutor remoteExecutor = fillPoolAndRegister(wrap);
subscribeConfig(wrap); subscribeConfig(wrap);
return remoteExecutor; return remoteExecutor;
@ -136,49 +121,47 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
queryStrMap.put(TP_ID, tpId); queryStrMap.put(TP_ID, tpId);
queryStrMap.put(ITEM_ID, properties.getItemId()); queryStrMap.put(ITEM_ID, properties.getItemId());
queryStrMap.put(NAMESPACE, properties.getNamespace()); queryStrMap.put(NAMESPACE, properties.getNamespace());
Result result;
boolean isSubscribe = false; boolean isSubscribe = false;
ThreadPoolExecutor newDynamicPoolExecutor = null; ThreadPoolExecutor newDynamicThreadPoolExecutor = null;
ThreadPoolParameterInfo ppi = new ThreadPoolParameterInfo(); ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo();
try { try {
result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L); Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L);
if (result.isSuccess() && result.getData() != null) { if (result.isSuccess() && result.getData() != null) {
String resultJsonStr = JSONUtil.toJSONString(result.getData()); String resultJsonStr = JSONUtil.toJSONString(result.getData());
if ((ppi = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) { if ((threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) {
// Create a thread pool with relevant parameters. // Create a thread pool with relevant parameters.
BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity()); BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity());
newDynamicPoolExecutor = ThreadPoolBuilder.builder() newDynamicThreadPoolExecutor = ThreadPoolBuilder.builder()
.dynamicPool() .dynamicPool()
.workQueue(workQueue) .workQueue(workQueue)
.threadFactory(tpId) .threadFactory(tpId)
.poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize()) .poolThreadSize(threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt())
.keepAliveTime(ppi.getKeepAliveTime(), TimeUnit.SECONDS) .keepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS)
.rejected(RejectedTypeEnum.createPolicy(ppi.getRejectedType())) .rejected(RejectedTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType()))
.allowCoreThreadTimeOut(EnableEnum.getBool(ppi.getAllowCoreThreadTimeOut())) .allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut()))
.build(); .build();
// Set dynamic thread pool enhancement parameters. // Set dynamic thread pool enhancement parameters.
if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) { if (dynamicThreadPoolWrap.getExecutor() instanceof AbstractDynamicExecutorSupport) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm( ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
BooleanUtil.toBoolean(ppi.getIsAlarm().toString()), BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()),
ppi.getCapacityAlarm(), threadPoolParameterInfo.getCapacityAlarm(),
ppi.getLivenessAlarm()); threadPoolParameterInfo.getLivenessAlarm());
GlobalNotifyAlarmManage.put(tpId, threadPoolNotifyAlarm); GlobalNotifyAlarmManage.put(tpId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator(); TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis; long awaitTerminationMillis = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown; boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).waitForTasksToCompleteOnShutdown;
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
long executeTimeOut = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getExecuteTimeOut(); long executeTimeOut = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getExecuteTimeOut();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setExecuteTimeOut(executeTimeOut); ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setExecuteTimeOut(executeTimeOut);
} }
dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor); dynamicThreadPoolWrap.setExecutor(newDynamicThreadPoolExecutor);
isSubscribe = true; isSubscribe = true;
} }
} }
} catch (Exception ex) { } catch (Exception ex) {
newDynamicPoolExecutor = dynamicThreadPoolWrap.getExecutor() != null ? dynamicThreadPoolWrap.getExecutor() : CommonDynamicThreadPool.getInstance(tpId); newDynamicThreadPoolExecutor = dynamicThreadPoolWrap.getExecutor() != null ? dynamicThreadPoolWrap.getExecutor() : CommonDynamicThreadPool.getInstance(tpId);
dynamicThreadPoolWrap.setExecutor(newDynamicPoolExecutor); dynamicThreadPoolWrap.setExecutor(newDynamicThreadPoolExecutor);
log.error("Failed to initialize thread pool configuration. error message :: {}", ex.getMessage()); log.error("Failed to initialize thread pool configuration. error message :: {}", ex.getMessage());
} finally { } finally {
if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) { if (Objects.isNull(dynamicThreadPoolWrap.getExecutor())) {
@ -187,10 +170,22 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
// Set whether to subscribe to the remote thread pool configuration. // Set whether to subscribe to the remote thread pool configuration.
dynamicThreadPoolWrap.setSubscribeFlag(isSubscribe); dynamicThreadPoolWrap.setSubscribeFlag(isSubscribe);
} }
GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getThreadPoolId(), ppi, dynamicThreadPoolWrap); GlobalThreadPoolManage.register(dynamicThreadPoolWrap.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrap);
return newDynamicPoolExecutor; return newDynamicThreadPoolExecutor;
} }
private final ExecutorService configRefreshExecutorService = ThreadPoolBuilder.builder()
.corePoolSize(2)
.maxPoolNum(4)
.keepAliveTime(2000)
.timeUnit(TimeUnit.MILLISECONDS)
.workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE)
.capacity(1024)
.allowCoreThreadTimeOut(true)
.threadFactory("client.dynamic.threadPool.change.config")
.rejected(new ThreadPoolExecutor.AbortPolicy())
.build();
/** /**
* Client dynamic thread pool subscription server configuration. * Client dynamic thread pool subscription server configuration.
* *
@ -198,7 +193,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
*/ */
protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrap) { protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrap) {
if (dynamicThreadPoolWrap.isSubscribeFlag()) { if (dynamicThreadPoolWrap.isSubscribeFlag()) {
threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getThreadPoolId(), executorService, config -> threadPoolDynamicRefresh.dynamicRefresh(config)); threadPoolOperation.subscribeConfig(dynamicThreadPoolWrap.getThreadPoolId(), configRefreshExecutorService, config -> threadPoolDynamicRefresh.dynamicRefresh(config));
} }
} }
} }

Loading…
Cancel
Save