Feature: 线程池报警动作通过发布事件的方式异步化.

pull/10/head
chen.ma 3 years ago
parent 4a8d9ff6b0
commit 1730efafca

@ -2,6 +2,7 @@ package com.github.dynamic.threadpool.starter.core;
import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarm; import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarm;
import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarmManage; import com.github.dynamic.threadpool.starter.alarm.ThreadPoolAlarmManage;
import com.github.dynamic.threadpool.starter.event.EventExecutor;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import lombok.NonNull; import lombok.NonNull;
@ -23,7 +24,7 @@ import static com.github.dynamic.threadpool.common.constant.Constants.MAP_INITIA
* @author chen.ma * @author chen.ma
* @date 2021/7/8 21:47 * @date 2021/7/8 21:47
*/ */
public final class DynamicThreadPoolExecutor extends ThreadPoolExecutor { public class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicInteger rejectCount = new AtomicInteger(); private final AtomicInteger rejectCount = new AtomicInteger();
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
@ -140,9 +141,7 @@ public final class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
return this.threadPoolId; return this.threadPoolId;
} }
private final class Worker private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
extends AbstractQueuedSynchronizer
implements Runnable {
private static final long serialVersionUID = 6138294804551838833L; private static final long serialVersionUID = 6138294804551838833L;
@ -309,7 +308,9 @@ public final class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
final void reject(Runnable command) { final void reject(Runnable command) {
rejectCount.incrementAndGet(); rejectCount.incrementAndGet();
ThreadPoolAlarmManage.checkPoolRejectAlarm(this); EventExecutor.publishEvent(
() -> ThreadPoolAlarmManage.checkPoolRejectAlarm(this)
);
handler.rejectedExecution(command, this); handler.rejectedExecution(command, this);
} }
@ -362,7 +363,9 @@ public final class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
} }
} }
ThreadPoolAlarmManage.checkPoolLivenessAlarm(core, this); EventExecutor.publishEvent(
() -> ThreadPoolAlarmManage.checkPoolLivenessAlarm(core, this)
);
boolean workerStarted = false; boolean workerStarted = false;
boolean workerAdded = false; boolean workerAdded = false;
@ -538,7 +541,9 @@ public final class DynamicThreadPoolExecutor extends ThreadPoolExecutor {
c = ctl.get(); c = ctl.get();
} }
if (isRunning(c) && workQueue.offer(command)) { if (isRunning(c) && workQueue.offer(command)) {
ThreadPoolAlarmManage.checkPoolCapacityAlarm(this); EventExecutor.publishEvent(
() -> ThreadPoolAlarmManage.checkPoolCapacityAlarm(this)
);
int recheck = ctl.get(); int recheck = ctl.get();
if (!isRunning(recheck) && remove(command)) { if (!isRunning(recheck) && remove(command)) {
reject(command); reject(command);

@ -0,0 +1,35 @@
package com.github.dynamic.threadpool.starter.event;
import com.github.dynamic.threadpool.common.function.NoArgsConsumer;
import com.github.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import com.github.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
/**
* .
*
* @author chen.ma
* @date 2021/11/8 23:44
*/
public class EventExecutor {
private static final ExecutorService EVENT_EXECUTOR = ThreadPoolBuilder.builder()
.threadFactory("event-executor")
.corePoolSize(Runtime.getRuntime().availableProcessors())
.maxPoolNum(Runtime.getRuntime().availableProcessors())
.workQueue(QueueTypeEnum.ARRAY_BLOCKING_QUEUE, 2048)
.rejected(new ThreadPoolExecutor.DiscardPolicy())
.build();
/**
* .
*
* @param consumer
*/
public static void publishEvent(NoArgsConsumer consumer) {
EVENT_EXECUTOR.execute(consumer::accept);
}
}
Loading…
Cancel
Save