Optimize spring thread pool adaptation

pull/615/head
chen.ma 3 years ago
parent 58a944a248
commit 622e405481

@ -78,7 +78,12 @@ public class DynamicThreadPoolConfig {
return produceExecutor; return produceExecutor;
} }
@Bean /**
* Test spring {@link ThreadPoolTaskExecutor}, Thread-pool id: testSpringThreadPoolTaskExecutor
*
* @return
*/
// @Bean
@DynamicThreadPool @DynamicThreadPool
public ThreadPoolTaskExecutor testSpringThreadPoolTaskExecutor() { public ThreadPoolTaskExecutor testSpringThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();

@ -20,14 +20,15 @@ package cn.hippo4j.example.core.inittest;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC; import org.slf4j.MDC;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.util.Random; import java.util.Random;
import java.util.concurrent.*; import java.util.concurrent.Executor;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE; import static cn.hippo4j.common.constant.Constants.EXECUTE_TIMEOUT_TRACE;
@ -44,8 +45,8 @@ public class RunStateHandlerTest {
@Resource @Resource
private ThreadPoolExecutor messageProduceDynamicThreadPool; private ThreadPoolExecutor messageProduceDynamicThreadPool;
@Resource /*@Resource
private ThreadPoolTaskExecutor testThreadPoolTaskExecutor; private ThreadPoolTaskExecutor testSpringThreadPoolTaskExecutor;*/
private final ThreadPoolExecutor runStateHandlerTestExecutor = new ThreadPoolExecutor( private final ThreadPoolExecutor runStateHandlerTestExecutor = new ThreadPoolExecutor(
4, 4,
@ -68,7 +69,7 @@ public class RunStateHandlerTest {
// Start the dynamic thread pool to simulate running tasks // Start the dynamic thread pool to simulate running tasks
runTask(messageConsumeTtlDynamicThreadPool); runTask(messageConsumeTtlDynamicThreadPool);
runTask(messageProduceDynamicThreadPool); runTask(messageProduceDynamicThreadPool);
runTask(testThreadPoolTaskExecutor); // runTask(testThreadPoolTaskExecutor);
// Dynamically register thread pool // Dynamically register thread pool
ThreadPoolExecutor registerDynamicThreadPool = RegisterDynamicThreadPoolTest.registerDynamicThreadPool("auto-register-dynamic-thread-pool"); ThreadPoolExecutor registerDynamicThreadPool = RegisterDynamicThreadPoolTest.registerDynamicThreadPool("auto-register-dynamic-thread-pool");
runTask(registerDynamicThreadPool); runTask(registerDynamicThreadPool);

@ -164,14 +164,15 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
String threadNamePrefix = executorProperties.getThreadNamePrefix(); String threadNamePrefix = executorProperties.getThreadNamePrefix();
ExecutorProperties newExecutorProperties = buildExecutorProperties(executorProperties); ExecutorProperties newExecutorProperties = buildExecutorProperties(executorProperties);
ThreadPoolExecutor newDynamicPoolExecutor = ThreadPoolBuilder.builder() ThreadPoolExecutor newDynamicPoolExecutor = ThreadPoolBuilder.builder()
.dynamicPool() .threadPoolId(executorProperties.getThreadPoolId())
.threadFactory(StringUtil.isNotBlank(threadNamePrefix) ? threadNamePrefix : executorProperties.getThreadPoolId())
.poolThreadSize(newExecutorProperties.getCorePoolSize(), newExecutorProperties.getMaximumPoolSize()) .poolThreadSize(newExecutorProperties.getCorePoolSize(), newExecutorProperties.getMaximumPoolSize())
.workQueue(BlockingQueueTypeEnum.createBlockingQueue(newExecutorProperties.getBlockingQueue(), newExecutorProperties.getQueueCapacity())) .workQueue(BlockingQueueTypeEnum.createBlockingQueue(newExecutorProperties.getBlockingQueue(), newExecutorProperties.getQueueCapacity()))
.threadFactory(StringUtil.isNotBlank(threadNamePrefix) ? threadNamePrefix : executorProperties.getThreadPoolId())
.executeTimeOut(newExecutorProperties.getExecuteTimeOut()) .executeTimeOut(newExecutorProperties.getExecuteTimeOut())
.keepAliveTime(newExecutorProperties.getKeepAliveTime(), TimeUnit.SECONDS) .keepAliveTime(newExecutorProperties.getKeepAliveTime(), TimeUnit.SECONDS)
.rejected(RejectedPolicyTypeEnum.createPolicy(newExecutorProperties.getRejectedHandler())) .rejected(RejectedPolicyTypeEnum.createPolicy(newExecutorProperties.getRejectedHandler()))
.allowCoreThreadTimeOut(newExecutorProperties.getAllowCoreThreadTimeOut()) .allowCoreThreadTimeOut(newExecutorProperties.getAllowCoreThreadTimeOut())
.dynamicPool()
.build(); .build();
return newDynamicPoolExecutor; return newDynamicPoolExecutor;
} }

Loading…
Cancel
Save