pull/1460/merge
lazy 2 years ago committed by GitHub
commit 1cb3a6625a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -71,7 +71,7 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
@Order(Ordered.HIGHEST_PRECEDENCE) @Order(Ordered.HIGHEST_PRECEDENCE)
public ApplicationContextHolder hippo4JApplicationContextHolder() { public ApplicationContextHolder hippo4jApplicationContextHolder() {
return new ApplicationContextHolder(); return new ApplicationContextHolder();
} }
@ -93,7 +93,7 @@ public class DynamicThreadPoolAutoConfiguration {
} }
@Bean @Bean
public DynamicThreadPoolPostProcessor dynamicThreadPoolPostProcessor(ApplicationContextHolder hippo4JApplicationContextHolder) { public DynamicThreadPoolPostProcessor dynamicThreadPoolPostProcessor() {
return new DynamicThreadPoolPostProcessor(bootstrapConfigProperties); return new DynamicThreadPoolPostProcessor(bootstrapConfigProperties);
} }

@ -61,15 +61,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
@Override @Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DynamicThreadPoolExecutor || DynamicThreadPoolAdapterChoose.match(bean)) { if (bean instanceof DynamicThreadPoolExecutor || DynamicThreadPoolAdapterChoose.match(bean)) {
DynamicThreadPool dynamicThreadPool;
try { try {
dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class); DynamicThreadPool dynamicThreadPool =
Optional.ofNullable(ApplicationContextHolder.findAnnotationOnBean(beanName,
DynamicThreadPool.class))
.orElse(DynamicThreadPoolAnnotationUtil.findAnnotationOnBean(beanName,
DynamicThreadPool.class));
if (Objects.isNull(dynamicThreadPool)) { if (Objects.isNull(dynamicThreadPool)) {
// Adapt to lower versions of SpringBoot. return bean;
dynamicThreadPool = DynamicThreadPoolAnnotationUtil.findAnnotationOnBean(beanName, DynamicThreadPool.class);
if (Objects.isNull(dynamicThreadPool)) {
return bean;
}
} }
} catch (Exception ex) { } catch (Exception ex) {
log.error("Failed to create dynamic thread pool in annotation mode.", ex); log.error("Failed to create dynamic thread pool in annotation mode.", ex);
@ -79,7 +78,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
if ((dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean)) == null) { if ((dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean)) == null) {
dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;
} }
DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(),
dynamicThreadPoolExecutor);
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(wrap); ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(wrap);
DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor); DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);
return DynamicThreadPoolAdapterChoose.match(bean) ? bean : remoteThreadPoolExecutor; return DynamicThreadPoolAdapterChoose.match(bean) ? bean : remoteThreadPoolExecutor;
@ -168,7 +168,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
* @param executorProperties executor properties * @param executorProperties executor properties
*/ */
private void threadPoolParamReplace(ThreadPoolExecutor executor, ExecutorProperties executorProperties) { private void threadPoolParamReplace(ThreadPoolExecutor executor, ExecutorProperties executorProperties) {
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(), executorProperties.getQueueCapacity()); BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(executorProperties.getBlockingQueue(),
executorProperties.getQueueCapacity());
ReflectUtil.setFieldValue(executor, "workQueue", workQueue); ReflectUtil.setFieldValue(executor, "workQueue", workQueue);
executor.setCorePoolSize(executorProperties.getCorePoolSize()); executor.setCorePoolSize(executorProperties.getCorePoolSize());
executor.setMaximumPoolSize(executorProperties.getMaximumPoolSize()); executor.setMaximumPoolSize(executorProperties.getMaximumPoolSize());
@ -205,7 +206,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getQueueCapacity()).get())) .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getQueueCapacity()).get()))
.rejectedHandler(Optional.ofNullable(executorProperties.getRejectedHandler()) .rejectedHandler(Optional.ofNullable(executorProperties.getRejectedHandler())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getRejectedHandler()).get())) .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(each -> each.getRejectedHandler()).get()))
.threadNamePrefix(StringUtil.isBlank(executorProperties.getThreadNamePrefix()) ? executorProperties.getThreadPoolId() : executorProperties.getThreadNamePrefix()) .threadNamePrefix(StringUtil.isBlank(executorProperties.getThreadNamePrefix()) ?
executorProperties.getThreadPoolId() : executorProperties.getThreadNamePrefix())
.threadPoolId(executorProperties.getThreadPoolId()) .threadPoolId(executorProperties.getThreadPoolId())
.build(); .build();
return newExecutorProperties; return newExecutorProperties;
@ -218,7 +220,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
* @return thread-pool notify alarm * @return thread-pool notify alarm
*/ */
private ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) { private ThreadPoolNotifyAlarm buildThreadPoolNotifyAlarm(ExecutorProperties executorProperties) {
DynamicThreadPoolNotifyProperties notify = Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null); DynamicThreadPoolNotifyProperties notify =
Optional.ofNullable(executorProperties).map(ExecutorProperties::getNotify).orElse(null);
boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm()) boolean isAlarm = Optional.ofNullable(executorProperties.getAlarm())
.orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAlarm).orElse(true)); .orElseGet(() -> Optional.ofNullable(configProperties.getDefaultExecutor()).map(ExecutorProperties::getAlarm).orElse(true));
int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm()) int activeAlarm = Optional.ofNullable(executorProperties.getActiveAlarm())

