refactor: Deprecate AbstractDynamicExecutorSupport and make DynamicThreadPoolExecutor extend ExtensibleThreadPoolExecutor (#815)

pull/854/head
huangchengxing 3 years ago
parent 9396b28535
commit 3b3ab515a5

@ -17,107 +17,231 @@
package cn.hippo4j.core.executor; package cn.hippo4j.core.executor;
import cn.hippo4j.common.config.ApplicationContextHolder; import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.core.plugin.impl.TaskDecoratorPlugin;
import cn.hippo4j.core.proxy.RejectedProxyUtil; import cn.hippo4j.core.plugin.impl.TaskRejectCountRecordPlugin;
import cn.hippo4j.core.toolkit.SystemClock; import cn.hippo4j.core.plugin.impl.TaskTimeoutNotifyAlarmPlugin;
import lombok.Getter; import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginRegistrar;
import lombok.NonNull; import lombok.NonNull;
import lombok.Setter; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.task.TaskDecorator; import org.springframework.core.task.TaskDecorator;
import java.util.concurrent.*; import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
/** /**
* Enhanced dynamic and monitored thread pool. * Enhanced dynamic and monitored thread pool.
*/ */
public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport { @Slf4j
public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean {
@Getter
@Setter /**
private Long executeTimeOut; * Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters.
*
@Getter * @param threadPoolId thread-pool id
@Setter * @param executeTimeOut execute time out
private TaskDecorator taskDecorator; * @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @param awaitTerminationMillis await termination millis
@Getter * @param corePoolSize the number of threads to keep in the pool, even
@Setter * if they are idle, unless {@code allowCoreThreadTimeOut} is set
private RejectedExecutionHandler redundancyHandler; * @param maximumPoolSize the maximum number of threads to allow in the
* pool
@Getter * @param keepAliveTime when the number of threads is greater than
private final String threadPoolId; * the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
@Getter * @param unit the time unit for the {@code keepAliveTime} argument
private final AtomicLong rejectCount = new AtomicLong(); * @param blockingQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
private final ThreadLocal<Long> startTimeThreadLocal = new ThreadLocal<>(); * tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor creates a new thread
public DynamicThreadPoolExecutor(int corePoolSize, * @param rejectedExecutionHandler the handler to use when execution is blocked because the thread bounds and queue capacities are reached
int maximumPoolSize, * @throws IllegalArgumentException if one of the following holds:<br>
long keepAliveTime, * {@code corePoolSize < 0}<br>
TimeUnit unit, * {@code keepAliveTime < 0}<br>
long executeTimeOut, * {@code maximumPoolSize <= 0}<br>
boolean waitForTasksToCompleteOnShutdown, * {@code maximumPoolSize < corePoolSize}
long awaitTerminationMillis, * @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public DynamicThreadPoolExecutor(
int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
long executeTimeOut, boolean waitForTasksToCompleteOnShutdown, long awaitTerminationMillis,
@NonNull BlockingQueue<Runnable> blockingQueue, @NonNull BlockingQueue<Runnable> blockingQueue,
@NonNull String threadPoolId, @NonNull String threadPoolId,
@NonNull ThreadFactory threadFactory, @NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler rejectedExecutionHandler) { @NonNull RejectedExecutionHandler rejectedExecutionHandler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, blockingQueue, threadPoolId, threadFactory, rejectedExecutionHandler); super(
this.threadPoolId = threadPoolId; threadPoolId, new DefaultThreadPoolPluginManager(),
this.executeTimeOut = executeTimeOut; corePoolSize, maximumPoolSize, keepAliveTime, unit,
// Number of dynamic proxy denial policies. blockingQueue, threadFactory, rejectedExecutionHandler);
RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount); log.info("Initializing ExecutorService" + threadPoolId);
setRejectedExecutionHandler(rejectedProxy);
// Redundant fields to avoid reflecting the acquired fields when sending change information. // init default plugins
redundancyHandler = rejectedExecutionHandler; new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis, waitForTasksToCompleteOnShutdown)
.doRegister(this);
} }
/**
* Invoked by the containing {@code BeanFactory} on destruction of a bean.
*
*/
@Override @Override
public void execute(@NonNull Runnable command) { public void destroy() {
if (taskDecorator != null) { if (isWaitForTasksToCompleteOnShutdown()) {
command = taskDecorator.decorate(command); super.shutdown();
} else {
super.shutdownNow();
} }
super.execute(command); getThreadPoolPluginManager().clear();
} }
@Override /**
protected void beforeExecute(Thread t, Runnable r) { * Get await termination millis.
if (executeTimeOut == null || executeTimeOut <= 0) { *
return; * @return await termination millis.
} * @deprecated use {@link ThreadPoolExecutorShutdownPlugin}
startTimeThreadLocal.set(SystemClock.now()); */
@Deprecated
public long getAwaitTerminationMillis() {
return getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class)
.map(ThreadPoolExecutorShutdownPlugin::getAwaitTerminationMillis)
.orElse(-1L);
} }
@Override /**
protected void afterExecute(Runnable r, Throwable t) { * Is wait for tasks to complete on shutdown.
Long startTime; *
if ((startTime = startTimeThreadLocal.get()) == null) { * @return true if instance wait for tasks to complete on shutdown, false other otherwise.
return; * @deprecated use {@link ThreadPoolExecutorShutdownPlugin}
} */
try { @Deprecated
long endTime = SystemClock.now(); public boolean isWaitForTasksToCompleteOnShutdown() {
long executeTime; return getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class)
boolean executeTimeAlarm = (executeTime = (endTime - startTime)) > executeTimeOut; .map(ThreadPoolExecutorShutdownPlugin::isWaitForTasksToCompleteOnShutdown)
if (executeTimeAlarm && ApplicationContextHolder.getInstance() != null) { .orElse(false);
ThreadPoolNotifyAlarmHandler notifyAlarmHandler = ApplicationContextHolder.getBean(ThreadPoolNotifyAlarmHandler.class);
if (notifyAlarmHandler != null) {
notifyAlarmHandler.asyncSendExecuteTimeOutAlarm(threadPoolId, executeTime, executeTimeOut, this);
}
}
} finally {
startTimeThreadLocal.remove();
}
} }
@Override /**
protected ExecutorService initializeExecutor() { * Set support param.
return this; *
* @param awaitTerminationMillis await termination millis
* @param waitForTasksToCompleteOnShutdown wait for tasks to complete on shutdown
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin}
*/
@Deprecated
public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) {
getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class)
.ifPresent(processor -> processor
.setAwaitTerminationMillis(awaitTerminationMillis)
.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown));
} }
/**
* Get reject count num.
*
* @return reject count num
* @deprecated use {@link TaskRejectCountRecordPlugin}
*/
@Deprecated
public Long getRejectCountNum() { public Long getRejectCountNum() {
return rejectCount.get(); return getPluginOfType(TaskRejectCountRecordPlugin.PLUGIN_NAME, TaskRejectCountRecordPlugin.class)
.map(TaskRejectCountRecordPlugin::getRejectCountNum)
.orElse(-1L);
}
/**
* Get reject count.
*
* @return reject count num
* @deprecated use {@link TaskRejectCountRecordPlugin}
*/
@Deprecated
public AtomicLong getRejectCount() {
return getPluginOfType(TaskRejectCountRecordPlugin.PLUGIN_NAME, TaskRejectCountRecordPlugin.class)
.map(TaskRejectCountRecordPlugin::getRejectCount)
.orElse(new AtomicLong(0));
}
/**
* Get execute time out.
*
* @deprecated use {@link TaskTimeoutNotifyAlarmPlugin}
*/
@Deprecated
public Long getExecuteTimeOut() {
return getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class)
.map(TaskTimeoutNotifyAlarmPlugin::getExecuteTimeOut)
.orElse(-1L);
}
/**
* Set execute time out.
*
* @param executeTimeOut execute time out
* @deprecated use {@link TaskTimeoutNotifyAlarmPlugin}
*/
@Deprecated
public void setExecuteTimeOut(Long executeTimeOut) {
getPluginOfType(TaskTimeoutNotifyAlarmPlugin.PLUGIN_NAME, TaskTimeoutNotifyAlarmPlugin.class)
.ifPresent(processor -> processor.setExecuteTimeOut(executeTimeOut));
}
/**
* Get {@link TaskDecorator}.
*
* @deprecated use {@link TaskDecoratorPlugin}
*/
@Deprecated
public TaskDecorator getTaskDecorator() {
return getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class)
.map(processor -> CollectionUtil.getFirst(processor.getDecorators()))
.orElse(null);
} }
/**
* Set {@link TaskDecorator}.
*
* @param taskDecorator task decorator
* @deprecated use {@link TaskDecoratorPlugin}
*/
@Deprecated
public void setTaskDecorator(TaskDecorator taskDecorator) {
getPluginOfType(TaskDecoratorPlugin.PLUGIN_NAME, TaskDecoratorPlugin.class)
.ifPresent(processor -> {
if (Objects.nonNull(taskDecorator)) {
processor.clearDecorators();
processor.addDecorator(taskDecorator);
}
});
}
/**
* Get rejected execution handler.
*
* @deprecated use {@link DynamicThreadPoolExecutor#getRejectedExecutionHandler}
*/
@Deprecated
public RejectedExecutionHandler getRedundancyHandler() {
return getRejectedExecutionHandler();
}
/**
* Set rejected execution handler.
*
* @param handler handler
* @deprecated use {@link DynamicThreadPoolExecutor#setRejectedExecutionHandler}
*/
@Deprecated
public void setRedundancyHandler(RejectedExecutionHandler handler) {
setRejectedExecutionHandler(handler);
}
} }

