refactor: Deprecate AbstractDynamicExecutorSupport and make DynamicThreadPoolExecutor extend ExtensibleThreadPoolExecutor (#815)

pull/852/head
huangchengxing 3 years ago
parent b1bf7292bd
commit d049ebc111

@ -17,107 +17,231 @@
package cn.hippo4j.core.executor;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.core.toolkit.SystemClock;
import lombok.Getter;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin;
import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin;
import cn.hippo4j.core.plugin.impl.TaskTimeoutNotifyAlarmPlugin;
import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginRegistrar;
import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.task.TaskDecorator;
import java.util.concurrent.*;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Enhanced dynamic and monitored thread pool.
*/
public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport {
@Getter
@Setter
private Long executeTimeOut;
@Getter
@Setter
private TaskDecorator taskDecorator;
@Getter
@Setter
private RejectedExecutionHandler redundancyHandler;
@Getter
private final String threadPoolId;
@Getter
private final AtomicLong rejectCount = new AtomicLong();
private final ThreadLocal<Long> startTimeThreadLocal = new ThreadLocal<>();
public DynamicThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
long executeTimeOut,
boolean waitForTasksToCompleteOnShutdown,
long awaitTerminationMillis,
@Slf4j
public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean {
/**
* Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters.
*
* @param threadPoolId thread-pool id
* @param executeTimeOut execute time out
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @param awaitTerminationMillis await termination millis
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param blockingQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor creates a new thread
* @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public DynamicThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
long executeTimeOut, boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis,
@NonNull BlockingQueue<Runnable> blockingQueue,
@NonNull String threadPoolId,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, blockingQueue, threadPoolId, threadFactory, rejectedExecutionHandler);
this.threadPoolId = threadPoolId;
this.executeTimeOut = executeTimeOut;
// Number of dynamic proxy denial policies.
RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount);
setRejectedExecutionHandler(rejectedProxy);
// Redundant fields to avoid reflecting the acquired fields when sending change information.
redundancyHandler = rejectedExecutionHandler;
super(
threadPoolId, new DefaultThreadPoolPluginManager(),
corePoolSize, maximumPoolSize, keepAliveTime, unit,
blockingQueue, threadFactory, rejectedExecutionHandler);
log.info("Initializing ExecutorService" + threadPoolId);
// init default plugins
new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis, waitForTasksToCompleteOnShutdown)
.doRegister(this);
}
/**
* Invoked by the containing {@code BeanFactory} on destruction of a bean.
*
*/
@Override
public void execute(@NonNull Runnable command) {
if (taskDecorator != null) {
command = taskDecorator.decorate(command);
public void destroy() {
if (isWaitForTasksToCompleteOnShutdown()) {
super.shutdown();
} else {
super.shutdownNow();
}
super.execute(command);
getThreadPoolPluginManager().clear();
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
if (executeTimeOut == null || executeTimeOut <= 0) {
return;
}
startTimeThreadLocal.set(SystemClock.now());
/**
* Get await termination millis.
*
* @return await termination millis.
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin}
*/
@Deprecated
public long getAwaitTerminationMillis() {
return getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class)
.map(ThreadPoolExecutorShutdownPlugin::getAwaitTerminationMillis)
.orElse(-1L);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
Long startTime;
if ((startTime = startTimeThreadLocal.get()) == null) {
return;
}
try {
long endTime = SystemClock.now();
long executeTime;
boolean executeTimeAlarm = (executeTime = (endTime - startTime)) > executeTimeOut;
if (executeTimeAlarm && ApplicationContextHolder.getInstance() != null) {
ThreadPoolNotifyAlarmHandler notifyAlarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class);
if (notifyAlarmHandler != null) {
notifyAlarmHandler.asyncSendExecuteTimeOutAlarm(threadPoolId, executeTime, executeTimeOut, this);
}
}
} finally {
startTimeThreadLocal.remove();
}
/**
* Is wait for tasks to complete on shutdown.
*
* @return true if instance wait for tasks to complete on shutdown, false other otherwise.
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin}
*/
@Deprecated
public boolean isWaitForTasksToCompleteOnShutdown() {
return getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class)
.map(ThreadPoolExecutorShutdownPlugin::isWaitForTasksToCompleteOnShutdown)
.orElse(false);
}
@Override
protected ExecutorService initializeExecutor() {
return this;
/**
* Set support param.
*
* @param awaitTerminationMillis await termination millis
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin}
*/
@Deprecated
public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) {
getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class)
.ifPresent(processor -> processor
.setAwaitTerminationMillis(awaitTerminationMillis)
.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown));
}
/**
* Get reject count num.
*
* @return reject count num
* @deprecated use {@link TaskRejectCountRecordPlugin}
*/
@Deprecated
public Long getRejectCountNum() {
return rejectCount.get();
return getPluginOfType(TaskRejectCountRecordPlugin.PLUGIN_NAME, TaskRejectCountRecordPlugin.class)
.map(TaskRejectCountRecordPlugin::getRejectCountNum)
.orElse(-1L);
}
/**
* Get reject count.
*
* @return reject count num
* @deprecated use {@link TaskRejectCountRecordPlugin}
*/
@Deprecated
public AtomicLong getRejectCount() {
return getPluginOfType(TaskRejectCountRecordPlugin.PLUGIN_NAME, TaskRejectCountRecordPlugin.class)
.map(TaskRejectCountRecordPlugin::getRejectCount)
.orElse(new AtomicLong(0));
}
/**
* Get execute time out.
*
* @deprecated use {@link TaskTimeoutNotifyAlarmPlugin}
*/
@Deprecated
public Long getExecuteTimeOut() {
return getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class)
.map(TaskTimeoutNotifyAlarmPlugin::getExecuteTimeOut)
.orElse(-1L);
}
/**
* Set execute time out.
*
* @param executeTimeOut execute time out
* @deprecated use {@link TaskTimeoutNotifyAlarmPlugin}
*/
@Deprecated
public void setExecuteTimeOut(Long executeTimeOut) {
getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class)
.ifPresent(processor -> processor.setExecuteTimeOut(executeTimeOut));
}
/**
* Get {@link TaskDecorator}.
*
* @deprecated use {@link TaskDecoratorPlugin}
*/
@Deprecated
public TaskDecorator getTaskDecorator() {
return getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class)
.map(processor -> CollectionUtil.getFirst(processor.getDecorators()))
.orElse(null);
}
/**
* Set {@link TaskDecorator}.
*
* @param taskDecorator task decorator
* @deprecated use {@link TaskDecoratorPlugin}
*/
@Deprecated
public void setTaskDecorator(TaskDecorator taskDecorator) {
getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class)
.ifPresent(processor -> {
if (Objects.nonNull(taskDecorator)) {
processor.clearDecorators();
processor.addDecorator(taskDecorator);
}
});
}
/**
* Get rejected execution handler.
*
* @deprecated use {@link DynamicThreadPoolExecutor#getRejectedExecutionHandler}
*/
@Deprecated
public RejectedExecutionHandler getRedundancyHandler() {
return getRejectedExecutionHandler();
}
/**
* Set rejected execution handler.
*
* @param handler handler
* @deprecated use {@link DynamicThreadPoolExecutor#setRejectedExecutionHandler}
*/
@Deprecated
public void setRedundancyHandler(RejectedExecutionHandler handler) {
setRejectedExecutionHandler(handler);
}
}

