diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractBuildThreadPoolTemplate.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractBuildThreadPoolTemplate.java index 61f34eec..a1d1975b 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractBuildThreadPoolTemplate.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/AbstractBuildThreadPoolTemplate.java @@ -145,11 +145,14 @@ public class AbstractBuildThreadPoolTemplate { private Boolean allowCoreThreadTimeOut = false; public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) { - this.threadPoolId = threadNamePrefix; this.threadFactory = ThreadFactoryBuilder.builder() .prefix(threadNamePrefix) .daemon(isDaemon) .build(); } + + public ThreadPoolInitParam(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; + } } } diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java index 56399fc0..1aa069c4 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/ThreadPoolBuilder.java @@ -57,6 +57,8 @@ public class ThreadPoolBuilder implements Builder { private String threadNamePrefix; + private ThreadFactory threadFactory; + private String threadPoolId; private TaskDecorator taskDecorator; @@ -87,6 +89,11 @@ public class ThreadPoolBuilder implements Builder { return this; } + public ThreadPoolBuilder threadFactory(ThreadFactory threadFactory) { + this.threadFactory = threadFactory; + return this; + } + public ThreadPoolBuilder threadFactory(String threadNamePrefix, Boolean isDaemon) { this.threadNamePrefix = threadNamePrefix; this.isDaemon = isDaemon; @@ -227,9 +234,13 @@ public class ThreadPoolBuilder implements Builder { } private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) { - Assert.notEmpty(builder.threadNamePrefix, "The thread name prefix cannot be empty or an empty string."); - AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam = - new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon); + AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam; + if (builder.threadFactory == null) { + Assert.notEmpty(builder.threadNamePrefix, "The thread name prefix cannot be empty or an empty string."); + initParam = new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon); + } else { + initParam = new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadFactory); + } initParam.setCorePoolNum(builder.corePoolSize) .setMaxPoolNum(builder.maxPoolSize) .setKeepAliveTime(builder.keepAliveTime) diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapterChoose.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapterChoose.java index c203ee5d..1b613483 100644 --- a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapterChoose.java +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/DynamicThreadPoolAdapterChoose.java @@ -34,6 +34,7 @@ public class DynamicThreadPoolAdapterChoose { static { DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorAdapter()); DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorServiceAdapter()); + DYNAMIC_THREAD_POOL_ADAPTERS.add(new ThreadPoolTaskExecutorAdapter()); } /** diff --git a/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/ThreadPoolTaskExecutorAdapter.java b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/ThreadPoolTaskExecutorAdapter.java new file mode 100644 index 00000000..ebf655d5 --- /dev/null +++ b/hippo4j-core/src/main/java/cn/hippo4j/core/executor/support/adpter/ThreadPoolTaskExecutorAdapter.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cn.hippo4j.core.executor.support.adpter; + +import cn.hippo4j.common.toolkit.ReflectUtil; +import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; +import cn.hippo4j.core.executor.support.ThreadPoolBuilder; +import org.springframework.core.task.TaskDecorator; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.Optional; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Spring ThreadPoolTaskExecutor adapter. + */ +public class ThreadPoolTaskExecutorAdapter implements DynamicThreadPoolAdapter { + + private static final String EXECUTOR_FIELD_NAME = "threadPoolExecutor"; + + private static final String WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN = "waitForTasksToCompleteOnShutdown"; + + private static final String AWAIT_TERMINATION_MILLIS = "awaitTerminationMillis"; + + private static final String TASK_DECORATOR = "taskDecorator"; + + private static final String BEAN_NAME = "beanName"; + + private static final String QUEUE_CAPACITY = "queueCapacity"; + + @Override + public boolean match(Object executor) { + return executor instanceof ThreadPoolTaskExecutor; + } + + @Override + public DynamicThreadPoolExecutor unwrap(Object executor) { + Object unwrap = ReflectUtil.getFieldValue(executor, EXECUTOR_FIELD_NAME); + if (unwrap == null) { + return null; + } + if (!(unwrap instanceof ThreadPoolExecutor)) { + return null; + } + if (unwrap instanceof DynamicThreadPoolExecutor) { + return (DynamicThreadPoolExecutor) unwrap; + } + boolean waitForTasksToCompleteOnShutdown = (boolean)ReflectUtil.getFieldValue(executor, WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN); + long awaitTerminationMillis = (long)ReflectUtil.getFieldValue(executor, AWAIT_TERMINATION_MILLIS); + String beanName = (String)ReflectUtil.getFieldValue(executor, BEAN_NAME); + int queueCapacity = (int)ReflectUtil.getFieldValue(executor, QUEUE_CAPACITY); + + ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) unwrap; + ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) executor; + // Spring ThreadPoolTaskExecutor to DynamicThreadPoolExecutor + // ThreadPoolTaskExecutor not support executeTimeOut + ThreadPoolBuilder threadPoolBuilder = ThreadPoolBuilder.builder() + .dynamicPool() + .corePoolSize(threadPoolTaskExecutor.getCorePoolSize()) + .maxPoolNum(threadPoolTaskExecutor.getMaxPoolSize()) + .keepAliveTime(threadPoolTaskExecutor.getKeepAliveSeconds()) + .timeUnit(TimeUnit.SECONDS) + .allowCoreThreadTimeOut(threadPoolExecutor.allowsCoreThreadTimeOut()) + .waitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown) + .awaitTerminationMillis(awaitTerminationMillis) + .threadFactory(threadPoolExecutor.getThreadFactory()) + //threadPoolId default beanName + .threadPoolId(beanName) + .rejected(threadPoolExecutor.getRejectedExecutionHandler()) + ; + //use new Queue + threadPoolBuilder.capacity(queueCapacity); + // .workQueue(threadPoolExecutor.getQueue()) + + Optional.ofNullable(ReflectUtil.getFieldValue(executor, TASK_DECORATOR)) + .ifPresent((taskDecorator) -> threadPoolBuilder.taskDecorator((TaskDecorator) taskDecorator)); + + return (DynamicThreadPoolExecutor) threadPoolBuilder.build(); + } + + @Override + public void replace(Object executor, Executor dynamicThreadPoolExecutor) { + ReflectUtil.setFieldValue(executor, EXECUTOR_FIELD_NAME, dynamicThreadPoolExecutor); + } +} diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java index d76bb3d2..dc8d9596 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/config/DynamicThreadPoolConfig.java @@ -25,10 +25,12 @@ import com.alibaba.ttl.threadpool.TtlExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; +import static cn.hippo4j.common.constant.Constants.AVAILABLE_PROCESSORS; import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_CONSUME; import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_PRODUCE; @@ -75,4 +77,20 @@ public class DynamicThreadPoolConfig { .build(); return produceExecutor; } + + @Bean + @DynamicThreadPool + public ThreadPoolTaskExecutor testThreadPoolTaskExecutor() { + ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); + threadPoolTaskExecutor.setThreadNamePrefix("66666-"); + final int maxQueueCapacity = 200; + threadPoolTaskExecutor.setCorePoolSize(AVAILABLE_PROCESSORS * 2); + threadPoolTaskExecutor.setMaxPoolSize(AVAILABLE_PROCESSORS * 4); + threadPoolTaskExecutor.setQueueCapacity(maxQueueCapacity); + threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + // + threadPoolTaskExecutor.setTaskDecorator(new TaskDecoratorTest.ContextCopyingDecorator()); + + return threadPoolTaskExecutor; + } } diff --git a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java index e34ef463..9c00093a 100644 --- a/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java +++ b/hippo4j-example/hippo4j-example-core/src/main/java/cn/hippo4j/example/core/inittest/RunStateHandlerTest.java @@ -20,6 +20,8 @@ package cn.hippo4j.example.core.inittest; import cn.hutool.core.thread.ThreadUtil; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @@ -42,9 +44,12 @@ public class RunStateHandlerTest { @Resource private ThreadPoolExecutor messageProduceDynamicThreadPool; + @Resource + private ThreadPoolTaskExecutor testThreadPoolTaskExecutor; + private final ThreadPoolExecutor runStateHandlerTestExecutor = new ThreadPoolExecutor( - 3, - 3, + 4, + 4, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), @@ -63,6 +68,7 @@ public class RunStateHandlerTest { // Start the dynamic thread pool to simulate running tasks runTask(messageConsumeTtlDynamicThreadPool); runTask(messageProduceDynamicThreadPool); + runTask(testThreadPoolTaskExecutor); // Dynamically register thread pool ThreadPoolExecutor registerDynamicThreadPool = RegisterDynamicThreadPoolTest.registerDynamicThreadPool("auto-register-dynamic-thread-pool"); runTask(registerDynamicThreadPool); diff --git a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java index 29443dcc..553ab31b 100644 --- a/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-config-spring-boot-starter/src/main/java/cn/hippo4j/core/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -84,7 +84,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(wrap); DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor); - return remoteThreadPoolExecutor; + if (DynamicThreadPoolAdapterChoose.match(bean)) { + return bean; + } else { + return remoteThreadPoolExecutor; + } } if (bean instanceof DynamicThreadPoolWrapper) { DynamicThreadPoolWrapper wrap = (DynamicThreadPoolWrapper) bean; diff --git a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java index 726dc33f..d506b4d7 100644 --- a/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java +++ b/hippo4j-spring-boot/hippo4j-spring-boot-starter/src/main/java/cn/hippo4j/springboot/starter/support/DynamicThreadPoolPostProcessor.java @@ -97,7 +97,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor { ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper); DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor); subscribeConfig(dynamicThreadPoolWrapper); - return remoteThreadPoolExecutor; + if (DynamicThreadPoolAdapterChoose.match(bean)) { + return bean; + } else { + return remoteThreadPoolExecutor; + } } if (bean instanceof DynamicThreadPoolWrapper) { DynamicThreadPoolWrapper dynamicThreadPoolWrapper = (DynamicThreadPoolWrapper) bean;