Support Spring ThreadPoolTaskExecutor (#607)

* feat: Support Spring ThreadPoolTaskExecutor,close #606.

* fix: threadNamePrefix is error

* style format
pull/615/head
BigXin0109 3 years ago committed by GitHub
parent 39537488a8
commit 715aa2210e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -145,11 +145,14 @@ public class AbstractBuildThreadPoolTemplate {
private Boolean allowCoreThreadTimeOut = false; private Boolean allowCoreThreadTimeOut = false;
public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) { public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) {
this.threadPoolId = threadNamePrefix;
this.threadFactory = ThreadFactoryBuilder.builder() this.threadFactory = ThreadFactoryBuilder.builder()
.prefix(threadNamePrefix) .prefix(threadNamePrefix)
.daemon(isDaemon) .daemon(isDaemon)
.build(); .build();
} }
public ThreadPoolInitParam(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
} }
} }

@ -57,6 +57,8 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
private String threadNamePrefix; private String threadNamePrefix;
private ThreadFactory threadFactory;
private String threadPoolId; private String threadPoolId;
private TaskDecorator taskDecorator; private TaskDecorator taskDecorator;
@ -87,6 +89,11 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this; return this;
} }
public ThreadPoolBuilder threadFactory(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
return this;
}
public ThreadPoolBuilder threadFactory(String threadNamePrefix, Boolean isDaemon) { public ThreadPoolBuilder threadFactory(String threadNamePrefix, Boolean isDaemon) {
this.threadNamePrefix = threadNamePrefix; this.threadNamePrefix = threadNamePrefix;
this.isDaemon = isDaemon; this.isDaemon = isDaemon;
@ -227,9 +234,13 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
} }
private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) { private static AbstractBuildThreadPoolTemplate.ThreadPoolInitParam buildInitParam(ThreadPoolBuilder builder) {
Assert.notEmpty(builder.threadNamePrefix, "The thread name prefix cannot be empty or an empty string."); AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam;
AbstractBuildThreadPoolTemplate.ThreadPoolInitParam initParam = if (builder.threadFactory == null) {
new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon); Assert.notEmpty(builder.threadNamePrefix, "The thread name prefix cannot be empty or an empty string.");
initParam = new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadNamePrefix, builder.isDaemon);
} else {
initParam = new AbstractBuildThreadPoolTemplate.ThreadPoolInitParam(builder.threadFactory);
}
initParam.setCorePoolNum(builder.corePoolSize) initParam.setCorePoolNum(builder.corePoolSize)
.setMaxPoolNum(builder.maxPoolSize) .setMaxPoolNum(builder.maxPoolSize)
.setKeepAliveTime(builder.keepAliveTime) .setKeepAliveTime(builder.keepAliveTime)

@ -34,6 +34,7 @@ public class DynamicThreadPoolAdapterChoose {
static { static {
DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorAdapter()); DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorAdapter());
DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorServiceAdapter()); DYNAMIC_THREAD_POOL_ADAPTERS.add(new TransmittableThreadLocalExecutorServiceAdapter());
DYNAMIC_THREAD_POOL_ADAPTERS.add(new ThreadPoolTaskExecutorAdapter());
} }
/** /**

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cn.hippo4j.core.executor.support.adpter;
import cn.hippo4j.common.toolkit.ReflectUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Spring ThreadPoolTaskExecutor adapter.
*/
public class ThreadPoolTaskExecutorAdapter implements DynamicThreadPoolAdapter {
private static final String EXECUTOR_FIELD_NAME = "threadPoolExecutor";
private static final String WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN = "waitForTasksToCompleteOnShutdown";
private static final String AWAIT_TERMINATION_MILLIS = "awaitTerminationMillis";
private static final String TASK_DECORATOR = "taskDecorator";
private static final String BEAN_NAME = "beanName";
private static final String QUEUE_CAPACITY = "queueCapacity";
@Override
public boolean match(Object executor) {
return executor instanceof ThreadPoolTaskExecutor;
}
@Override
public DynamicThreadPoolExecutor unwrap(Object executor) {
Object unwrap = ReflectUtil.getFieldValue(executor, EXECUTOR_FIELD_NAME);
if (unwrap == null) {
return null;
}
if (!(unwrap instanceof ThreadPoolExecutor)) {
return null;
}
if (unwrap instanceof DynamicThreadPoolExecutor) {
return (DynamicThreadPoolExecutor) unwrap;
}
boolean waitForTasksToCompleteOnShutdown = (boolean) ReflectUtil.getFieldValue(executor, WAIT_FOR_TASKS_TO_COMPLETE_ON_SHUTDOWN);
long awaitTerminationMillis = (long) ReflectUtil.getFieldValue(executor, AWAIT_TERMINATION_MILLIS);
String beanName = (String) ReflectUtil.getFieldValue(executor, BEAN_NAME);
int queueCapacity = (int) ReflectUtil.getFieldValue(executor, QUEUE_CAPACITY);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) unwrap;
ThreadPoolTaskExecutor threadPoolTaskExecutor = (ThreadPoolTaskExecutor) executor;
// Spring ThreadPoolTaskExecutor to DynamicThreadPoolExecutor
// ThreadPoolTaskExecutor not support executeTimeOut
ThreadPoolBuilder threadPoolBuilder = ThreadPoolBuilder.builder()
.dynamicPool()
.corePoolSize(threadPoolTaskExecutor.getCorePoolSize())
.maxPoolNum(threadPoolTaskExecutor.getMaxPoolSize())
.keepAliveTime(threadPoolTaskExecutor.getKeepAliveSeconds())
.timeUnit(TimeUnit.SECONDS)
.allowCoreThreadTimeOut(threadPoolExecutor.allowsCoreThreadTimeOut())
.waitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown)
.awaitTerminationMillis(awaitTerminationMillis)
.threadFactory(threadPoolExecutor.getThreadFactory())
// threadPoolId default beanName
.threadPoolId(beanName)
.rejected(threadPoolExecutor.getRejectedExecutionHandler());
// use new Queue
threadPoolBuilder.capacity(queueCapacity);
// .workQueue(threadPoolExecutor.getQueue())
Optional.ofNullable(ReflectUtil.getFieldValue(executor, TASK_DECORATOR))
.ifPresent((taskDecorator) -> threadPoolBuilder.taskDecorator((TaskDecorator) taskDecorator));
return (DynamicThreadPoolExecutor) threadPoolBuilder.build();
}
@Override
public void replace(Object executor, Executor dynamicThreadPoolExecutor) {
ReflectUtil.setFieldValue(executor, EXECUTOR_FIELD_NAME, dynamicThreadPoolExecutor);
}
}

@ -25,10 +25,12 @@ import com.alibaba.ttl.threadpool.TtlExecutors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import static cn.hippo4j.common.constant.Constants.AVAILABLE_PROCESSORS;
import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_CONSUME; import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_CONSUME;
import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_PRODUCE; import static cn.hippo4j.example.core.constant.GlobalTestConstant.MESSAGE_PRODUCE;
@ -75,4 +77,20 @@ public class DynamicThreadPoolConfig {
.build(); .build();
return produceExecutor; return produceExecutor;
} }
@Bean
@DynamicThreadPool
public ThreadPoolTaskExecutor testThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setThreadNamePrefix("66666-");
final int maxQueueCapacity = 200;
threadPoolTaskExecutor.setCorePoolSize(AVAILABLE_PROCESSORS * 2);
threadPoolTaskExecutor.setMaxPoolSize(AVAILABLE_PROCESSORS * 4);
threadPoolTaskExecutor.setQueueCapacity(maxQueueCapacity);
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//
threadPoolTaskExecutor.setTaskDecorator(new TaskDecoratorTest.ContextCopyingDecorator());
return threadPoolTaskExecutor;
}
} }

@ -20,6 +20,8 @@ package cn.hippo4j.example.core.inittest;
import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC; import org.slf4j.MDC;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
@ -42,9 +44,12 @@ public class RunStateHandlerTest {
@Resource @Resource
private ThreadPoolExecutor messageProduceDynamicThreadPool; private ThreadPoolExecutor messageProduceDynamicThreadPool;
@Resource
private ThreadPoolTaskExecutor testThreadPoolTaskExecutor;
private final ThreadPoolExecutor runStateHandlerTestExecutor = new ThreadPoolExecutor( private final ThreadPoolExecutor runStateHandlerTestExecutor = new ThreadPoolExecutor(
3, 4,
3, 4,
0L, 0L,
TimeUnit.MILLISECONDS, TimeUnit.MILLISECONDS,
new SynchronousQueue<>(), new SynchronousQueue<>(),
@ -63,6 +68,7 @@ public class RunStateHandlerTest {
// Start the dynamic thread pool to simulate running tasks // Start the dynamic thread pool to simulate running tasks
runTask(messageConsumeTtlDynamicThreadPool); runTask(messageConsumeTtlDynamicThreadPool);
runTask(messageProduceDynamicThreadPool); runTask(messageProduceDynamicThreadPool);
runTask(testThreadPoolTaskExecutor);
// Dynamically register thread pool // Dynamically register thread pool
ThreadPoolExecutor registerDynamicThreadPool = RegisterDynamicThreadPoolTest.registerDynamicThreadPool("auto-register-dynamic-thread-pool"); ThreadPoolExecutor registerDynamicThreadPool = RegisterDynamicThreadPoolTest.registerDynamicThreadPool("auto-register-dynamic-thread-pool");
runTask(registerDynamicThreadPool); runTask(registerDynamicThreadPool);

@ -84,7 +84,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor); DynamicThreadPoolWrapper wrap = new DynamicThreadPoolWrapper(dynamicThreadPoolExecutor.getThreadPoolId(), dynamicThreadPoolExecutor);
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(wrap); ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(wrap);
DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor); DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);
return remoteThreadPoolExecutor; if (DynamicThreadPoolAdapterChoose.match(bean)) {
return bean;
} else {
return remoteThreadPoolExecutor;
}
} }
if (bean instanceof DynamicThreadPoolWrapper) { if (bean instanceof DynamicThreadPoolWrapper) {
DynamicThreadPoolWrapper wrap = (DynamicThreadPoolWrapper) bean; DynamicThreadPoolWrapper wrap = (DynamicThreadPoolWrapper) bean;

@ -97,7 +97,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper); ThreadPoolExecutor remoteThreadPoolExecutor = fillPoolAndRegister(dynamicThreadPoolWrapper);
DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor); DynamicThreadPoolAdapterChoose.replace(bean, remoteThreadPoolExecutor);
subscribeConfig(dynamicThreadPoolWrapper); subscribeConfig(dynamicThreadPoolWrapper);
return remoteThreadPoolExecutor; if (DynamicThreadPoolAdapterChoose.match(bean)) {
return bean;
} else {
return remoteThreadPoolExecutor;
}
} }
if (bean instanceof DynamicThreadPoolWrapper) { if (bean instanceof DynamicThreadPoolWrapper) {
DynamicThreadPoolWrapper dynamicThreadPoolWrapper = (DynamicThreadPoolWrapper) bean; DynamicThreadPoolWrapper dynamicThreadPoolWrapper = (DynamicThreadPoolWrapper) bean;
@ -123,6 +127,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
*/ */
protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) { protected ThreadPoolExecutor fillPoolAndRegister(DynamicThreadPoolWrapper dynamicThreadPoolWrapper) {
String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId(); String threadPoolId = dynamicThreadPoolWrapper.getThreadPoolId();
ThreadPoolExecutor executor = dynamicThreadPoolWrapper.getExecutor();
Map<String, String> queryStrMap = new HashMap(3); Map<String, String> queryStrMap = new HashMap(3);
queryStrMap.put(TP_ID, threadPoolId); queryStrMap.put(TP_ID, threadPoolId);
queryStrMap.put(ITEM_ID, properties.getItemId()); queryStrMap.put(ITEM_ID, properties.getItemId());
@ -140,7 +145,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
newDynamicThreadPoolExecutor = ThreadPoolBuilder.builder() newDynamicThreadPoolExecutor = ThreadPoolBuilder.builder()
.dynamicPool() .dynamicPool()
.workQueue(workQueue) .workQueue(workQueue)
.threadFactory(threadPoolId) .threadFactory(executor.getThreadFactory())
.poolThreadSize(threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt()) .poolThreadSize(threadPoolParameterInfo.corePoolSizeAdapt(), threadPoolParameterInfo.maximumPoolSizeAdapt())
.keepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS) .keepAliveTime(threadPoolParameterInfo.getKeepAliveTime(), TimeUnit.SECONDS)
.rejected(RejectedPolicyTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType())) .rejected(RejectedPolicyTypeEnum.createPolicy(threadPoolParameterInfo.getRejectedType()))
@ -148,7 +153,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.build(); .build();
// Set dynamic thread pool enhancement parameters. // Set dynamic thread pool enhancement parameters.
ThreadPoolExecutor customDynamicThreadPool; ThreadPoolExecutor customDynamicThreadPool;
if ((customDynamicThreadPool = dynamicThreadPoolWrapper.getExecutor()) instanceof AbstractDynamicExecutorSupport) { if ((customDynamicThreadPool = executor) instanceof AbstractDynamicExecutorSupport) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm( ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()), BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()),
threadPoolParameterInfo.getCapacityAlarm(), threadPoolParameterInfo.getCapacityAlarm(),
@ -168,11 +173,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
} }
} }
} catch (Exception ex) { } catch (Exception ex) {
newDynamicThreadPoolExecutor = dynamicThreadPoolWrapper.getExecutor() != null ? dynamicThreadPoolWrapper.getExecutor() : CommonDynamicThreadPool.getInstance(threadPoolId); newDynamicThreadPoolExecutor = executor != null ? executor : CommonDynamicThreadPool.getInstance(threadPoolId);
dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor); dynamicThreadPoolWrapper.setExecutor(newDynamicThreadPoolExecutor);
log.error("Failed to initialize thread pool configuration. error message: {}", ex.getMessage()); log.error("Failed to initialize thread pool configuration. error message: {}", ex.getMessage());
} finally { } finally {
if (Objects.isNull(dynamicThreadPoolWrapper.getExecutor())) { if (Objects.isNull(executor)) {
dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId)); dynamicThreadPoolWrapper.setExecutor(CommonDynamicThreadPool.getInstance(threadPoolId));
} }
// Set whether to subscribe to the remote thread pool configuration. // Set whether to subscribe to the remote thread pool configuration.

Loading…
Cancel
Save