refactor: Make DynamicThreadPoolExecutor support the selection of shutdown mode in destroy

pull/854/head
huangchengxing 3 years ago
parent ceb1efd0e5
commit 78324ba02c

@ -24,7 +24,9 @@ import cn.hippo4j.core.plugin.impl.TaskTimeoutNotifyAlarmPlugin;
import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin; import cn.hippo4j.core.plugin.impl.ThreadPoolExecutorShutdownPlugin;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager; import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginManager;
import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginRegistrar; import cn.hippo4j.core.plugin.manager.DefaultThreadPoolPluginRegistrar;
import lombok.Getter;
import lombok.NonNull; import lombok.NonNull;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.task.TaskDecorator; import org.springframework.core.task.TaskDecorator;
@ -42,6 +44,13 @@ import java.util.concurrent.atomic.AtomicLong;
@Slf4j @Slf4j
public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean { public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor implements DisposableBean {
/**
* wait for tasks to complete on shutdown
*/
@Getter
@Setter
public boolean waitForTasksToCompleteOnShutdown;
/** /**
* Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters. * Creates a new {@code DynamicThreadPoolExecutor} with the given initial parameters.
* *
@ -82,10 +91,10 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
threadPoolId, new DefaultThreadPoolPluginManager(), threadPoolId, new DefaultThreadPoolPluginManager(),
corePoolSize, maximumPoolSize, keepAliveTime, unit, corePoolSize, maximumPoolSize, keepAliveTime, unit,
blockingQueue, threadFactory, rejectedExecutionHandler); blockingQueue, threadFactory, rejectedExecutionHandler);
log.info("Initializing ExecutorService" + threadPoolId); log.info("Initializing ExecutorService {}", threadPoolId);
this.waitForTasksToCompleteOnShutdown = waitForTasksToCompleteOnShutdown;
// init default plugins // init default plugins
new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis, waitForTasksToCompleteOnShutdown) new DefaultThreadPoolPluginRegistrar(executeTimeOut, awaitTerminationMillis)
.doRegister(this); .doRegister(this);
} }
@ -116,19 +125,6 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
.orElse(-1L); .orElse(-1L);
} }
/**
* Is wait for tasks to complete on shutdown.
*
* @return true if instance wait for tasks to complete on shutdown, false other otherwise.
* @deprecated use {@link ThreadPoolExecutorShutdownPlugin}
*/
@Deprecated
public boolean isWaitForTasksToCompleteOnShutdown() {
return getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class)
.map(ThreadPoolExecutorShutdownPlugin::isWaitForTasksToCompleteOnShutdown)
.orElse(false);
}
/** /**
* Set support param. * Set support param.
* *
@ -138,10 +134,9 @@ public class DynamicThreadPoolExecutor extends ExtensibleThreadPoolExecutor impl
*/ */
@Deprecated @Deprecated
public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) { public void setSupportParam(long awaitTerminationMillis, boolean waitForTasksToCompleteOnShutdown) {
setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown);
getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class) getPluginOfType(ThreadPoolExecutorShutdownPlugin.PLUGIN_NAME, ThreadPoolExecutorShutdownPlugin.class)
.ifPresent(processor -> processor .ifPresent(processor -> processor.setAwaitTerminationMillis(awaitTerminationMillis));
.setAwaitTerminationMillis(awaitTerminationMillis)
.setWaitForTasksToCompleteOnShutdown(waitForTasksToCompleteOnShutdown));
} }
/** /**

@ -31,9 +31,8 @@ import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
* After the thread pool calls {@link ThreadPoolExecutor#shutdown()} or {@link ThreadPoolExecutor#shutdownNow()}, * <p>After the thread pool calls {@link ThreadPoolExecutor#shutdown()} or {@link ThreadPoolExecutor#shutdownNow()}. <br />
* if necessary, cancel the remaining tasks in the pool, * Cancel the remaining tasks in the pool, then wait for the thread pool to terminate until
* and wait for the thread pool to terminate until
* the blocked main thread has timed out or the thread pool has completely terminated. * the blocked main thread has timed out or the thread pool has completely terminated.
*/ */
@Accessors(chain = true) @Accessors(chain = true)
@ -60,12 +59,6 @@ public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
@Setter @Setter
public long awaitTerminationMillis; public long awaitTerminationMillis;
/**
* wait for tasks to complete on shutdown
*/
@Setter
public boolean waitForTasksToCompleteOnShutdown;
/** /**
* Callback before pool shutdown. * Callback before pool shutdown.
* *
@ -77,14 +70,13 @@ public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
ExtensibleThreadPoolExecutor dynamicThreadPoolExecutor = (ExtensibleThreadPoolExecutor) executor; ExtensibleThreadPoolExecutor dynamicThreadPoolExecutor = (ExtensibleThreadPoolExecutor) executor;
String threadPoolId = dynamicThreadPoolExecutor.getThreadPoolId(); String threadPoolId = dynamicThreadPoolExecutor.getThreadPoolId();
if (log.isInfoEnabled()) { if (log.isInfoEnabled()) {
log.info("Before shutting down ExecutorService" + " '" + threadPoolId + "'"); log.info("Before shutting down ExecutorService {}", threadPoolId);
} }
} }
} }
/** /**
* Callback after pool shutdown. * Callback after pool shutdown. <br />
* if {@link #waitForTasksToCompleteOnShutdown} return {@code true}
* cancel the remaining tasks, * cancel the remaining tasks,
* then wait for pool to terminate according {@link #awaitTerminationMillis} if necessary. * then wait for pool to terminate according {@link #awaitTerminationMillis} if necessary.
* *
@ -95,7 +87,7 @@ public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
public void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) { public void afterShutdown(ThreadPoolExecutor executor, List<Runnable> remainingTasks) {
if (executor instanceof ExtensibleThreadPoolExecutor) { if (executor instanceof ExtensibleThreadPoolExecutor) {
ExtensibleThreadPoolExecutor pool = (ExtensibleThreadPoolExecutor) executor; ExtensibleThreadPoolExecutor pool = (ExtensibleThreadPoolExecutor) executor;
if (!waitForTasksToCompleteOnShutdown && CollectionUtil.isNotEmpty(remainingTasks)) { if (CollectionUtil.isNotEmpty(remainingTasks)) {
remainingTasks.forEach(this::cancelRemainingTask); remainingTasks.forEach(this::cancelRemainingTask);
} }
awaitTerminationIfNecessary(pool); awaitTerminationIfNecessary(pool);
@ -110,8 +102,7 @@ public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
@Override @Override
public PluginRuntime getPluginRuntime() { public PluginRuntime getPluginRuntime() {
return new PluginRuntime(getId()) return new PluginRuntime(getId())
.addInfo("awaitTerminationMillis", awaitTerminationMillis) .addInfo("awaitTerminationMillis", awaitTerminationMillis);
.addInfo("waitForTasksToCompleteOnShutdown", waitForTasksToCompleteOnShutdown);
} }
/** /**
@ -139,11 +130,13 @@ public class ThreadPoolExecutorShutdownPlugin implements ShutdownAwarePlugin {
try { try {
boolean isTerminated = executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS); boolean isTerminated = executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS);
if (!isTerminated && log.isWarnEnabled()) { if (!isTerminated && log.isWarnEnabled()) {
log.warn("Timed out while waiting for executor" + " '" + threadPoolId + "'" + " to terminate."); log.warn("Timed out while waiting for executor {} to terminate.", threadPoolId);
} else {
log.info("ExecutorService {} has been shutdowned.", threadPoolId);
} }
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
if (log.isWarnEnabled()) { if (log.isWarnEnabled()) {
log.warn("Interrupted while waiting for executor" + " '" + threadPoolId + "'" + " to terminate."); log.warn("Interrupted while waiting for executor {} to terminate.", threadPoolId);
} }
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }

@ -47,11 +47,6 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr
*/ */
private long awaitTerminationMillis; private long awaitTerminationMillis;
/**
* wait for tasks to complete on shutdown
*/
private boolean waitForTasksToCompleteOnShutdown;
/** /**
* Get id. * Get id.
* *
@ -76,7 +71,7 @@ public class DefaultThreadPoolPluginRegistrar implements ThreadPoolPluginRegistr
support.register(new TaskRejectCountRecordPlugin()); support.register(new TaskRejectCountRecordPlugin());
support.register(new TaskRejectNotifyAlarmPlugin()); support.register(new TaskRejectNotifyAlarmPlugin());
// callback when pool shutdown // callback when pool shutdown
support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis, waitForTasksToCompleteOnShutdown)); support.register(new ThreadPoolExecutorShutdownPlugin(awaitTerminationMillis));
} }
} }

Loading…
Cancel
Save