@ -17,7 +17,6 @@
package cn.hippo4j.core.executor;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import lombok.AllArgsConstructor;
import lombok.Builder;
@ -67,8 +66,8 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
@Override
public void destroy() throws Exception {
if (executor != null && executor instanceof AbstractDynamicExecutorSupport) {
((AbstractDynamicExecutorSupport) executor).destroy();
if (executor != null && executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).destroy();
}
}
}

@ -22,13 +22,13 @@ import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.ExecutorTraceContextUtil;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -211,9 +211,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
*/
public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) {
BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler()
: threadPoolExecutor.getRejectedExecutionHandler();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum()
: -1L;

@ -23,10 +23,8 @@ import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.ByteConvertUtil;
import cn.hippo4j.common.toolkit.MemoryUtil;
import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.toolkit.inet.InetUtils;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -66,11 +64,7 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime {
DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor pool = executorService.getExecutor();
String rejectedName;
if (pool instanceof AbstractDynamicExecutorSupport) {
rejectedName = ((DynamicThreadPoolExecutor) pool).getRedundancyHandler().getClass().getSimpleName();
} else {
rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName();
}
rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName();
poolRunStateInfo.setRejectedName(rejectedName);
ManyThreadPoolRunStateInfo manyThreadPoolRunStateInfo = BeanUtil.convert(poolRunStateInfo, ManyThreadPoolRunStateInfo.class);
manyThreadPoolRunStateInfo.setIdentify(CLIENT_IDENTIFICATION_VALUE);

@ -17,6 +17,7 @@
package cn.hippo4j.core.executor.support;
import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
@ -25,7 +26,10 @@ import java.util.concurrent.*;
/**
* Dynamic executor configuration support.
*
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin} to get thread-pool shutdown support
*/
@Deprecated
@Slf4j
public abstract class AbstractDynamicExecutorSupport extends ThreadPoolExecutor implements InitializingBean, DisposableBean {

@ -29,8 +29,6 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
@ -43,7 +41,6 @@ import java.util.*;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
@ -182,6 +179,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
Boolean isAlarm = executorProperties.getAlarm();
Integer activeAlarm = executorProperties.getActiveAlarm();
Integer capacityAlarm = executorProperties.getCapacityAlarm();
// FIXME Compare using Objects.equals
if ((isAlarm != null && isAlarm != threadPoolNotifyAlarm.getAlarm())
|| (activeAlarm != null && activeAlarm != threadPoolNotifyAlarm.getActiveAlarm())
|| (capacityAlarm != null && capacityAlarm != threadPoolNotifyAlarm.getCapacityAlarm())) {
@ -249,18 +247,12 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
}
if (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) {
if (executor instanceof AbstractDynamicExecutorSupport) {
if (executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut());
}
}
if (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(properties.getRejectedHandler());
if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);
AtomicLong rejectCount = dynamicExecutor.getRejectCount();
rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount);
}
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
}
if (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) {

@ -123,8 +123,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
DynamicThreadPoolExecutor actualDynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor();
TaskDecorator taskDecorator = actualDynamicThreadPoolExecutor.getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = actualDynamicThreadPoolExecutor.awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = actualDynamicThreadPoolExecutor.waitForTasksToCompleteOnShutdown;
long awaitTerminationMillis = actualDynamicThreadPoolExecutor.getAwaitTerminationMillis();
boolean waitForTasksToCompleteOnShutdown = actualDynamicThreadPoolExecutor.isWaitForTasksToCompleteOnShutdown();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
dynamicThreadPoolWrapper.setExecutor(newDynamicPoolExecutor);
}
@ -155,7 +155,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.setBlockingQueue(queueType)
.setExecuteTimeOut(10000L)
.setQueueCapacity(queueCapacity)
.setRejectedHandler(((DynamicThreadPoolExecutor) executor).getRedundancyHandler().getClass().getSimpleName())
.setRejectedHandler(executor.getRejectedExecutionHandler().getClass().getSimpleName())
.setThreadPoolId(threadPoolId);
return executorProperties;
}

