Fix executeTimeOut corner case exception (#405)

pull/414/head
chen.ma 2 years ago
parent cf45b0062a
commit 5d2edb80af

@ -30,10 +30,7 @@ import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* Dynamic threadPool wrap.
*
* @author chen.ma
* @date 2021/7/8 21:47
* Enhanced dynamic and monitored thread pool.
*/
public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport {
@ -55,7 +52,7 @@ public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport {
@Getter
private final AtomicLong rejectCount = new AtomicLong();
private final ThreadLocal<Long> startTime = new ThreadLocal<>();
private final ThreadLocal<Long> startTimeThreadLocal = new ThreadLocal<>();
public DynamicThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
@ -64,18 +61,18 @@ public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport {
long executeTimeOut,
boolean waitForTasksToCompleteOnShutdown,
long awaitTerminationMillis,
@NonNull BlockingQueue<Runnable> workQueue,
@NonNull BlockingQueue<Runnable> blockingQueue,
@NonNull String threadPoolId,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, workQueue, threadPoolId, threadFactory, handler);
@NonNull RejectedExecutionHandler rejectedExecutionHandler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, blockingQueue, threadPoolId, threadFactory, rejectedExecutionHandler);
this.threadPoolId = threadPoolId;
this.executeTimeOut = executeTimeOut;
// Number of dynamic proxy denial policies.
RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(handler, threadPoolId, rejectCount);
RejectedExecutionHandler rejectedProxy = RejectedProxyUtil.createProxy(rejectedExecutionHandler, threadPoolId, rejectCount);
setRejectedExecutionHandler(rejectedProxy);
// Redundant fields to avoid reflecting the acquired fields when sending change information.
redundancyHandler = handler;
redundancyHandler = rejectedExecutionHandler;
}
@Override
@ -91,16 +88,16 @@ public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport {
if (executeTimeOut == null || executeTimeOut <= 0) {
return;
}
this.startTime.set(SystemClock.now());
startTimeThreadLocal.set(SystemClock.now());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
if (executeTimeOut == null || executeTimeOut <= 0) {
Long startTime;
if ((startTime = startTimeThreadLocal.get()) == null) {
return;
}
try {
long startTime = this.startTime.get();
long endTime = SystemClock.now();
long executeTime;
boolean executeTimeAlarm = (executeTime = (endTime - startTime)) > executeTimeOut;
@ -111,7 +108,7 @@ public class DynamicThreadPoolExecutor extends AbstractDynamicExecutorSupport {
}
}
} finally {
this.startTime.remove();
startTimeThreadLocal.remove();
}
}

Loading…
Cancel
Save