@ -98,7 +98,7 @@ public class DynamicThreadPoolAutoConfiguration {
@Bean @Bean
@ConditionalOnMissingBean @ConditionalOnMissingBean
@Order(Ordered.HIGHEST_PRECEDENCE) @Order(Ordered.HIGHEST_PRECEDENCE)
public ApplicationContextHolder hippo4JApplicationContextHolder() { public ApplicationContextHolder hippo4jApplicationContextHolder() {
return new ApplicationContextHolder(); return new ApplicationContextHolder();
} }

@ -78,15 +78,14 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
@Override @Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DynamicThreadPoolExecutor || DynamicThreadPoolAdapterChoose.match(bean)) { if (bean instanceof DynamicThreadPoolExecutor || DynamicThreadPoolAdapterChoose.match(bean)) {
DynamicThreadPool dynamicThreadPool;
try { try {
dynamicThreadPool = ApplicationContextHolder.findAnnotationOnBean(beanName, DynamicThreadPool.class); DynamicThreadPool dynamicThreadPool =
Optional.ofNullable(ApplicationContextHolder.findAnnotationOnBean(beanName,
DynamicThreadPool.class))
.orElse(DynamicThreadPoolAnnotationUtil.findAnnotationOnBean(beanName,
DynamicThreadPool.class));
if (Objects.isNull(dynamicThreadPool)) { if (Objects.isNull(dynamicThreadPool)) {
// Adapt to lower versions of SpringBoot. return bean;
dynamicThreadPool = DynamicThreadPoolAnnotationUtil.findAnnotationOnBean(beanName, DynamicThreadPool.class);
if (Objects.isNull(dynamicThreadPool)) {
return bean;
}
} }
} catch (Exception ex) { } catch (Exception ex) {
log.error("Failed to create dynamic thread pool in annotation mode.", ex); log.error("Failed to create dynamic thread pool in annotation mode.", ex);
@ -96,7 +95,9 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
if ((dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean)) == null) { if ((dynamicThreadPoolExecutor = DynamicThreadPoolAdapterChoose.unwrap(bean)) == null) {
dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean; dynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) bean;
} }
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); DynamicThreadPoolWrapper dynamicThreadPoolWrapper =
new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(),
dynamicThreadPoolExecutor);
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper); ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper);
DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor); DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);
subscribeConfig(dynamicThreadPoolWrapper); subscribeConfig(dynamicThreadPoolWrapper);
@ -165,7 +166,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
} catch (Exception ex) { } catch (Exception ex) {
log.error("Failed to initialize thread pool configuration. error message: {}", ex.getMessage()); log.error("Failed to initialize thread pool configuration. error message: {}", ex.getMessage());
} }
GlobalThreadPoolManage.register(dynamicThreadPoolWrapper.getThreadPoolId(), threadPoolParameterInfo, dynamicThreadPoolWrapper); GlobalThreadPoolManage.register(dynamicThreadPoolWrapper.getThreadPoolId(), threadPoolParameterInfo,
dynamicThreadPoolWrapper);
return executor; return executor;
} }
@ -176,7 +178,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
* @param threadPoolParameterInfo thread-pool parameter info * @param threadPoolParameterInfo thread-pool parameter info
*/ */
private void threadPoolParamReplace(ThreadPoolExecutor executor, ThreadPoolParameterInfo threadPoolParameterInfo) { private void threadPoolParamReplace(ThreadPoolExecutor executor, ThreadPoolParameterInfo threadPoolParameterInfo) {
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity()); BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(),
threadPoolParameterInfo.getCapacity());
ReflectUtil.setFieldValue(executor, "workQueue", workQueue); ReflectUtil.setFieldValue(executor, "workQueue", workQueue);
executor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt()); executor.setCorePoolSize(threadPoolParameterInfo.corePoolSizeAdapt());
executor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt()); executor.setMaximumPoolSize(threadPoolParameterInfo.maximumPoolSizeAdapt());

Loading…
Cancel
Save