Optimize spring thread pool adaptation

pull/615/head
chen.ma 2 years ago
parent 18177f3477
commit 58a944a248

@ -23,6 +23,7 @@ import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@ -45,9 +46,12 @@ public class ThreadPoolTaskExecutorAdapter implements DynamicThreadPoolAdapter {
private static final String QUEUE_CAPACITY = "queueCapacity";
private static String MATCH_CLASS_NAME = "ThreadPoolTaskExecutor";
@Override
public boolean match(Object executor) {
return executor instanceof ThreadPoolTaskExecutor;
// Adapt to lower versions of spring.
return Objects.equals(MATCH_CLASS_NAME, executor.getClass().getSimpleName());
}
@Override
@ -66,11 +70,12 @@ public class ThreadPoolTaskExecutorAdapter implements DynamicThreadPoolAdapter {
long awaitTerminationMillis = (long) ReflectUtil.getFieldValue(executor, AWAIT_TERMINATION_MILLIS);
String beanName = (String) ReflectUtil.getFieldValue(executor, BEAN_NAME);
int queueCapacity = (int) ReflectUtil.getFieldValue(executor, QUEUE_CAPACITY);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) unwrap;
ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) executor;
// Spring ThreadPoolTaskExecutor to DynamicThreadPoolExecutor
// ThreadPoolTaskExecutor not support executeTimeOut
/**
* Spring {@link ThreadPoolTaskExecutor} to {@link DynamicThreadPoolExecutor},
* {@link ThreadPoolTaskExecutor} not support {@link DynamicThreadPoolExecutor#executeTimeOut}.
*/
ThreadPoolBuilder threadPoolBuilder = ThreadPoolBuilder.builder()
.dynamicPool()
.corePoolSize(threadPoolTaskExecutor.getCorePoolSize())
@ -81,16 +86,13 @@ public class ThreadPoolTaskExecutorAdapter implements DynamicThreadPoolAdapter {
.waitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown)
.awaitTerminationMillis(awaitTerminationMillis)
.threadFactory(threadPoolExecutor.getThreadFactory())
// threadPoolId default beanName
// Thread-pool id default bean name.
.threadPoolId(beanName)
.rejected(threadPoolExecutor.getRejectedExecutionHandler());
// use new Queue
// Use new blocking queue.
threadPoolBuilder.capacity(queueCapacity);
// .workQueue(threadPoolExecutor.getQueue())
Optional.ofNullable(ReflectUtil.getFieldValue(executor, TASK_DECORATOR))
.ifPresent((taskDecorator) -> threadPoolBuilder.taskDecorator((TaskDecorator) taskDecorator));
return (DynamicThreadPoolExecutor) threadPoolBuilder.build();
}

@ -80,17 +80,15 @@ public class DynamicThreadPoolConfig {
@Bean
@DynamicThreadPool
public ThreadPoolTaskExecutor testThreadPoolTaskExecutor() {
public ThreadPoolTaskExecutor testSpringThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("66666-");
final int maxQueueCapacity = 200;
threadPoolTaskExecutor.setThreadNamePrefix("test-spring-task-executor_");
int maxQueueCapacity = 200;
threadPoolTaskExecutor.setCorePoolSize(AVAILABLE_PROCESSORS * 2);
threadPoolTaskExecutor.setMaxPoolSize(AVAILABLE_PROCESSORS * 4);
threadPoolTaskExecutor.setQueueCapacity(maxQueueCapacity);
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//
threadPoolTaskExecutor.setTaskDecorator(new TaskDecoratorTest.ContextCopyingDecorator());
return threadPoolTaskExecutor;
}
}

@ -84,11 +84,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor);
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(wrap);
DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);
if (DynamicThreadPoolAdapterChoose.match(bean)) {
return bean;
} else {
return remoteThreadPoolExecutor;
}
return DynamicThreadPoolAdapterChoose.match(bean) ? bean : remoteThreadPoolExecutor;
}
if (bean instanceof DynamicThreadPoolWrapper) {
DynamicThreadPoolWrapper wrap = (DynamicThreadPoolWrapper) bean;

@ -97,11 +97,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper);
DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);
subscribeConfig(dynamicThreadPoolWrapper);
if (DynamicThreadPoolAdapterChoose.match(bean)) {
return bean;
} else {
return remoteThreadPoolExecutor;
}
return DynamicThreadPoolAdapterChoose.match(bean) ? bean : remoteThreadPoolExecutor;
}
if (bean instanceof DynamicThreadPoolWrapper) {
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = (DynamicThreadPoolWrapper) bean;
@ -144,6 +140,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue(threadPoolParameterInfo.getQueueType(), threadPoolParameterInfo.getCapacity());
newDynamicThreadPoolExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.threadPoolId(threadPoolId)
.workQueue(workQueue)
.threadFactory(executor.getThreadFactory())
.poolThreadSize(threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt())

Loading…
Cancel
Save