mirror of https://github.com/longtai-cn/hippo4j
commit
735e49f82b
@ -1,972 +0,0 @@
|
|||||||
package cn.hippo4j.starter.core;
|
|
||||||
|
|
||||||
import cn.hippo4j.starter.alarm.ThreadPoolAlarm;
|
|
||||||
import cn.hippo4j.starter.alarm.ThreadPoolAlarmManage;
|
|
||||||
import cn.hippo4j.starter.event.MonitorEventExecutor;
|
|
||||||
import lombok.NoArgsConstructor;
|
|
||||||
import lombok.NonNull;
|
|
||||||
import org.springframework.core.task.TaskDecorator;
|
|
||||||
|
|
||||||
import java.security.AccessControlContext;
|
|
||||||
import java.security.AccessController;
|
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.util.*;
|
|
||||||
import java.util.concurrent.*;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
|
|
||||||
import java.util.concurrent.locks.Condition;
|
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
|
||||||
|
|
||||||
import static cn.hippo4j.common.constant.Constants.MAP_INITIAL_CAPACITY;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Dynamic threadPool wrap.
|
|
||||||
*
|
|
||||||
* @author chen.ma
|
|
||||||
* @date 2021/7/8 21:47
|
|
||||||
*/
|
|
||||||
public class DynamicThreadPoolExecutor extends DynamicExecutorConfigurationSupport {
|
|
||||||
|
|
||||||
private final AtomicInteger rejectCount = new AtomicInteger();
|
|
||||||
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
|
|
||||||
|
|
||||||
private static final int COUNT_BITS = Integer.SIZE - 3;
|
|
||||||
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
|
|
||||||
|
|
||||||
private static final int RUNNING = -1 << COUNT_BITS;
|
|
||||||
private static final int SHUTDOWN = 0 << COUNT_BITS;
|
|
||||||
private static final int STOP = 1 << COUNT_BITS;
|
|
||||||
private static final int TIDYING = 2 << COUNT_BITS;
|
|
||||||
private static final int TERMINATED = 3 << COUNT_BITS;
|
|
||||||
|
|
||||||
private TaskDecorator taskDecorator;
|
|
||||||
private final BlockingQueue<Runnable> workQueue;
|
|
||||||
private final ReentrantLock mainLock = new ReentrantLock();
|
|
||||||
private final HashSet<Worker> workers = new HashSet();
|
|
||||||
private final Condition termination = mainLock.newCondition();
|
|
||||||
|
|
||||||
private int largestPoolSize;
|
|
||||||
private long completedTaskCount;
|
|
||||||
private volatile long keepAliveTime;
|
|
||||||
private volatile boolean allowCoreThreadTimeOut;
|
|
||||||
private volatile int corePoolSize;
|
|
||||||
private volatile int maximumPoolSize;
|
|
||||||
private String threadPoolId;
|
|
||||||
|
|
||||||
private final AccessControlContext acc;
|
|
||||||
private volatile ThreadPoolAlarm threadPoolAlarm;
|
|
||||||
private volatile ThreadFactory threadFactory;
|
|
||||||
private volatile RejectedExecutionHandler handler;
|
|
||||||
|
|
||||||
private static final RejectedExecutionHandler DEFAULT_HANDLER = new ThreadPoolExecutor.AbortPolicy();
|
|
||||||
private static final RuntimePermission SHUTDOWN_PERM = new RuntimePermission("modifyThread");
|
|
||||||
|
|
||||||
public DynamicThreadPoolExecutor(int corePoolSize,
|
|
||||||
int maximumPoolSize,
|
|
||||||
long keepAliveTime,
|
|
||||||
TimeUnit unit,
|
|
||||||
boolean waitForTasksToCompleteOnShutdown,
|
|
||||||
long awaitTerminationMillis,
|
|
||||||
@NonNull BlockingQueue<Runnable> workQueue,
|
|
||||||
@NonNull String threadPoolId,
|
|
||||||
@NonNull ThreadFactory threadFactory,
|
|
||||||
@NonNull ThreadPoolAlarm threadPoolAlarm,
|
|
||||||
@NonNull RejectedExecutionHandler handler) {
|
|
||||||
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, workQueue, threadPoolId, threadFactory, handler);
|
|
||||||
|
|
||||||
this.corePoolSize = corePoolSize;
|
|
||||||
this.maximumPoolSize = maximumPoolSize;
|
|
||||||
this.workQueue = workQueue;
|
|
||||||
this.threadPoolId = threadPoolId;
|
|
||||||
this.keepAliveTime = unit.toNanos(keepAliveTime);
|
|
||||||
this.threadFactory = threadFactory;
|
|
||||||
this.handler = handler;
|
|
||||||
this.threadPoolAlarm = threadPoolAlarm;
|
|
||||||
this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static int runStateOf(int c) {
|
|
||||||
return c & ~CAPACITY;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static int workerCountOf(int c) {
|
|
||||||
return c & CAPACITY;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static int ctlOf(int rs, int wc) {
|
|
||||||
return rs | wc;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean runStateLessThan(int c, int s) {
|
|
||||||
return c < s;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean runStateAtLeast(int c, int s) {
|
|
||||||
return c >= s;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static boolean isRunning(int c) {
|
|
||||||
return c < SHUTDOWN;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean compareAndIncrementWorkerCount(int expect) {
|
|
||||||
return ctl.compareAndSet(expect, expect + 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean compareAndDecrementWorkerCount(int expect) {
|
|
||||||
return ctl.compareAndSet(expect, expect - 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void decrementWorkerCount() {
|
|
||||||
do {
|
|
||||||
} while (!compareAndDecrementWorkerCount(ctl.get()));
|
|
||||||
}
|
|
||||||
|
|
||||||
public Integer getRejectCount() {
|
|
||||||
return rejectCount.get();
|
|
||||||
}
|
|
||||||
|
|
||||||
public ThreadPoolAlarm getThreadPoolAlarm() {
|
|
||||||
return this.threadPoolAlarm;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setThreadPoolAlarm(ThreadPoolAlarm threadPoolAlarm) {
|
|
||||||
this.threadPoolAlarm = threadPoolAlarm;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getThreadPoolId() {
|
|
||||||
return this.threadPoolId;
|
|
||||||
}
|
|
||||||
|
|
||||||
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = 6138294804551838833L;
|
|
||||||
|
|
||||||
final Thread thread;
|
|
||||||
Runnable firstTask;
|
|
||||||
volatile long completedTasks;
|
|
||||||
|
|
||||||
Worker(Runnable firstTask) {
|
|
||||||
setState(-1);
|
|
||||||
this.firstTask = firstTask;
|
|
||||||
this.thread = getThreadFactory().newThread(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
runWorker(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean isHeldExclusively() {
|
|
||||||
return getState() != 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean tryAcquire(int unused) {
|
|
||||||
if (compareAndSetState(0, 1)) {
|
|
||||||
setExclusiveOwnerThread(Thread.currentThread());
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected boolean tryRelease(int unused) {
|
|
||||||
setExclusiveOwnerThread(null);
|
|
||||||
setState(0);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void lock() {
|
|
||||||
acquire(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean tryLock() {
|
|
||||||
return tryAcquire(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void unlock() {
|
|
||||||
release(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isLocked() {
|
|
||||||
return isHeldExclusively();
|
|
||||||
}
|
|
||||||
|
|
||||||
void interruptIfStarted() {
|
|
||||||
Thread t;
|
|
||||||
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
|
|
||||||
try {
|
|
||||||
t.interrupt();
|
|
||||||
} catch (SecurityException ignore) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void advanceRunState(int targetState) {
|
|
||||||
for (; ; ) {
|
|
||||||
int c = ctl.get();
|
|
||||||
if (runStateAtLeast(c, targetState) ||
|
|
||||||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final void tryTerminate() {
|
|
||||||
for (; ; ) {
|
|
||||||
int c = ctl.get();
|
|
||||||
if (isRunning(c)
|
|
||||||
|| runStateAtLeast(c, TIDYING)
|
|
||||||
|| (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (workerCountOf(c) != 0) {
|
|
||||||
interruptIdleWorkers(ONLY_ONE);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
|
|
||||||
try {
|
|
||||||
terminated();
|
|
||||||
} finally {
|
|
||||||
ctl.set(ctlOf(TERMINATED, 0));
|
|
||||||
termination.signalAll();
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void checkShutdownAccess() {
|
|
||||||
SecurityManager security = System.getSecurityManager();
|
|
||||||
if (security != null) {
|
|
||||||
security.checkPermission(SHUTDOWN_PERM);
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
for (Worker w : workers) {
|
|
||||||
security.checkAccess(w.thread);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void interruptWorkers() {
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
for (Worker w : workers) {
|
|
||||||
w.interruptIfStarted();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void interruptIdleWorkers(boolean onlyOne) {
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
for (Worker w : workers) {
|
|
||||||
Thread t = w.thread;
|
|
||||||
if (!t.isInterrupted() && w.tryLock()) {
|
|
||||||
try {
|
|
||||||
t.interrupt();
|
|
||||||
} catch (SecurityException ignore) {
|
|
||||||
} finally {
|
|
||||||
w.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (onlyOne) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void interruptIdleWorkers() {
|
|
||||||
interruptIdleWorkers(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final boolean ONLY_ONE = true;
|
|
||||||
|
|
||||||
final void reject(Runnable command) {
|
|
||||||
rejectCount.incrementAndGet();
|
|
||||||
MonitorEventExecutor.publishEvent(
|
|
||||||
() -> ThreadPoolAlarmManage.checkPoolRejectAlarm(this)
|
|
||||||
);
|
|
||||||
handler.rejectedExecution(command, this);
|
|
||||||
}
|
|
||||||
|
|
||||||
void onShutdown() {
|
|
||||||
}
|
|
||||||
|
|
||||||
final boolean isRunningOrShutdown(boolean shutdownOk) {
|
|
||||||
int rs = runStateOf(ctl.get());
|
|
||||||
return rs == RUNNING || (rs == SHUTDOWN && shutdownOk);
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<Runnable> drainQueue() {
|
|
||||||
BlockingQueue<Runnable> q = workQueue;
|
|
||||||
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
|
|
||||||
q.drainTo(taskList);
|
|
||||||
if (!q.isEmpty()) {
|
|
||||||
for (Runnable r : q.toArray(new Runnable[0])) {
|
|
||||||
if (q.remove(r)) {
|
|
||||||
taskList.add(r);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return taskList;
|
|
||||||
}
|
|
||||||
|
|
||||||
private boolean addWorker(Runnable firstTask, boolean core) {
|
|
||||||
retry:
|
|
||||||
for (; ; ) {
|
|
||||||
int c = ctl.get();
|
|
||||||
int rs = runStateOf(c);
|
|
||||||
|
|
||||||
// Check if queue empty only if necessary.
|
|
||||||
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (; ; ) {
|
|
||||||
int wc = workerCountOf(c);
|
|
||||||
if (wc >= CAPACITY ||
|
|
||||||
wc >= (core ? corePoolSize : maximumPoolSize)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
if (compareAndIncrementWorkerCount(c)) {
|
|
||||||
break retry;
|
|
||||||
}
|
|
||||||
c = ctl.get();
|
|
||||||
if (runStateOf(c) != rs) {
|
|
||||||
continue retry;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
MonitorEventExecutor.publishEvent(
|
|
||||||
() -> ThreadPoolAlarmManage.checkPoolLivenessAlarm(core, this)
|
|
||||||
);
|
|
||||||
|
|
||||||
boolean workerStarted = false;
|
|
||||||
boolean workerAdded = false;
|
|
||||||
Worker w = null;
|
|
||||||
try {
|
|
||||||
w = new Worker(firstTask);
|
|
||||||
final Thread t = w.thread;
|
|
||||||
if (t != null) {
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
int rs = runStateOf(ctl.get());
|
|
||||||
|
|
||||||
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
|
|
||||||
if (t.isAlive()) {
|
|
||||||
throw new IllegalThreadStateException();
|
|
||||||
}
|
|
||||||
workers.add(w);
|
|
||||||
int s = workers.size();
|
|
||||||
if (s > largestPoolSize) {
|
|
||||||
largestPoolSize = s;
|
|
||||||
}
|
|
||||||
workerAdded = true;
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
if (workerAdded) {
|
|
||||||
t.start();
|
|
||||||
workerStarted = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
if (!workerStarted) {
|
|
||||||
addWorkerFailed(w);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return workerStarted;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void addWorkerFailed(Worker w) {
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
if (w != null) {
|
|
||||||
workers.remove(w);
|
|
||||||
}
|
|
||||||
decrementWorkerCount();
|
|
||||||
tryTerminate();
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void processWorkerExit(Worker w, boolean completedAbruptly) {
|
|
||||||
if (completedAbruptly) {
|
|
||||||
decrementWorkerCount();
|
|
||||||
}
|
|
||||||
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
completedTaskCount += w.completedTasks;
|
|
||||||
workers.remove(w);
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
|
|
||||||
tryTerminate();
|
|
||||||
|
|
||||||
int c = ctl.get();
|
|
||||||
if (runStateLessThan(c, STOP)) {
|
|
||||||
if (!completedAbruptly) {
|
|
||||||
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
|
|
||||||
if (min == 0 && !workQueue.isEmpty()) {
|
|
||||||
min = 1;
|
|
||||||
}
|
|
||||||
if (workerCountOf(c) >= min) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
addWorker(null, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Runnable getTask() {
|
|
||||||
boolean timedOut = false;
|
|
||||||
|
|
||||||
for (; ; ) {
|
|
||||||
int c = ctl.get();
|
|
||||||
int rs = runStateOf(c);
|
|
||||||
|
|
||||||
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
|
|
||||||
decrementWorkerCount();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
int wc = workerCountOf(c);
|
|
||||||
|
|
||||||
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
|
|
||||||
|
|
||||||
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
|
|
||||||
if (compareAndDecrementWorkerCount(c)) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
Runnable r = timed ?
|
|
||||||
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
|
|
||||||
workQueue.take();
|
|
||||||
if (r != null) {
|
|
||||||
return r;
|
|
||||||
}
|
|
||||||
timedOut = true;
|
|
||||||
} catch (InterruptedException retry) {
|
|
||||||
timedOut = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
final void runWorker(Worker w) {
|
|
||||||
Thread wt = Thread.currentThread();
|
|
||||||
Runnable task = w.firstTask;
|
|
||||||
w.firstTask = null;
|
|
||||||
w.unlock();
|
|
||||||
boolean completedAbruptly = true;
|
|
||||||
try {
|
|
||||||
while (task != null || (task = getTask()) != null) {
|
|
||||||
w.lock();
|
|
||||||
if ((runStateAtLeast(ctl.get(), STOP)
|
|
||||||
|| (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
|
|
||||||
&& !wt.isInterrupted()) {
|
|
||||||
wt.interrupt();
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
beforeExecute(wt, task);
|
|
||||||
Throwable thrown = null;
|
|
||||||
try {
|
|
||||||
task.run();
|
|
||||||
} catch (RuntimeException x) {
|
|
||||||
thrown = x;
|
|
||||||
throw x;
|
|
||||||
} catch (Error x) {
|
|
||||||
thrown = x;
|
|
||||||
throw x;
|
|
||||||
} catch (Throwable x) {
|
|
||||||
thrown = x;
|
|
||||||
throw new Error(x);
|
|
||||||
} finally {
|
|
||||||
afterExecute(task, thrown);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
task = null;
|
|
||||||
w.completedTasks++;
|
|
||||||
w.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
completedAbruptly = false;
|
|
||||||
} finally {
|
|
||||||
processWorkerExit(w, completedAbruptly);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void execute(@NonNull Runnable command) {
|
|
||||||
if (taskDecorator != null) {
|
|
||||||
command = taskDecorator.decorate(command);
|
|
||||||
}
|
|
||||||
int c = ctl.get();
|
|
||||||
if (workerCountOf(c) < corePoolSize) {
|
|
||||||
if (addWorker(command, true)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
c = ctl.get();
|
|
||||||
}
|
|
||||||
if (isRunning(c) && workQueue.offer(command)) {
|
|
||||||
MonitorEventExecutor.publishEvent(
|
|
||||||
() -> ThreadPoolAlarmManage.checkPoolCapacityAlarm(this)
|
|
||||||
);
|
|
||||||
int recheck = ctl.get();
|
|
||||||
if (!isRunning(recheck) && remove(command)) {
|
|
||||||
reject(command);
|
|
||||||
} else if (workerCountOf(recheck) == 0) {
|
|
||||||
addWorker(null, false);
|
|
||||||
}
|
|
||||||
} else if (!addWorker(command, false)) {
|
|
||||||
reject(command);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected ExecutorService initializeExecutor() {
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void shutdown() {
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
checkShutdownAccess();
|
|
||||||
advanceRunState(SHUTDOWN);
|
|
||||||
interruptIdleWorkers();
|
|
||||||
onShutdown();
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
tryTerminate();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public List<Runnable> shutdownNow() {
|
|
||||||
List<Runnable> tasks;
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
checkShutdownAccess();
|
|
||||||
advanceRunState(STOP);
|
|
||||||
interruptWorkers();
|
|
||||||
tasks = drainQueue();
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
tryTerminate();
|
|
||||||
return tasks;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isShutdown() {
|
|
||||||
return !isRunning(ctl.get());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isTerminating() {
|
|
||||||
int c = ctl.get();
|
|
||||||
return !isRunning(c) && runStateLessThan(c, TERMINATED);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isTerminated() {
|
|
||||||
return runStateAtLeast(ctl.get(), TERMINATED);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean awaitTermination(long timeout, TimeUnit unit)
|
|
||||||
throws InterruptedException {
|
|
||||||
long nanos = unit.toNanos(timeout);
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
for (; ; ) {
|
|
||||||
if (runStateAtLeast(ctl.get(), TERMINATED)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (nanos <= 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
nanos = termination.awaitNanos(nanos);
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void finalize() {
|
|
||||||
SecurityManager sm = System.getSecurityManager();
|
|
||||||
if (sm == null || acc == null) {
|
|
||||||
shutdown();
|
|
||||||
} else {
|
|
||||||
PrivilegedAction<Void> pa = () -> {
|
|
||||||
shutdown();
|
|
||||||
return null;
|
|
||||||
};
|
|
||||||
AccessController.doPrivileged(pa, acc);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setThreadFactory(@NonNull ThreadFactory threadFactory) {
|
|
||||||
this.threadFactory = threadFactory;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ThreadFactory getThreadFactory() {
|
|
||||||
return threadFactory;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setRejectedExecutionHandler(@NonNull RejectedExecutionHandler handler) {
|
|
||||||
this.handler = handler;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public RejectedExecutionHandler getRejectedExecutionHandler() {
|
|
||||||
return handler;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setCorePoolSize(int corePoolSize) {
|
|
||||||
if (corePoolSize < 0) {
|
|
||||||
throw new IllegalArgumentException();
|
|
||||||
}
|
|
||||||
int delta = corePoolSize - this.corePoolSize;
|
|
||||||
this.corePoolSize = corePoolSize;
|
|
||||||
if (workerCountOf(ctl.get()) > corePoolSize) {
|
|
||||||
interruptIdleWorkers();
|
|
||||||
} else if (delta > 0) {
|
|
||||||
int k = Math.min(delta, workQueue.size());
|
|
||||||
while (k-- > 0 && addWorker(null, true)) {
|
|
||||||
if (workQueue.isEmpty()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getCorePoolSize() {
|
|
||||||
return corePoolSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean prestartCoreThread() {
|
|
||||||
return workerCountOf(ctl.get()) < corePoolSize &&
|
|
||||||
addWorker(null, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
public TaskDecorator getTaskDecorator() {
|
|
||||||
return taskDecorator;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setTaskDecorator(TaskDecorator taskDecorator) {
|
|
||||||
this.taskDecorator = taskDecorator;
|
|
||||||
}
|
|
||||||
|
|
||||||
void ensurePrestart() {
|
|
||||||
int wc = workerCountOf(ctl.get());
|
|
||||||
if (wc < corePoolSize) {
|
|
||||||
addWorker(null, true);
|
|
||||||
} else if (wc == 0) {
|
|
||||||
addWorker(null, false);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int prestartAllCoreThreads() {
|
|
||||||
int n = 0;
|
|
||||||
while (addWorker(null, true)) {
|
|
||||||
++n;
|
|
||||||
}
|
|
||||||
return n;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean allowsCoreThreadTimeOut() {
|
|
||||||
return allowCoreThreadTimeOut;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void allowCoreThreadTimeOut(boolean value) {
|
|
||||||
if (value && keepAliveTime <= 0) {
|
|
||||||
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
|
|
||||||
}
|
|
||||||
if (value != allowCoreThreadTimeOut) {
|
|
||||||
allowCoreThreadTimeOut = value;
|
|
||||||
if (value) {
|
|
||||||
interruptIdleWorkers();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setMaximumPoolSize(int maximumPoolSize) {
|
|
||||||
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) {
|
|
||||||
throw new IllegalArgumentException();
|
|
||||||
}
|
|
||||||
this.maximumPoolSize = maximumPoolSize;
|
|
||||||
if (workerCountOf(ctl.get()) > maximumPoolSize) {
|
|
||||||
interruptIdleWorkers();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getMaximumPoolSize() {
|
|
||||||
return maximumPoolSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setKeepAliveTime(long time, TimeUnit unit) {
|
|
||||||
if (time < 0) {
|
|
||||||
throw new IllegalArgumentException();
|
|
||||||
}
|
|
||||||
if (time == 0 && allowsCoreThreadTimeOut()) {
|
|
||||||
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
|
|
||||||
}
|
|
||||||
long keepAliveTime = unit.toNanos(time);
|
|
||||||
long delta = keepAliveTime - this.keepAliveTime;
|
|
||||||
this.keepAliveTime = keepAliveTime;
|
|
||||||
if (delta < 0) {
|
|
||||||
interruptIdleWorkers();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getKeepAliveTime(TimeUnit unit) {
|
|
||||||
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public BlockingQueue<Runnable> getQueue() {
|
|
||||||
return workQueue;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean remove(Runnable task) {
|
|
||||||
boolean removed = workQueue.remove(task);
|
|
||||||
tryTerminate();
|
|
||||||
return removed;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void purge() {
|
|
||||||
final BlockingQueue<Runnable> q = workQueue;
|
|
||||||
try {
|
|
||||||
Iterator<Runnable> it = q.iterator();
|
|
||||||
while (it.hasNext()) {
|
|
||||||
Runnable r = it.next();
|
|
||||||
if (r instanceof Future<?> && ((Future<?>) r).isCancelled()) {
|
|
||||||
it.remove();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (ConcurrentModificationException fallThrough) {
|
|
||||||
for (Object r : q.toArray()) {
|
|
||||||
if (r instanceof Future<?> && ((Future<?>) r).isCancelled()) {
|
|
||||||
q.remove(r);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tryTerminate();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getPoolSize() {
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size();
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getActiveCount() {
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
int n = 0;
|
|
||||||
for (Worker w : workers) {
|
|
||||||
if (w.isLocked()) {
|
|
||||||
++n;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return n;
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getLargestPoolSize() {
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
return largestPoolSize;
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getTaskCount() {
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
long n = completedTaskCount;
|
|
||||||
for (Worker w : workers) {
|
|
||||||
n += w.completedTasks;
|
|
||||||
if (w.isLocked()) {
|
|
||||||
++n;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return n + workQueue.size();
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long getCompletedTaskCount() {
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
long n = completedTaskCount;
|
|
||||||
for (Worker w : workers) {
|
|
||||||
n += w.completedTasks;
|
|
||||||
}
|
|
||||||
return n;
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
long ncompleted;
|
|
||||||
int nworkers, nactive;
|
|
||||||
final ReentrantLock mainLock = this.mainLock;
|
|
||||||
mainLock.lock();
|
|
||||||
try {
|
|
||||||
ncompleted = completedTaskCount;
|
|
||||||
nactive = 0;
|
|
||||||
nworkers = workers.size();
|
|
||||||
for (Worker w : workers) {
|
|
||||||
ncompleted += w.completedTasks;
|
|
||||||
if (w.isLocked()) {
|
|
||||||
++nactive;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
mainLock.unlock();
|
|
||||||
}
|
|
||||||
int c = ctl.get();
|
|
||||||
String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
|
|
||||||
(runStateAtLeast(c, TERMINATED) ? "Terminated" :
|
|
||||||
"Shutting down"));
|
|
||||||
return super.toString() +
|
|
||||||
"[" + rs +
|
|
||||||
", pool size = " + nworkers +
|
|
||||||
", active threads = " + nactive +
|
|
||||||
", queued tasks = " + workQueue.size() +
|
|
||||||
", completed tasks = " + ncompleted +
|
|
||||||
"]";
|
|
||||||
}
|
|
||||||
|
|
||||||
private ConcurrentHashMap<String, Date> statisticsTime = new ConcurrentHashMap(MAP_INITIAL_CAPACITY);
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void beforeExecute(Thread t, Runnable r) {
|
|
||||||
statisticsTime.put(String.valueOf(r.hashCode()), new Date());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void afterExecute(Runnable r, Throwable t) {
|
|
||||||
Date startDate = statisticsTime.remove(String.valueOf(r.hashCode()));
|
|
||||||
Date finishDate = new Date();
|
|
||||||
// long diff = finishDate.getTime() - startDate.getTime();
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
protected void terminated() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@NoArgsConstructor
|
|
||||||
public static class CallerRunsPolicy implements RejectedExecutionHandler {
|
|
||||||
@Override
|
|
||||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
|
|
||||||
if (!e.isShutdown()) {
|
|
||||||
r.run();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@NoArgsConstructor
|
|
||||||
public static class AbortPolicy implements RejectedExecutionHandler {
|
|
||||||
@Override
|
|
||||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
|
|
||||||
throw new RejectedExecutionException("Task " + r.toString() +
|
|
||||||
" rejected from " +
|
|
||||||
e.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@NoArgsConstructor
|
|
||||||
public static class DiscardPolicy implements RejectedExecutionHandler {
|
|
||||||
@Override
|
|
||||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@NoArgsConstructor
|
|
||||||
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
|
|
||||||
@Override
|
|
||||||
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
|
|
||||||
if (!e.isShutdown()) {
|
|
||||||
e.getQueue().poll();
|
|
||||||
e.execute(r);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
@ -0,0 +1,33 @@
|
|||||||
|
HELP.md
|
||||||
|
target/
|
||||||
|
!.mvn/wrapper/maven-wrapper.jar
|
||||||
|
!**/src/main/**/target/
|
||||||
|
!**/src/test/**/target/
|
||||||
|
|
||||||
|
### STS ###
|
||||||
|
.apt_generated
|
||||||
|
.classpath
|
||||||
|
.factorypath
|
||||||
|
.project
|
||||||
|
.settings
|
||||||
|
.springBeans
|
||||||
|
.sts4-cache
|
||||||
|
|
||||||
|
### IntelliJ IDEA ###
|
||||||
|
.idea
|
||||||
|
*.iws
|
||||||
|
*.iml
|
||||||
|
*.ipr
|
||||||
|
|
||||||
|
### NetBeans ###
|
||||||
|
/nbproject/private/
|
||||||
|
/nbbuild/
|
||||||
|
/dist/
|
||||||
|
/nbdist/
|
||||||
|
/.nb-gradle/
|
||||||
|
build/
|
||||||
|
!**/src/main/**/build/
|
||||||
|
!**/src/test/**/build/
|
||||||
|
|
||||||
|
### VS Code ###
|
||||||
|
.vscode/
|
@ -0,0 +1,73 @@
|
|||||||
|
package cn.hippo4j.starter.core;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.NonNull;
|
||||||
|
import lombok.Setter;
|
||||||
|
import org.springframework.core.task.TaskDecorator;
|
||||||
|
|
||||||
|
import java.lang.reflect.Proxy;
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dynamic threadPool wrap.
|
||||||
|
*
|
||||||
|
* @author chen.ma
|
||||||
|
* @date 2021/7/8 21:47
|
||||||
|
*/
|
||||||
|
public class DynamicThreadPoolExecutor extends DynamicExecutorConfigurationSupport {
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
private TaskDecorator taskDecorator;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
private final String threadPoolId;
|
||||||
|
|
||||||
|
private final AtomicInteger rejectCount = new AtomicInteger();
|
||||||
|
|
||||||
|
public DynamicThreadPoolExecutor(int corePoolSize,
|
||||||
|
int maximumPoolSize,
|
||||||
|
long keepAliveTime,
|
||||||
|
TimeUnit unit,
|
||||||
|
boolean waitForTasksToCompleteOnShutdown,
|
||||||
|
long awaitTerminationMillis,
|
||||||
|
@NonNull BlockingQueue<Runnable> workQueue,
|
||||||
|
@NonNull String threadPoolId,
|
||||||
|
@NonNull ThreadFactory threadFactory,
|
||||||
|
@NonNull RejectedExecutionHandler handler) {
|
||||||
|
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, waitForTasksToCompleteOnShutdown, awaitTerminationMillis, workQueue, threadPoolId, threadFactory, handler);
|
||||||
|
this.threadPoolId = threadPoolId;
|
||||||
|
|
||||||
|
RejectedExecutionHandler rejectedProxy = (RejectedExecutionHandler) Proxy
|
||||||
|
.newProxyInstance(
|
||||||
|
handler.getClass().getClassLoader(),
|
||||||
|
new Class[]{RejectedExecutionHandler.class},
|
||||||
|
new RejectedProxyInvocationHandler(handler, rejectCount));
|
||||||
|
setRejectedExecutionHandler(rejectedProxy);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void execute(@NonNull Runnable command) {
|
||||||
|
if (taskDecorator != null) {
|
||||||
|
command = taskDecorator.decorate(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
super.execute(command);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ExecutorService initializeExecutor() {
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get reject count.
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public Integer getRejectCount() {
|
||||||
|
return rejectCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,33 @@
|
|||||||
|
package cn.hippo4j.starter.core;
|
||||||
|
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
|
||||||
|
import java.lang.reflect.InvocationHandler;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Rejected proxy invocation handler.
|
||||||
|
*
|
||||||
|
* @author chen.ma
|
||||||
|
* @date 2022/2/17 19:45
|
||||||
|
*/
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class RejectedProxyInvocationHandler implements InvocationHandler {
|
||||||
|
|
||||||
|
private final Object target;
|
||||||
|
|
||||||
|
private final AtomicInteger rejectCount;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
|
||||||
|
rejectCount.incrementAndGet();
|
||||||
|
try {
|
||||||
|
return method.invoke(target, args);
|
||||||
|
} catch (InvocationTargetException ex) {
|
||||||
|
throw ex.getCause();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue