Spring post processor logic refactoring (#874) (#905)

pull/906/head
马称 Ma Chen 2 years ago committed by GitHub
parent bcac0ac45e
commit 517779cf8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -20,6 +20,7 @@ package cn.hippo4j.config.springboot.starter.support;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.config.springboot.starter.config.BootstrapConfigProperties;
import cn.hippo4j.config.springboot.starter.config.DynamicThreadPoolNotifyProperties;
@ -31,14 +32,12 @@ import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.provider.CommonDynamicThreadPoolProviderFactory;
import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.task.TaskDecorator;
import java.util.Objects;
import java.util.Optional;
@ -96,11 +95,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
/**
* Fill the thread pool and register.
*
* @param dynamicThreadPoolWrapper
* @param dynamicThreadPoolWrapper dynamic thread-pool wrapper
*/
protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) {
String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId();
ThreadPoolExecutor newDynamicPoolExecutor = dynamicThreadPoolWrapper.getExecutor();
ThreadPoolExecutor executor = dynamicThreadPoolWrapper.getExecutor();
ExecutorProperties executorProperties = null;
if (configProperties.getExecutors() != null) {
executorProperties = configProperties.getExecutors()
@ -109,39 +108,42 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.findFirst()
.orElseThrow(() -> new RuntimeException("The thread pool id does not exist in the configuration."));
try {
newDynamicPoolExecutor = buildNewDynamicThreadPool(executorProperties);
threadPoolParamReplace(executor, executorProperties);
} catch (Exception ex) {
log.error("Failed to initialize thread pool configuration. error: {}", ex);
log.error("Failed to initialize thread pool configuration.", ex);
} finally {
if (Objects.isNull(dynamicThreadPoolWrapper.getExecutor())) {
dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPoolProviderFactory.getInstance(threadPoolId));
}
dynamicThreadPoolWrapper.setInitFlag(Boolean.TRUE);
}
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = buildThreadPoolNotifyAlarm(executorProperties);
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
DynamicThreadPoolExecutor actualDynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor();
TaskDecorator taskDecorator = actualDynamicThreadPoolExecutor.getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = actualDynamicThreadPoolExecutor.getAwaitTerminationMillis();
boolean waitForTasksToCompleteOnShutdown = actualDynamicThreadPoolExecutor.isWaitForTasksToCompleteOnShutdown();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
dynamicThreadPoolWrapper.setExecutor(newDynamicPoolExecutor);
}
GlobalThreadPoolManage.registerPool(dynamicThreadPoolWrapper.getThreadPoolId(), dynamicThreadPoolWrapper);
GlobalCoreThreadPoolManage.register(
threadPoolId,
executorProperties == null
? buildExecutorProperties(threadPoolId, newDynamicPoolExecutor)
? buildDefaultExecutorProperties(threadPoolId, executor)
: buildActualExecutorProperties(executorProperties));
return newDynamicPoolExecutor;
return executor;
}
/**
* Build actual executor properties.
*
* @param executorProperties executor properties
* @return executor properties
*/
private ExecutorProperties buildActualExecutorProperties(ExecutorProperties executorProperties) {
return Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> buildExecutorProperties(executorProperties)).orElse(executorProperties);
}
private ExecutorProperties buildExecutorProperties(String threadPoolId, ThreadPoolExecutor executor) {
/**
* Build default executor properties.
*
* @param threadPoolId thread-pool id
* @param executor dynamic thread-pool executor
* @return executor properties
*/
private ExecutorProperties buildDefaultExecutorProperties(String threadPoolId, ThreadPoolExecutor executor) {
ExecutorProperties executorProperties = new ExecutorProperties();
BlockingQueue<Runnable> blockingQueue = executor.getQueue();
int queueSize = blockingQueue.size();
@ -160,6 +162,12 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
return executorProperties;
}
/**
* Build new dynamic thread-pool.
*
* @param executorProperties executor properties
* @return thread-pool executor
*/
private ThreadPoolExecutor buildNewDynamicThreadPool(ExecutorProperties executorProperties) {
String threadNamePrefix = executorProperties.getThreadNamePrefix();
ExecutorProperties newExecutorProperties = buildExecutorProperties(executorProperties);
@ -177,6 +185,28 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
return newDynamicPoolExecutor;
}
/**
* Thread-pool param replace.
*
* @param executor dynamic thread-pool executor
* @param executorProperties executor properties
*/
private void threadPoolParamReplace(ThreadPoolExecutor executor, ExecutorProperties executorProperties) {
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity());
ReflectUtil.setFieldValue(executor, "workQueue", workQueue);
executor.setCorePoolSize(executorProperties.getCorePoolSize());
executor.setMaximumPoolSize(executorProperties.getMaximumPoolSize());
executor.setKeepAliveTime(executorProperties.getKeepAliveTime(), TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(executorProperties.getAllowCoreThreadTimeOut());
executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(executorProperties.getRejectedHandler()));
}
/**
* Build executor properties.
*
* @param executorProperties executor properties
* @return executor properties
*/
private ExecutorProperties buildExecutorProperties(ExecutorProperties executorProperties) {
ExecutorProperties newExecutorProperties = ExecutorProperties.builder()
.corePoolSize(Optional.ofNullable(executorProperties.getCorePoolSize())
@ -201,6 +231,12 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
return newExecutorProperties;
}
/**
* Build thread-pool notify alarm
*
* @param executorProperties executor properties
* @return thread-pool notify alarm
*/
private ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) {
DynamicThreadPoolNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null);
boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm())

