Optimize custom thread pool code.

pull/161/head
chen.ma 3 years ago
parent 5cf3009db9
commit d148789f45

@ -48,4 +48,6 @@ public class Constants {
public static final int SCHEDULED_THREAD_CORE_NUM = 1; public static final int SCHEDULED_THREAD_CORE_NUM = 1;
public static final int MAP_INITIAL_CAPACITY = 16;
} }

@ -1,6 +1,7 @@
package com.github.dynamic.threadpool.starter.toolkit.thread; package com.github.dynamic.threadpool.starter.toolkit.thread;
import lombok.Getter; import lombok.NoArgsConstructor;
import lombok.NonNull;
import java.security.AccessControlContext; import java.security.AccessControlContext;
import java.security.AccessController; import java.security.AccessController;
@ -12,14 +13,15 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import static com.github.dynamic.threadpool.common.constant.Constants.MAP_INITIAL_CAPACITY;
/** /**
* Custom Thread Pool Wrap. * Custom Thread Pool Wrap.
* *
* @author chen.ma * @author chen.ma
* @date 2021/7/8 21:47 * @date 2021/7/8 21:47
*/ */
@Getter public final class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
private final AtomicInteger regectCount = new AtomicInteger(); private final AtomicInteger regectCount = new AtomicInteger();
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
@ -33,6 +35,77 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
private static final int TIDYING = 2 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet();
private final Condition termination = mainLock.newCondition();
private int largestPoolSize;
private long completedTaskCount;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private final AccessControlContext acc;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private static final RejectedExecutionHandler DEFAULT_HANDLER = new ThreadPoolExecutor.AbortPolicy();
private static final RuntimePermission SHUTDOWN_PERM = new RuntimePermission("modifyThread");
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), DEFAULT_HANDLER);
}
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, DEFAULT_HANDLER);
}
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);
}
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
@NonNull BlockingQueue<Runnable> workQueue,
@NonNull ThreadFactory threadFactory,
@NonNull RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0) {
throw new IllegalArgumentException();
}
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
}
private static int runStateOf(int c) { private static int runStateOf(int c) {
return c & ~CAPACITY; return c & ~CAPACITY;
} }
@ -74,38 +147,6 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
return regectCount.get(); return regectCount.get();
} }
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
private final HashSet<Worker> workers = new HashSet();
private final Condition termination = mainLock.newCondition();
private int largestPoolSize;
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize;
private volatile int maximumPoolSize;
private static final RejectedExecutionHandler defaultHandler =
new ThreadPoolExecutor.AbortPolicy();
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
private final AccessControlContext acc;
private final class Worker private final class Worker
extends AbstractQueuedSynchronizer extends AbstractQueuedSynchronizer
implements Runnable { implements Runnable {
@ -117,7 +158,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
volatile long completedTasks; volatile long completedTasks;
Worker(Runnable firstTask) { Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker setState(-1);
this.firstTask = firstTask; this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); this.thread = getThreadFactory().newThread(this);
} }
@ -188,9 +229,9 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
final void tryTerminate() { final void tryTerminate() {
for (; ; ) { for (; ; ) {
int c = ctl.get(); int c = ctl.get();
if (isRunning(c) || if (isRunning(c)
runStateAtLeast(c, TIDYING) || || runStateAtLeast(c, TIDYING)
(runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) { || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) {
return; return;
} }
if (workerCountOf(c) != 0) { if (workerCountOf(c) != 0) {
@ -219,7 +260,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
private void checkShutdownAccess() { private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager(); SecurityManager security = System.getSecurityManager();
if (security != null) { if (security != null) {
security.checkPermission(shutdownPerm); security.checkPermission(SHUTDOWN_PERM);
final ReentrantLock mainLock = this.mainLock; final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); mainLock.lock();
try { try {
@ -281,9 +322,9 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
void onShutdown() { void onShutdown() {
} }
final boolean isRunningOrShutdown(boolean shutdownOK) { final boolean isRunningOrShutdown(boolean shutdownOk) {
int rs = runStateOf(ctl.get()); int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); return rs == RUNNING || (rs == SHUTDOWN && shutdownOk);
} }
private List<Runnable> drainQueue() { private List<Runnable> drainQueue() {
@ -307,10 +348,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
int rs = runStateOf(c); int rs = runStateOf(c);
// Check if queue empty only if necessary. // Check if queue empty only if necessary.
if (rs >= SHUTDOWN && if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty())) {
return false; return false;
} }
@ -342,8 +380,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
try { try {
int rs = runStateOf(ctl.get()); int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN || if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) { if (t.isAlive()) {
throw new IllegalThreadStateException(); throw new IllegalThreadStateException();
} }
@ -408,7 +445,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
min = 1; min = 1;
} }
if (workerCountOf(c) >= min) { if (workerCountOf(c) >= min) {
return; // replacement not needed return;
} }
} }
addWorker(null, false); addWorker(null, false);
@ -431,8 +468,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c)) { if (compareAndDecrementWorkerCount(c)) {
return null; return null;
} }
@ -462,10 +498,9 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
try { try {
while (task != null || (task = getTask()) != null) { while (task != null || (task = getTask()) != null) {
w.lock(); w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || if ((runStateAtLeast(ctl.get(), STOP)
(Thread.interrupted() && || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
runStateAtLeast(ctl.get(), STOP))) && && !wt.isInterrupted()) {
!wt.isInterrupted()) {
wt.interrupt(); wt.interrupt();
} }
try { try {
@ -497,68 +532,8 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
} }
} }
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public CustomThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0) {
throw new IllegalArgumentException();
}
if (workQueue == null || threadFactory == null || handler == null) {
throw new NullPointerException();
}
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
@Override @Override
public void execute(Runnable command) { public void execute(@NonNull Runnable command) {
if (command == null) {
throw new NullPointerException();
}
int c = ctl.get(); int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) { if (addWorker(command, true)) {
@ -662,10 +637,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
} }
@Override @Override
public void setThreadFactory(ThreadFactory threadFactory) { public void setThreadFactory(@NonNull ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException();
}
this.threadFactory = threadFactory; this.threadFactory = threadFactory;
} }
@ -675,10 +647,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
} }
@Override @Override
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { public void setRejectedExecutionHandler(@NonNull RejectedExecutionHandler handler) {
if (handler == null) {
throw new NullPointerException();
}
this.handler = handler; this.handler = handler;
} }
@ -829,8 +798,7 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
final ReentrantLock mainLock = this.mainLock; final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); mainLock.lock();
try { try {
return runStateAtLeast(ctl.get(), TIDYING) ? 0 return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size();
: workers.size();
} finally { } finally {
mainLock.unlock(); mainLock.unlock();
} }
@ -929,23 +897,27 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
"]"; "]";
} }
private ConcurrentHashMap<String, Date> statisticsTime = new ConcurrentHashMap(MAP_INITIAL_CAPACITY);
@Override @Override
protected void beforeExecute(Thread t, Runnable r) { protected void beforeExecute(Thread t, Runnable r) {
statisticsTime.put(String.valueOf(r.hashCode()), new Date());
} }
@Override @Override
protected void afterExecute(Runnable r, Throwable t) { protected void afterExecute(Runnable r, Throwable t) {
Date startDate = statisticsTime.remove(String.valueOf(r.hashCode()));
Date finishDate = new Date();
long diff = finishDate.getTime() - startDate.getTime();
} }
@Override @Override
protected void terminated() { protected void terminated() {
} }
@NoArgsConstructor
public static class CallerRunsPolicy implements RejectedExecutionHandler { public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() {
}
@Override @Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) { if (!e.isShutdown()) {
@ -954,11 +926,8 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
} }
} }
@NoArgsConstructor
public static class AbortPolicy implements RejectedExecutionHandler { public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() {
}
@Override @Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() + throw new RejectedExecutionException("Task " + r.toString() +
@ -967,21 +936,15 @@ public final class CustomThreadPoolExecutor extends ThreadPoolExecutorTemplate {
} }
} }
@NoArgsConstructor
public static class DiscardPolicy implements RejectedExecutionHandler { public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() {
}
@Override @Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
} }
} }
@NoArgsConstructor
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() {
}
@Override @Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) { if (!e.isShutdown()) {

Loading…
Cancel
Save