@ -17,7 +17,6 @@
package cn.hippo4j.core.executor; package cn.hippo4j.core.executor;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.executor.support.CommonDynamicThreadPool; import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
@ -67,8 +66,8 @@ public class DynamicThreadPoolWrapper implements DisposableBean {
@Override @Override
public void destroy() throws Exception { public void destroy() throws Exception {
if (executor != null && executor instanceof AbstractDynamicExecutorSupport) { if (executor != null && executor instanceof DynamicThreadPoolExecutor) {
((AbstractDynamicExecutorSupport) executor).destroy(); ((DynamicThreadPoolExecutor) executor).destroy();
} }
} }
} }

@ -22,13 +22,13 @@ import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder; import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.core.toolkit.ExecutorTraceContextUtil; import cn.hippo4j.core.toolkit.ExecutorTraceContextUtil;
import cn.hippo4j.message.service.Hippo4jSendMessageService; import cn.hippo4j.core.toolkit.IdentifyUtil;
import cn.hippo4j.message.enums.NotifyTypeEnum; import cn.hippo4j.message.enums.NotifyTypeEnum;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import cn.hippo4j.message.request.AlarmNotifyRequest; import cn.hippo4j.message.request.AlarmNotifyRequest;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest; import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.Hippo4jSendMessageService;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -211,9 +211,7 @@ public class ThreadPoolNotifyAlarmHandler implements Runnable, CommandLineRunner
*/ */
public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) { public AlarmNotifyRequest buildAlarmNotifyRequest(ThreadPoolExecutor threadPoolExecutor) {
BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue(); BlockingQueue<Runnable> blockingQueue = threadPoolExecutor.getQueue();
RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor instanceof DynamicThreadPoolExecutor RejectedExecutionHandler rejectedExecutionHandler = threadPoolExecutor.getRejectedExecutionHandler();
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRedundancyHandler()
: threadPoolExecutor.getRejectedExecutionHandler();
long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor long rejectCount = threadPoolExecutor instanceof DynamicThreadPoolExecutor
? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum() ? ((DynamicThreadPoolExecutor) threadPoolExecutor).getRejectCountNum()
: -1L; : -1L;

@ -23,10 +23,8 @@ import cn.hippo4j.common.toolkit.BeanUtil;
import cn.hippo4j.common.toolkit.ByteConvertUtil; import cn.hippo4j.common.toolkit.ByteConvertUtil;
import cn.hippo4j.common.toolkit.MemoryUtil; import cn.hippo4j.common.toolkit.MemoryUtil;
import cn.hippo4j.common.toolkit.StringUtil; import cn.hippo4j.common.toolkit.StringUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.toolkit.inet.InetUtils; import cn.hippo4j.core.toolkit.inet.InetUtils;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -66,11 +64,7 @@ public class ThreadPoolRunStateHandler extends AbstractThreadPoolRuntime {
DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId); DynamicThreadPoolWrapper executorService = GlobalThreadPoolManage.getExecutorService(threadPoolId);
ThreadPoolExecutor pool = executorService.getExecutor(); ThreadPoolExecutor pool = executorService.getExecutor();
String rejectedName; String rejectedName;
if (pool instanceof AbstractDynamicExecutorSupport) { rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName();
rejectedName = ((DynamicThreadPoolExecutor) pool).getRedundancyHandler().getClass().getSimpleName();
} else {
rejectedName = pool.getRejectedExecutionHandler().getClass().getSimpleName();
}
poolRunStateInfo.setRejectedName(rejectedName); poolRunStateInfo.setRejectedName(rejectedName);
ManyThreadPoolRunStateInfo manyThreadPoolRunStateInfo = BeanUtil.convert(poolRunStateInfo, ManyThreadPoolRunStateInfo.class); ManyThreadPoolRunStateInfo manyThreadPoolRunStateInfo = BeanUtil.convert(poolRunStateInfo, ManyThreadPoolRunStateInfo.class);
manyThreadPoolRunStateInfo.setIdentify(CLIENT_IDENTIFICATION_VALUE); manyThreadPoolRunStateInfo.setIdentify(CLIENT_IDENTIFICATION_VALUE);

@ -17,6 +17,7 @@
package cn.hippo4j.core.executor.support; package cn.hippo4j.core.executor.support;
import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.InitializingBean;
@ -25,7 +26,10 @@ import java.util.concurrent.*;
/** /**
* Dynamic executor configuration support. * Dynamic executor configuration support.
*
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin} to get thread-pool shutdown support
*/ */
@Deprecated
@Slf4j @Slf4j
public abstract class AbstractDynamicExecutorSupport extends ThreadPoolExecutor implements InitializingBean, DisposableBean { public abstract class AbstractDynamicExecutorSupport extends ThreadPoolExecutor implements InitializingBean, DisposableBean {

@ -29,8 +29,6 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.message.dto.NotifyConfigDTO; import cn.hippo4j.message.dto.NotifyConfigDTO;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest; import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.message.service.Hippo4jBaseSendMessageService; import cn.hippo4j.message.service.Hippo4jBaseSendMessageService;
@ -43,7 +41,6 @@ import java.util.*;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
@ -182,6 +179,7 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
Boolean isAlarm = executorProperties.getAlarm(); Boolean isAlarm = executorProperties.getAlarm();
Integer activeAlarm = executorProperties.getActiveAlarm(); Integer activeAlarm = executorProperties.getActiveAlarm();
Integer capacityAlarm = executorProperties.getCapacityAlarm(); Integer capacityAlarm = executorProperties.getCapacityAlarm();
// FIXME Compare using Objects.equals
if ((isAlarm != null && isAlarm != threadPoolNotifyAlarm.getAlarm()) if ((isAlarm != null && isAlarm != threadPoolNotifyAlarm.getAlarm())
|| (activeAlarm != null && activeAlarm != threadPoolNotifyAlarm.getActiveAlarm()) || (activeAlarm != null && activeAlarm != threadPoolNotifyAlarm.getActiveAlarm())
|| (capacityAlarm != null && capacityAlarm != threadPoolNotifyAlarm.getCapacityAlarm())) { || (capacityAlarm != null && capacityAlarm != threadPoolNotifyAlarm.getCapacityAlarm())) {
@ -249,18 +247,12 @@ public class DynamicThreadPoolRefreshListener extends AbstractRefreshListener<Ex
executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut()); executor.allowCoreThreadTimeOut(properties.getAllowCoreThreadTimeOut());
} }
if (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) { if (properties.getExecuteTimeOut() != null && !Objects.equals(beforeProperties.getExecuteTimeOut(), properties.getExecuteTimeOut())) {
if (executor instanceof AbstractDynamicExecutorSupport) { if (executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut()); ((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(properties.getExecuteTimeOut());
} }
} }
if (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) { if (properties.getRejectedHandler() != null && !Objects.equals(beforeProperties.getRejectedHandler(), properties.getRejectedHandler())) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(properties.getRejectedHandler()); RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(properties.getRejectedHandler());
if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);
AtomicLong rejectCount = dynamicExecutor.getRejectCount();
rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount);
}
executor.setRejectedExecutionHandler(rejectedExecutionHandler); executor.setRejectedExecutionHandler(rejectedExecutionHandler);
} }
if (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) { if (properties.getKeepAliveTime() != null && !Objects.equals(beforeProperties.getKeepAliveTime(), properties.getKeepAliveTime())) {

@ -123,8 +123,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
DynamicThreadPoolExecutor actualDynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor(); DynamicThreadPoolExecutor actualDynamicThreadPoolExecutor = (DynamicThreadPoolExecutor) dynamicThreadPoolWrapper.getExecutor();
TaskDecorator taskDecorator = actualDynamicThreadPoolExecutor.getTaskDecorator(); TaskDecorator taskDecorator = actualDynamicThreadPoolExecutor.getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator); ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = actualDynamicThreadPoolExecutor.awaitTerminationMillis; long awaitTerminationMillis = actualDynamicThreadPoolExecutor.getAwaitTerminationMillis();
boolean waitForTasksToCompleteOnShutdown = actualDynamicThreadPoolExecutor.waitForTasksToCompleteOnShutdown; boolean waitForTasksToCompleteOnShutdown = actualDynamicThreadPoolExecutor.isWaitForTasksToCompleteOnShutdown();
((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); ((DynamicThreadPoolExecutor) newDynamicPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
dynamicThreadPoolWrapper.setExecutor(newDynamicPoolExecutor); dynamicThreadPoolWrapper.setExecutor(newDynamicPoolExecutor);
} }
@ -155,7 +155,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.setBlockingQueue(queueType) .setBlockingQueue(queueType)
.setExecuteTimeOut(10000L) .setExecuteTimeOut(10000L)
.setQueueCapacity(queueCapacity) .setQueueCapacity(queueCapacity)
.setRejectedHandler(((DynamicThreadPoolExecutor) executor).getRedundancyHandler().getClass().getSimpleName()) .setRejectedHandler(executor.getRejectedExecutionHandler().getClass().getSimpleName())
.setThreadPoolId(threadPoolId); .setThreadPoolId(threadPoolId);
return executorProperties; return executorProperties;
} }