@ -27,15 +27,14 @@ import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterParameter;
import cn.hippo4j.common.model.register.DynamicThreadPoolRegisterWrapper;
import cn.hippo4j.common.toolkit.BooleanUtil;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.core.executor.DynamicThreadPool;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.provider.CommonDynamicThreadPoolProviderFactory;
import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.springboot.starter.config.BootstrapProperties;
@ -45,12 +44,10 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.task.TaskDecorator;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@ -114,7 +111,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
/**
* Register and subscribe.
*
* @param dynamicThreadPoolWrapper
* @param dynamicThreadPoolWrapper dynamic thread-pool wrapper
*/
protected void registerAndSubscribe(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) {
fillPoolAndRegister(dynamicThreadPoolWrapper);
@ -124,7 +121,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
/**
* Fill the thread pool and register.
*
* @param dynamicThreadPoolWrapper
* @param dynamicThreadPoolWrapper dynamic thread-pool wrapper
*/
protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) {
String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId();
@ -133,42 +130,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
queryStrMap.put(TP_ID, threadPoolId);
queryStrMap.put(ITEM_ID, properties.getItemId());
queryStrMap.put(NAMESPACE, properties.getNamespace());
ThreadPoolExecutor newDynamicThreadPoolExecutor = null;
ThreadPoolParameterInfo threadPoolParameterInfo = new ThreadPoolParameterInfo();
try {
Result result = httpAgent.httpGetByConfig(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 5000L);
if (result.isSuccess() && result.getData() != null) {
String resultJsonStr = JSONUtil.toJSONString(result.getData());
if ((threadPoolParameterInfo = JSONUtil.parseObject(resultJsonStr, ThreadPoolParameterInfo.class)) != null) {
// Create a thread pool with relevant parameters.
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity());
newDynamicThreadPoolExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.threadPoolId(threadPoolId)
.workQueue(workQueue)
.threadFactory(executor.getThreadFactory())
.poolThreadSize(threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt())
.keepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS)
.rejected(RejectedPolicyTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType()))
.allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut()))
.build();
// Set dynamic thread pool enhancement parameters.
if (executor instanceof DynamicThreadPoolExecutor) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()),
threadPoolParameterInfo.getLivenessAlarm(),
threadPoolParameterInfo.getCapacityAlarm());
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) executor).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) executor).getAwaitTerminationMillis();
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) executor).isWaitForTasksToCompleteOnShutdown();
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
long executeTimeOut = Optional.ofNullable(threadPoolParameterInfo.getExecuteTimeOut())
.orElse(((DynamicThreadPoolExecutor) executor).getExecuteTimeOut());
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setExecuteTimeOut(executeTimeOut);
}
dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor);
threadPoolParamReplace(executor, threadPoolParameterInfo);
registerNotifyAlarm(threadPoolParameterInfo);
}
} else {
// DynamicThreadPool configuration undefined in server
@ -191,22 +160,46 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
GlobalThreadPoolManage.dynamicRegister(registerWrapper);
}
} catch (Exception ex) {
newDynamicThreadPoolExecutor = executor != null ? executor : CommonDynamicThreadPoolProviderFactory.getInstance(threadPoolId);
dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor);
log.error("Failed to initialize thread pool configuration. error message: {}", ex.getMessage());
} finally {
if (Objects.isNull(executor)) {
dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPoolProviderFactory.getInstance(threadPoolId));
}
}
GlobalThreadPoolManage.register(dynamicThreadPoolWrapper.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrapper);
return newDynamicThreadPoolExecutor;
return executor;
}
/**
* Thread-pool param replace.
*
* @param executor dynamic thread-pool executor
* @param threadPoolParameterInfo thread-pool parameter info
*/
private void threadPoolParamReplace(ThreadPoolExecutor executor, ThreadPoolParameterInfo threadPoolParameterInfo) {
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity());
ReflectUtil.setFieldValue(executor, "workQueue", workQueue);
executor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
executor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());
executor.setKeepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS);
executor.allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut()));
executor.setRejectedExecutionHandler(RejectedPolicyTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType()));
}
/**
* Register notify alarm.
*
* @param threadPoolParameterInfo thread-pool parameter info
*/
private void registerNotifyAlarm(ThreadPoolParameterInfo threadPoolParameterInfo) {
// Set dynamic thread pool enhancement parameters.
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()),
threadPoolParameterInfo.getLivenessAlarm(),
threadPoolParameterInfo.getCapacityAlarm());
GlobalNotifyAlarmManage.put(threadPoolParameterInfo.getTpId(), threadPoolNotifyAlarm);
}
/**
* Client dynamic thread pool subscription server configuration.
*
* @param dynamicThreadPoolWrapper
* @param dynamicThreadPoolWrapper dynamic thread-pool wrapper
*/
protected void subscribeConfig(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) {
if (dynamicThreadPoolWrapper.isSubscribeFlag()) {

Loading…
Cancel
Save