From 58a944a24897ff16bdf141052ebb65dc7f5ec68a Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Mon, 29 Aug 2022 19:54:44 +0800 Subject: [PATCH] Optimize spring thread pool adaptation --- .../adpter/ThreadPoolTaskExecutorAdapter.java | 20 ++++++++++--------- .../core/config/DynamicThreadPoolConfig.java | 8 +++----- .../DynamicThreadPoolPostProcessor.java | 6 +----- .../DynamicThreadPoolPostProcessor.java | 7 ++----- 4 files changed, 17 insertions(+), 24 deletions(-) diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/ThreadPoolTaskExecutorAdapter.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/ThreadPoolTaskExecutorAdapter.java index 2739bcef..3dea2eb1 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/ThreadPoolTaskExecutorAdapter.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/ThreadPoolTaskExecutorAdapter.java @@ -23,6 +23,7 @@ import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import org.springframework.core.task.TaskDecorator; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @@ -45,9 +46,12 @@ public class ThreadPoolTaskExecutorAdapter implements DynamicThreadPoolAdapter { private static final String QUEUE_CAPACITY = "queueCapacity"; + private static String MATCH_CLASS_NAME = "ThreadPoolTaskExecutor"; + @Override public boolean match(Object executor) { - return executor instanceof ThreadPoolTaskExecutor; + // Adapt to lower versions of spring. + return Objects.equals(MATCH_CLASS_NAME, executor.getClass().getSimpleName()); } @Override @@ -66,11 +70,12 @@ public class ThreadPoolTaskExecutorAdapter implements DynamicThreadPoolAdapter { long awaitTerminationMillis = (long) ReflectUtil.getFieldValue(executor, AWAIT_TERMINATION_MILLIS); String beanName = (String) ReflectUtil.getFieldValue(executor, BEAN_NAME); int queueCapacity = (int) ReflectUtil.getFieldValue(executor, QUEUE_CAPACITY); - ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) unwrap; ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) executor; - // Spring ThreadPoolTaskExecutor to DynamicThreadPoolExecutor - // ThreadPoolTaskExecutor not support executeTimeOut + /** + * Spring {@link ThreadPoolTaskExecutor} to {@link DynamicThreadPoolExecutor}, + * {@link ThreadPoolTaskExecutor} not support {@link DynamicThreadPoolExecutor#executeTimeOut}. + */ ThreadPoolBuilder threadPoolBuilder = ThreadPoolBuilder.builder() .dynamicPool() .corePoolSize(threadPoolTaskExecutor.getCorePoolSize()) @@ -81,16 +86,13 @@ public class ThreadPoolTaskExecutorAdapter implements DynamicThreadPoolAdapter { .waitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown) .awaitTerminationMillis(awaitTerminationMillis) .threadFactory(threadPoolExecutor.getThreadFactory()) - // threadPoolId default beanName + // Thread-pool id default bean name. .threadPoolId(beanName) .rejected(threadPoolExecutor.getRejectedExecutionHandler()); - // use new Queue + // Use new blocking queue. threadPoolBuilder.capacity(queueCapacity); - // .workQueue(threadPoolExecutor.getQueue()) - Optional.ofNullable(ReflectUtil.getFieldValue(executor, TASK_DECORATOR)) .ifPresent((taskDecorator) -> threadPoolBuilder.taskDecorator((TaskDecorator) taskDecorator)); - return (DynamicThreadPoolExecutor) threadPoolBuilder.build(); } diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java index dc8d9596..d9246917 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java @@ -80,17 +80,15 @@ public class DynamicThreadPoolConfig { @Bean @DynamicThreadPool - public ThreadPoolTaskExecutor testThreadPoolTaskExecutor() { + public ThreadPoolTaskExecutor testSpringThreadPoolTaskExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); - threadPoolTaskExecutor.setThreadNamePrefix("66666-"); - final int maxQueueCapacity = 200; + threadPoolTaskExecutor.setThreadNamePrefix("test-spring-task-executor_"); + int maxQueueCapacity = 200; threadPoolTaskExecutor.setCorePoolSize(AVAILABLE_PROCESSORS * 2); threadPoolTaskExecutor.setMaxPoolSize(AVAILABLE_PROCESSORS * 4); threadPoolTaskExecutor.setQueueCapacity(maxQueueCapacity); threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); - // threadPoolTaskExecutor.setTaskDecorator(new TaskDecoratorTest.ContextCopyingDecorator()); - return threadPoolTaskExecutor; } } diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java index 553ab31b..907c3c5f 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -84,11 +84,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(wrap); DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor); - if (DynamicThreadPoolAdapterChoose.match(bean)) { - return bean; - } else { - return remoteThreadPoolExecutor; - } + return DynamicThreadPoolAdapterChoose.match(bean) ? bean : remoteThreadPoolExecutor; } if (bean instanceof DynamicThreadPoolWrapper) { DynamicThreadPoolWrapper wrap = (DynamicThreadPoolWrapper) bean; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java index 4f6ecb76..8c7e73e1 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -97,11 +97,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper); DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor); subscribeConfig(dynamicThreadPoolWrapper); - if (DynamicThreadPoolAdapterChoose.match(bean)) { - return bean; - } else { - return remoteThreadPoolExecutor; - } + return DynamicThreadPoolAdapterChoose.match(bean) ? bean : remoteThreadPoolExecutor; } if (bean instanceof DynamicThreadPoolWrapper) { DynamicThreadPoolWrapper dynamicThreadPoolWrapper = (DynamicThreadPoolWrapper) bean; @@ -144,6 +140,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity()); newDynamicThreadPoolExecutor = ThreadPoolBuilder.builder() .dynamicPool() + .threadPoolId(threadPoolId) .workQueue(workQueue) .threadFactory(executor.getThreadFactory()) .poolThreadSize(threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt())