动态线程池功能增强.

pull/12/head
chen.ma 3 years ago
parent 5f68ec2e0c
commit 7f4763ff51

@ -0,0 +1,147 @@
package cn.hippo4j.starter.core;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import java.util.concurrent.*;
/**
* Dynamic executor configuration support.
*
* @author chen.ma
* @date 2021/11/28 12:17
*/
@Slf4j
public abstract class DynamicExecutorConfigurationSupport extends ThreadPoolExecutor
implements BeanNameAware, InitializingBean, DisposableBean {
private String beanName;
private ExecutorService executor;
private long awaitTerminationMillis = 0;
private boolean waitForTasksToCompleteOnShutdown = false;
public DynamicExecutorConfigurationSupport(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
boolean waitForTasksToCompleteOnShutdown,
long awaitTerminationMillis,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
this.awaitTerminationMillis = awaitTerminationMillis;
}
/**
* Create the target {@link java.util.concurrent.ExecutorService} instance.
* Called by {@code afterPropertiesSet}.
*
* @return a new ExecutorService instance
* @see #afterPropertiesSet()
*/
protected abstract ExecutorService initializeExecutor();
@Override
public void setBeanName(String name) {
this.beanName = name;
}
/**
* Calls {@code initialize()} after the container applied all property values.
*
* @see #initialize()
*/
@Override
public void afterPropertiesSet() {
initialize();
}
/**
* Calls {@code shutdown} when the BeanFactory destroys.
* the task executor instance.
*
* @see #shutdown()
*/
@Override
public void destroy() {
shutdownSupport();
}
/**
* Set up the ExecutorService.
*/
public void initialize() {
if (log.isInfoEnabled()) {
log.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
this.executor = initializeExecutor();
}
/**
* Perform a shutdown on the underlying ExecutorService.
*
* @see java.util.concurrent.ExecutorService#shutdown()
* @see java.util.concurrent.ExecutorService#shutdownNow()
*/
public void shutdownSupport() {
if (log.isInfoEnabled()) {
log.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (this.executor != null) {
if (this.waitForTasksToCompleteOnShutdown) {
this.executor.shutdown();
} else {
for (Runnable remainingTask : this.executor.shutdownNow()) {
cancelRemainingTask(remainingTask);
}
}
awaitTerminationIfNecessary(this.executor);
}
}
/**
* Cancel the given remaining task which never commended execution,
* as returned from {@link ExecutorService#shutdownNow()}.
*
* @param task the task to cancel (typically a {@link RunnableFuture})
* @see #shutdown()
* @see RunnableFuture#cancel(boolean)
* @since 5.0.5
*/
protected void cancelRemainingTask(Runnable task) {
if (task instanceof Future) {
((Future<?>) task).cancel(true);
}
}
/**
* Wait for the executor to terminate, according to the value of the.
*/
private void awaitTerminationIfNecessary(ExecutorService executor) {
if (this.awaitTerminationMillis > 0) {
try {
if (!executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS)) {
if (log.isWarnEnabled()) {
log.warn("Timed out while waiting for executor" +
(this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
}
}
} catch (InterruptedException ex) {
if (log.isWarnEnabled()) {
log.warn("Interrupted while waiting for executor" +
(this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate");
}
Thread.currentThread().interrupt();
}
}
}
}

@ -5,6 +5,7 @@ import cn.hippo4j.starter.alarm.ThreadPoolAlarmManage;
import cn.hippo4j.starter.event.EventExecutor;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import org.springframework.core.task.TaskDecorator;
import java.security.AccessControlContext;
import java.security.AccessController;
@ -24,7 +25,7 @@ import static cn.hippo4j.common.constant.Constants.MAP_INITIAL_CAPACITY;
* @author chen.ma
* @date 2021/7/8 21:47
*/
public class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
public class DynamicThreadPoolExecutor extends DynamicExecutorConfigurationSupport {
private final AtomicInteger rejectCount = new AtomicInteger();
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
@ -38,6 +39,7 @@ public class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private TaskDecorator taskDecorator;
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet();
@ -63,12 +65,14 @@ public class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
boolean waitForTasksToCompleteOnShutdown,
long awaitTerminationMillis,
@NonNull BlockingQueue<Runnable> workQueue,
@NonNull String threadPoolId,
@NonNull ThreadFactory threadFactory,
@NonNull ThreadPoolAlarm threadPoolAlarm,
@NonNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, workQueue, threadFactory, handler);
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
@ -533,6 +537,9 @@ public class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
@Override
public void execute(@NonNull Runnable command) {
if (taskDecorator != null) {
command = taskDecorator.decorate(command);
}
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) {
@ -555,6 +562,11 @@ public class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
}
}
@Override
protected ExecutorService initializeExecutor() {
return this;
}
@Override
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
@ -688,6 +700,14 @@ public class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
addWorker(null, true);
}
public TaskDecorator getTaskDecorator() {
return taskDecorator;
}
public void setTaskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize) {

@ -1,5 +1,9 @@
package cn.hippo4j.starter.core;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.starter.common.CommonDynamicThreadPool;
import cn.hippo4j.starter.config.BootstrapProperties;
import cn.hippo4j.starter.remote.HttpAgent;
@ -8,15 +12,12 @@ import cn.hippo4j.starter.toolkit.thread.RejectedTypeEnum;
import cn.hippo4j.starter.toolkit.thread.ThreadPoolBuilder;
import cn.hippo4j.starter.wrapper.DynamicThreadPoolWrapper;
import com.alibaba.fastjson.JSON;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.model.PoolParameterInfo;
import cn.hippo4j.common.web.base.Result;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import lombok.var;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.task.TaskDecorator;
import java.util.HashMap;
import java.util.Map;
@ -106,7 +107,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) {
// 使用相关参数创建线程池
BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());
poolExecutor = (DynamicThreadPoolExecutor) ThreadPoolBuilder.builder()
poolExecutor = ThreadPoolBuilder.builder()
.dynamicPool()
.workQueue(workQueue)
.threadFactory(tpId)
@ -116,6 +117,11 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.alarmConfig(ppi.getIsAlarm(), ppi.getCapacityAlarm(), ppi.getLivenessAlarm())
.build();
if (poolExecutor instanceof DynamicExecutorConfigurationSupport) {
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) dynamicThreadPoolWrap.getExecutor()).getTaskDecorator();
((DynamicThreadPoolExecutor) poolExecutor).setTaskDecorator(taskDecorator);
}
dynamicThreadPoolWrap.setExecutor(poolExecutor);
isSubscribe = true;
}

@ -6,6 +6,7 @@ import cn.hippo4j.starter.core.DynamicThreadPoolExecutor;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.task.TaskDecorator;
import java.util.concurrent.*;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
@ -101,11 +102,15 @@ public class AbstractBuildThreadPoolTemplate {
initParam.getMaxPoolNum(),
initParam.getKeepAliveTime(),
initParam.getTimeUnit(),
initParam.getWaitForTasksToCompleteOnShutdown(),
initParam.getAwaitTerminationMillis(),
initParam.getWorkQueue(),
initParam.getThreadPoolId(),
initParam.getThreadFactory(),
initParam.getThreadPoolAlarm(),
initParam.getRejectedExecutionHandler());
executorService.setTaskDecorator(initParam.getTaskDecorator());
return executorService;
}
@ -163,6 +168,21 @@ public class AbstractBuildThreadPoolTemplate {
*/
private ThreadPoolAlarm threadPoolAlarm;
/**
* 线
*/
private TaskDecorator taskDecorator;
/**
*
*/
private Long awaitTerminationMillis;
/**
*
*/
private Boolean waitForTasksToCompleteOnShutdown;
public ThreadPoolInitParam(String threadNamePrefix, boolean isDaemon) {
this.threadPoolId = threadNamePrefix;
this.threadFactory = ThreadFactoryBuilder.builder()

@ -3,6 +3,7 @@ package cn.hippo4j.starter.toolkit.thread;
import cn.hippo4j.common.toolkit.Assert;
import cn.hippo4j.starter.alarm.ThreadPoolAlarm;
import org.springframework.core.task.TaskDecorator;
import java.math.BigDecimal;
import java.util.Optional;
@ -96,6 +97,21 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
*/
private Integer livenessAlarm;
/**
* 线
*/
private TaskDecorator taskDecorator;
/**
*
*/
private Long awaitTerminationMillis = 0L;
/**
*
*/
private Boolean waitForTasksToCompleteOnShutdown = false;
/**
* CPU / (1 - 0.8)
*
@ -197,6 +213,27 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
return this;
}
public ThreadPoolBuilder taskDecorator(TaskDecorator taskDecorator) {
this.taskDecorator = taskDecorator;
return this;
}
public ThreadPoolBuilder awaitTerminationMillis(long awaitTerminationMillis) {
this.awaitTerminationMillis = awaitTerminationMillis;
return this;
}
public ThreadPoolBuilder waitForTasksToCompleteOnShutdown(boolean waitForTasksToCompleteOnShutdown) {
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
return this;
}
public ThreadPoolBuilder dynamicSupport(boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis) {
this.awaitTerminationMillis = awaitTerminationMillis;
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
return this;
}
/**
*
*
@ -265,13 +302,16 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
.setKeepAliveTime(builder.keepAliveTime)
.setCapacity(builder.capacity)
.setRejectedExecutionHandler(builder.rejectedExecutionHandler)
.setTimeUnit(builder.timeUnit);
.setTimeUnit(builder.timeUnit)
.setTaskDecorator(builder.taskDecorator);
if (builder.isDynamicPool) {
String threadPoolId = Optional.ofNullable(builder.threadPoolId).orElse(builder.threadNamePrefix);
initParam.setThreadPoolId(threadPoolId);
ThreadPoolAlarm threadPoolAlarm = new ThreadPoolAlarm(builder.isAlarm, builder.capacityAlarm, builder.livenessAlarm);
initParam.setThreadPoolAlarm(threadPoolAlarm);
initParam.setWaitForTasksToCompleteOnShutdown(builder.waitForTasksToCompleteOnShutdown);
initParam.setAwaitTerminationMillis(builder.awaitTerminationMillis);
}
if (!builder.isFastPool) {

@ -1,7 +1,9 @@
package cn.hippo4j.starter.wrapper;
import cn.hippo4j.starter.common.CommonDynamicThreadPool;
import cn.hippo4j.starter.core.DynamicExecutorConfigurationSupport;
import lombok.Data;
import org.springframework.beans.factory.DisposableBean;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
@ -14,7 +16,7 @@ import java.util.concurrent.ThreadPoolExecutor;
* @date 2021/6/20 16:55
*/
@Data
public class DynamicThreadPoolWrapper {
public class DynamicThreadPoolWrapper implements DisposableBean {
private String tenantId;
@ -76,4 +78,11 @@ public class DynamicThreadPoolWrapper {
return executor.submit(task);
}
@Override
public void destroy() throws Exception {
if (executor != null && executor instanceof DynamicExecutorConfigurationSupport) {
((DynamicExecutorConfigurationSupport) executor).destroy();
}
}
}

Loading…
Cancel
Save