@ -17,20 +17,18 @@
package cn.hippo4j.springboot.starter.core; package cn.hippo4j.springboot.starter.core;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import cn.hippo4j.common.enums.EnableEnum; import cn.hippo4j.common.enums.EnableEnum;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.common.model.ThreadPoolParameter; import cn.hippo4j.common.model.ThreadPoolParameter;
import cn.hippo4j.common.model.ThreadPoolParameterInfo; import cn.hippo4j.common.model.ThreadPoolParameterInfo;
import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.toolkit.JSONUtil; import cn.hippo4j.common.toolkit.JSONUtil;
import cn.hippo4j.core.executor.DynamicThreadPoolExecutor; import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler; import cn.hippo4j.core.executor.ThreadPoolNotifyAlarmHandler;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.AbstractDynamicExecutorSupport; import cn.hippo4j.message.request.ChangeParameterNotifyRequest;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.executor.support.RejectedPolicyTypeEnum;
import cn.hippo4j.common.executor.support.ResizableCapacityLinkedBlockingQueue;
import cn.hippo4j.core.proxy.RejectedProxyUtil;
import cn.hippo4j.common.api.ThreadPoolDynamicRefresh;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -39,7 +37,6 @@ import java.util.Optional;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_DELIMITER;
import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT; import static cn.hippo4j.common.constant.ChangeThreadPoolConstants.CHANGE_THREAD_POOL_TEXT;
@ -71,9 +68,9 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
boolean originalAllowCoreThreadTimeOut = executor.allowsCoreThreadTimeOut(); boolean originalAllowCoreThreadTimeOut = executor.allowsCoreThreadTimeOut();
Long originalExecuteTimeOut = null; Long originalExecuteTimeOut = null;
RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler(); RejectedExecutionHandler rejectedExecutionHandler = executor.getRejectedExecutionHandler();
if (executor instanceof AbstractDynamicExecutorSupport) { if (executor instanceof DynamicThreadPoolExecutor) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor; DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
rejectedExecutionHandler = dynamicExecutor.getRedundancyHandler(); rejectedExecutionHandler = dynamicExecutor.getRejectedExecutionHandler();
originalExecuteTimeOut = dynamicExecutor.getExecuteTimeOut(); originalExecuteTimeOut = dynamicExecutor.getExecuteTimeOut();
} }
changePoolInfo(executor, parameter); changePoolInfo(executor, parameter);
@ -140,17 +137,11 @@ public class ServerThreadPoolDynamicRefresh implements ThreadPoolDynamicRefresh
executor.setKeepAliveTime(parameter.getKeepAliveTime(), TimeUnit.SECONDS); executor.setKeepAliveTime(parameter.getKeepAliveTime(), TimeUnit.SECONDS);
} }
Long executeTimeOut = Optional.ofNullable(parameter.getExecuteTimeOut()).orElse(0L); Long executeTimeOut = Optional.ofNullable(parameter.getExecuteTimeOut()).orElse(0L);
if (executeTimeOut != null && executor instanceof AbstractDynamicExecutorSupport) { if (executor instanceof DynamicThreadPoolExecutor) {
((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(executeTimeOut); ((DynamicThreadPoolExecutor) executor).setExecuteTimeOut(executeTimeOut);
} }
if (parameter.getRejectedType() != null) { if (parameter.getRejectedType() != null) {
RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(parameter.getRejectedType()); RejectedExecutionHandler rejectedExecutionHandler = RejectedPolicyTypeEnum.createPolicy(parameter.getRejectedType());
if (executor instanceof AbstractDynamicExecutorSupport) {
DynamicThreadPoolExecutor dynamicExecutor = (DynamicThreadPoolExecutor) executor;
dynamicExecutor.setRedundancyHandler(rejectedExecutionHandler);
AtomicLong rejectCount = dynamicExecutor.getRejectCount();
rejectedExecutionHandler = RejectedProxyUtil.createProxy(rejectedExecutionHandler, parameter.getTpId(), rejectCount);
}
executor.setRejectedExecutionHandler(rejectedExecutionHandler); executor.setRejectedExecutionHandler(rejectedExecutionHandler);
} }
if (parameter.getAllowCoreThreadTimeOut() != null) { if (parameter.getAllowCoreThreadTimeOut() != null) {

@ -33,7 +33,8 @@ import cn.hippo4j.core.executor.DynamicThreadPoolExecutor;
import cn.hippo4j.core.executor.DynamicThreadPoolWrapper; import cn.hippo4j.core.executor.DynamicThreadPoolWrapper;
import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage; import cn.hippo4j.core.executor.manage.GlobalNotifyAlarmManage;
import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage; import cn.hippo4j.core.executor.manage.GlobalThreadPoolManage;
import cn.hippo4j.core.executor.support.*; import cn.hippo4j.core.executor.support.CommonDynamicThreadPool;
import cn.hippo4j.core.executor.support.ThreadPoolBuilder;
import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose; import cn.hippo4j.core.executor.support.adpter.DynamicThreadPoolAdapterChoose;
import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil; import cn.hippo4j.core.toolkit.DynamicThreadPoolAnnotationUtil;
import cn.hippo4j.message.service.ThreadPoolNotifyAlarm; import cn.hippo4j.message.service.ThreadPoolNotifyAlarm;
@ -151,7 +152,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut())) .allowCoreThreadTimeOut(EnableEnum.getBool(threadPoolParameterInfo.getAllowCoreThreadTimeOut()))
.build(); .build();
// Set dynamic thread pool enhancement parameters. // Set dynamic thread pool enhancement parameters.
if (executor instanceof AbstractDynamicExecutorSupport) { if (executor instanceof DynamicThreadPoolExecutor) {
ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm( ThreadPoolNotifyAlarm threadPoolNotifyAlarm = new ThreadPoolNotifyAlarm(
BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()), BooleanUtil.toBoolean(threadPoolParameterInfo.getIsAlarm().toString()),
threadPoolParameterInfo.getLivenessAlarm(), threadPoolParameterInfo.getLivenessAlarm(),
@ -159,8 +160,8 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm); GlobalNotifyAlarmManage.put(threadPoolId, threadPoolNotifyAlarm);
TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) executor).getTaskDecorator(); TaskDecorator taskDecorator = ((DynamicThreadPoolExecutor) executor).getTaskDecorator();
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setTaskDecorator(taskDecorator); ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setTaskDecorator(taskDecorator);
long awaitTerminationMillis = ((DynamicThreadPoolExecutor) executor).awaitTerminationMillis; long awaitTerminationMillis = ((DynamicThreadPoolExecutor) executor).getAwaitTerminationMillis();
boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) executor).waitForTasksToCompleteOnShutdown; boolean waitForTasksToCompleteOnShutdown = ((DynamicThreadPoolExecutor) executor).isWaitForTasksToCompleteOnShutdown();
((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown); ((DynamicThreadPoolExecutor) newDynamicThreadPoolExecutor).setSupportParam(awaitTerminationMillis, waitForTasksToCompleteOnShutdown);
long executeTimeOut = Optional.ofNullable(threadPoolParameterInfo.getExecuteTimeOut()) long executeTimeOut = Optional.ofNullable(threadPoolParameterInfo.getExecuteTimeOut())
.orElse(((DynamicThreadPoolExecutor) executor).getExecuteTimeOut()); .orElse(((DynamicThreadPoolExecutor) executor).getExecuteTimeOut());
@ -182,7 +183,7 @@ public final class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
.isAlarm(false) .isAlarm(false)
.activeAlarm(80) .activeAlarm(80)
.capacityAlarm(80) .capacityAlarm(80)
.rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(((DynamicThreadPoolExecutor) executor).getRedundancyHandler().getClass().getSimpleName())) .rejectedPolicyType(RejectedPolicyTypeEnum.getRejectedPolicyTypeEnumByName(executor.getRejectedExecutionHandler().getClass().getSimpleName()))
.build(); .build();
DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder() DynamicThreadPoolRegisterWrapper registerWrapper = DynamicThreadPoolRegisterWrapper.builder()
.dynamicThreadPoolRegisterParameter(parameterInfo) .dynamicThreadPoolRegisterParameter(parameterInfo)

Loading…
Cancel
Save