@ -17,20 +17,18 @@
package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -39,7 +37,6 @@ import java.util.Optional;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
@ -71,9 +68,9 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
boolean originalAllowCoreThreadTimeOut = executor.allowsCoreThreadTimeOut();
Long originalExecuteTimeOut = null;
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
if (executor instanceof AbstractDynamicExecutorSupport) {
if (executor instanceof DynamicThreadPoolExecutor) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
rejectedExecutionHandler = dynamicExecutor.getRedundancyHandler();
rejectedExecutionHandler = dynamicExecutor.getRejectedExecutionHandler();
originalExecuteTimeOut = dynamicExecutor.getExecuteTimeOut();
}
changePoolInfo(executor, parameter);
@ -140,17 +137,11 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
executor.setKeepAliveTime(parameter.getKeepAliveTime(), TimeUnit.SECONDS);
}
Long executeTimeOut = Optional.ofNullable(parameter.getExecuteTimeOut()).orElse(0L);
if (executeTimeOut != null && executor instanceof AbstractDynamicExecutorSupport) {
if (executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(executeTimeOut);
}
if (parameter.getRejectedType() != null) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(parameter.getRejectedType());
if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);
AtomicLong rejectCount = dynamicExecutor.getRejectCount();
rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, parameter.getTpId(), rejectCount);
}
executor.setRejectedExecutionHandler(rejectedExecutionHandler);
}
if (parameter.getAllowCoreThreadTimeOut() != null) {

@ -33,7 +33,8 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
@ -151,7 +152,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut()))
.build();
// Set dynamic thread pool enhancement parameters.
if (executor instanceof AbstractDynamicExecutorSupport) {
if (executor instanceof DynamicThreadPoolExecutor) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()),
threadPoolParameterInfo.getLivenessAlarm(),
@ -159,8 +160,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) executor).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) executor).awaitTerminationMillis;
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) executor).waitForTasksToCompleteOnShutdown;
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) executor).getAwaitTerminationMillis();
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) executor).isWaitForTasksToCompleteOnShutdown();
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
long executeTimeOut = Optional.ofNullable(threadPoolParameterInfo.getExecuteTimeOut())
.orElse(((DynamicThreadPoolExecutor) executor).getExecuteTimeOut());
@ -182,7 +183,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.isAlarm(false)
.activeAlarm(80)
.capacityAlarm(80)
.rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(((DynamicThreadPoolExecutor) executor).getRedundancyHandler().getClass().getSimpleName()))
.rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()))
.build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()
.dynamicThreadPoolRegisterParameter(parameterInfo)

Loading…
Cancel
Save