perf: 优化线程执行器

pull/9/head
Carina 4 years ago
parent cccb7e37dc
commit c77d41efc0

@ -1,104 +0,0 @@
package org.opsli.common.thread;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线
*
* @author Parker
* @date 2020-10-08 10:24
*/
@Slf4j
public final class AsyncProcessCoordinator {
/**
* Task <br>
* Executor <br>
*/
public static class TaskWrapper implements Runnable {
private final Runnable gift;
private final CountDownLatch latch;
private final AtomicInteger count;
public TaskWrapper(final Runnable target) {
this.gift = target;
this.count = null;
this.latch = null;
}
public TaskWrapper(final Runnable target, final AsyncProcessExecutorByWait.AsyncWaitLock lock) {
if(lock == null){
this.gift = null;
this.count = null;
this.latch = null;
return;
}
this.gift = target;
this.count = lock.getCount();
this.latch = lock.getLatch();
}
@Override
public void run() {
// 捕获异常,避免在 Executor 里面被吞掉了
if (gift == null) {
return;
}
try {
// 执行任务
gift.run();
if(count != null){
// 标示已执行
count.decrementAndGet();
}
} catch (Exception e) {
String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage());
log.error(errMsg, e);
} finally {
if(latch != null){
latch.countDown();
}
}
}
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
*
*/
private AsyncProcessCoordinator(){}
/**
*
*
* @param task
* @return boolean
*/
protected static boolean execute(final Runnable task) {
return AsyncProcessor.executeTask(new TaskWrapper(task));
}
/**
*
*
* @param task
* @return boolean
*/
protected static boolean execute(final Runnable task, final AsyncProcessExecutorByWait.AsyncWaitLock lock) {
boolean execute = AsyncProcessor.executeTask(new TaskWrapper(task, lock));
// 执行任务被拒绝 门闩减1 计数器不动 End
if(!execute){
lock.getLatch().countDown();
}
return execute;
}
}

@ -1,5 +1,7 @@
package org.opsli.common.thread; package org.opsli.common.thread;
import java.util.function.Function;
/** /**
* *
* *
@ -22,4 +24,11 @@ public interface AsyncProcessExecutor {
*/ */
boolean execute(); boolean execute();
/**
*
* @param callback
* @return boolean
*/
boolean executeErrorCallback(Function<Runnable, Void> callback);
} }

@ -16,10 +16,13 @@
package org.opsli.common.thread; package org.opsli.common.thread;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.function.Function;
/** /**
* 线 * 线
@ -30,11 +33,35 @@ import java.util.List;
@Slf4j @Slf4j
public class AsyncProcessExecutorByNormal implements AsyncProcessExecutor{ public class AsyncProcessExecutorByNormal implements AsyncProcessExecutor{
/** 线程池字典 */
private static final Map<String, AsyncProcessor> EXECUTOR_MAP = Maps.newConcurrentMap();
/** 线程Key */
private final String key;
/** 任务队列 */ /** 任务队列 */
private final List<Runnable> taskList; private final List<Runnable> taskList;
/** 执行器 */
private final AsyncProcessor processor;
/**
*
*/
public AsyncProcessExecutorByNormal(){ public AsyncProcessExecutorByNormal(){
this.key = "def";
taskList = new ArrayList<>();
processor = AsyncProcessExecutorByNormal.getProcessor(this.key);
}
/**
*
* @param key 线Key
*/
public AsyncProcessExecutorByNormal(String key){
this.key = key;
taskList = new ArrayList<>(); taskList = new ArrayList<>();
processor = AsyncProcessExecutorByNormal.getProcessor(this.key);
} }
/** /**
@ -60,11 +87,57 @@ public class AsyncProcessExecutorByNormal implements AsyncProcessExecutor{
for (Runnable task : this.taskList) { for (Runnable task : this.taskList) {
// 多线程执行任务 // 多线程执行任务
AsyncProcessCoordinator.execute(task); this.execute(task);
} }
// 返回执行结果 // 返回执行结果
return true; return true;
} }
@Override
public boolean executeErrorCallback(Function<Runnable, Void> callback) {
if(CollUtil.isEmpty(this.taskList)){
return true;
}
for (Runnable task : this.taskList) {
// 多线程执行任务
boolean execute = this.execute(task);
// 执行任务被拒绝 门闩减1 计数器不动 End
if(!execute){
// 线程池失败后 返回该 Runnable
callback.apply(task);
}
}
return false;
}
// ====================================
/**
*
*
* @param task
* @return boolean
*/
private boolean execute(final Runnable task) {
return processor.executeTask(task);
}
/**
*
* @param key Key
* @return AsyncProcessor
*/
private synchronized static AsyncProcessor getProcessor(String key){
AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key);
if(null == asyncProcessor){
asyncProcessor = new AsyncProcessor(key);
EXECUTOR_MAP.put(key, asyncProcessor);
}
return asyncProcessor;
}
} }

@ -16,31 +16,60 @@
package org.opsli.common.thread; package org.opsli.common.thread;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Maps;
import lombok.Getter; import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
/** /**
* 线 * 线
* 线线 * 线线
* *
* @author Parker * @author
* @date 2020-12-10 10:36 * @date 2020-12-10 10:36
*/ */
@Slf4j @Slf4j
public class AsyncProcessExecutorByWait implements AsyncProcessExecutor{ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor {
/** 线程池字典 */
private static final Map<String, AsyncProcessor> EXECUTOR_MAP = Maps.newConcurrentMap();
/** 线程Key */
private final String key;
/** 任务队列 */ /** 任务队列 */
private final List<Runnable> taskList; private final List<Runnable> taskList;
/** 执行器 */
private final AsyncProcessor processor;
/**
*
*/
public AsyncProcessExecutorByWait(){ public AsyncProcessExecutorByWait(){
this.key = "def";
taskList = new ArrayList<>(); taskList = new ArrayList<>();
processor = AsyncProcessExecutorByWait.getProcessor(this.key);
} }
/**
*
* @param key 线Key
*/
public AsyncProcessExecutorByWait(String key){
this.key = key;
taskList = new ArrayList<>();
processor = AsyncProcessExecutorByWait.getProcessor(this.key);
}
/** /**
* *
* @param task * @param task
@ -53,8 +82,6 @@ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor{
/** /**
* 线 * 线
*
* @return boolean
*/ */
@Override @Override
public boolean execute(){ public boolean execute(){
@ -64,9 +91,51 @@ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor{
// 锁 // 锁
AsyncWaitLock lock = new AsyncWaitLock(this.taskList.size()); AsyncWaitLock lock = new AsyncWaitLock(this.taskList.size());
for (Runnable task : this.taskList) {
// 多线程执行任务
boolean execute = this.execute(task, lock);
// 执行任务被拒绝 门闩减1 计数器不动 End
if(!execute){
lock.getLatch().countDown();
}
}
// 线程锁 等待查询结果 结果完成后继续执行
try {
lock.getLatch().await();
}catch (Exception e){
log.error(e.getMessage(), e);
}finally {
this.taskList.clear();
}
// 返回执行结果
return lock.getCount().get() == 0;
}
/**
* 线
*/
@Override
public boolean executeErrorCallback(Function<Runnable, Void> callback){
if(CollUtil.isEmpty(this.taskList)){
return true;
}
// 锁
AsyncWaitLock lock = new AsyncWaitLock(this.taskList.size());
for (Runnable task : this.taskList) { for (Runnable task : this.taskList) {
// 多线程执行任务 // 多线程执行任务
AsyncProcessCoordinator.execute(task, lock); boolean execute = this.execute(task, lock);
// 执行任务被拒绝 门闩减1 计数器不动 End
if(!execute){
// 线程池失败后 返回该 Runnable
callback.apply(task);
lock.getLatch().countDown();
}
} }
// 线程锁 等待查询结果 结果完成后继续执行 // 线程锁 等待查询结果 结果完成后继续执行
@ -74,13 +143,40 @@ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor{
lock.getLatch().await(); lock.getLatch().await();
}catch (Exception e){ }catch (Exception e){
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
}finally {
this.taskList.clear();
} }
// 返回执行结果 // 返回执行结果
return lock.getCount().get() == 0; return lock.getCount().get() == 0;
} }
// ========================================
/**
*
*
* @param task
* @return boolean
*/
private boolean execute(final Runnable task, final AsyncWaitLock lock) {
return processor.executeTask(new TaskWrapper(task, lock));
}
/**
*
* @param key Key
* @return AsyncProcessor
*/
private synchronized static AsyncProcessor getProcessor(String key){
AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key);
if(null == asyncProcessor){
asyncProcessor = new AsyncProcessor(key);
EXECUTOR_MAP.put(key, asyncProcessor);
}
return asyncProcessor;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/** /**
* 线 * 线
@ -105,4 +201,51 @@ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor{
} }
} }
/**
* Task <br>
* Executor <br>
*/
public static class TaskWrapper implements Runnable {
private final Runnable gift;
private final CountDownLatch latch;
private final AtomicInteger count;
public TaskWrapper(final Runnable target) {
this.gift = target;
this.count = null;
this.latch = null;
}
public TaskWrapper(final Runnable target, final AsyncWaitLock lock) {
if (lock == null) {
this.gift = null;
this.count = null;
this.latch = null;
return;
}
this.gift = target;
this.count = lock.getCount();
this.latch = lock.getLatch();
}
@Override
public void run() {
// 捕获异常,避免在 Executor 里面被吞掉了
if (gift != null) {
try {
gift.run();
// 标示已执行
count.decrementAndGet();
} catch (Exception e) {
String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage());
log.error(errMsg, e);
} finally {
latch.countDown();
}
}
}
}
} }

@ -16,6 +16,15 @@ public final class AsyncProcessExecutorFactory {
return new AsyncProcessExecutorByWait(); return new AsyncProcessExecutorByWait();
} }
/**
*
* @param key KEY
* @return AsyncProcessExecutor
*/
public static AsyncProcessExecutor createWaitExecutor(String key){
return new AsyncProcessExecutorByWait(key);
}
/** /**
* *
* @return AsyncProcessExecutor * @return AsyncProcessExecutor
@ -24,6 +33,15 @@ public final class AsyncProcessExecutorFactory {
return new AsyncProcessExecutorByNormal(); return new AsyncProcessExecutorByNormal();
} }
/**
*
* @param key KEY
* @return AsyncProcessExecutor
*/
public static AsyncProcessExecutor createNormalExecutor(String key){
return new AsyncProcessExecutorByNormal(key);
}
// ===================== // =====================
private AsyncProcessExecutorFactory(){} private AsyncProcessExecutorFactory(){}

@ -1,35 +1,30 @@
package org.opsli.common.thread; package org.opsli.common.thread;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
* 线 * 线 - 线
* *
* @author Parker * @author
* @date 2020-10-08 10:24 * @date 2020-10-08 10:24
*/ */
@Slf4j @Slf4j
public final class AsyncProcessor { public class AsyncProcessor {
/** /**
* * <br>
*/ */
private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2; private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
/** /**
* 线 * 线
*/ */
private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-%d"; private static final String THREAD_POOL_NAME = "AsyncProcessPool-{}-%d";
/**
* 线
*/
private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder()
.namingPattern(THREAD_POOL_NAME)
.daemon(true).build();
/** /**
* *
@ -37,7 +32,7 @@ public final class AsyncProcessor {
private static final int DEFAULT_SIZE = 1024; private static final int DEFAULT_SIZE = 1024;
/** /**
* 线 * 线
*/ */
private static final int DEFAULT_WAIT_TIME = 10; private static final int DEFAULT_WAIT_TIME = 10;
@ -47,66 +42,56 @@ public final class AsyncProcessor {
private static final long DEFAULT_KEEP_ALIVE = 60L; private static final long DEFAULT_KEEP_ALIVE = 60L;
/** /**
* Executor *
*/ */
private static final ExecutorService EXECUTOR; private ExecutorService execute;
/**
*
*/
private static final BlockingQueue<Runnable> EXECUTOR_QUEUE = new ArrayBlockingQueue<>(DEFAULT_SIZE);
static {
// 创建 Executor
// 此处默认最大值改为处理器数量的 4 倍
try {
EXECUTOR = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT,
DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
TimeUnit.SECONDS, EXECUTOR_QUEUE, FACTORY);
// 主动关闭执行器
autoCloseProcess();
} catch (Exception e) {
log.error("AsyncProcessor 异步处理器初始化错误", e);
throw new ExceptionInInitializerError(e);
}
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
*
*/
private AsyncProcessor() {}
/** /**
* *
*/ */
private static void autoCloseProcess() { public AsyncProcessor(String key){
if(AsyncProcessor.EXECUTOR == null){ if(StringUtils.isBlank(key)){
return; return;
} }
// 线程工厂名称
String formatThreadPoolName = StrUtil.format(THREAD_POOL_NAME, key);
BasicThreadFactory basicThreadFactory = new BasicThreadFactory.Builder()
.namingPattern(formatThreadPoolName)
.daemon(true).build();
// 创建 Executor
// 此处默认最大值改为处理器数量的 4 倍
try {
// 执行队列
BlockingQueue<Runnable> executorQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);
execute = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
TimeUnit.SECONDS, executorQueue, basicThreadFactory);
// 这里不会自动关闭线程, 当线程超过阈值时 抛异常 // 这里不会自动关闭线程, 当线程超过阈值时 抛异常
// 关闭事件的挂钩 // 关闭事件的挂钩
Runtime.getRuntime().addShutdownHook(new Thread(() -> { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("AsyncProcessor 异步处理器关闭"); log.info("AsyncProcessorWait 异步处理器关闭");
execute.shutdown();
AsyncProcessor.EXECUTOR.shutdown();
try { try {
// 等待1秒执行关闭 // 等待1秒执行关闭
if (!AsyncProcessor.EXECUTOR.awaitTermination(AsyncProcessor.DEFAULT_WAIT_TIME, if (!execute.awaitTermination(DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) {
TimeUnit.SECONDS)) { log.error("AsyncProcessorWait 由于等待超时,异步处理器立即关闭");
log.error("AsyncProcessor 由于等待超时,异步处理器立即关闭"); execute.shutdownNow();
AsyncProcessor.EXECUTOR.shutdownNow();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("AsyncProcessor 异步处理器关闭中断"); log.error("AsyncProcessorWait 异步处理器关闭中断");
AsyncProcessor.EXECUTOR.shutdownNow(); execute.shutdownNow();
} }
log.info("AsyncProcessor 异步处理器关闭完成"); log.info("AsyncProcessorWait 异步处理器关闭完成");
})); }));
} catch (Exception e) {
log.error("AsyncProcessorWait 异步处理器初始化错误", e);
throw new ExceptionInInitializerError(e);
}
} }
@ -114,14 +99,14 @@ public final class AsyncProcessor {
* <br> * <br>
* {@link } * {@link }
* *
* @param task * @param task
* @return boolean * @return
*/ */
protected static boolean executeTask(Runnable task) { public boolean executeTask(Runnable task) {
try { try {
EXECUTOR.execute(task); execute.execute(task);
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
log.error("AsyncProcessor 执行任务被拒绝", e); log.error("AsyncProcessorWait 执行任务被拒绝", e);
return false; return false;
} }
return true; return true;
@ -134,12 +119,12 @@ public final class AsyncProcessor {
* @param task * @param task
* @return Future<T> * @return Future<T>
*/ */
protected static <T> Future<T> submitTask(Callable<T> task) { public <T> Future<T> submitTask(Callable<T> task) {
try { try {
return EXECUTOR.submit(task); return execute.submit(task);
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
log.error("AsyncProcessor 执行任务被拒绝", e); log.error("AsyncProcessorWait 执行任务被拒绝", e);
throw new UnsupportedOperationException("AsyncProcessor 无法提交任务,已被拒绝", e); throw new UnsupportedOperationException("AsyncProcessorWait 无法提交任务,已被拒绝", e);
} }
} }
} }

@ -0,0 +1,80 @@
package org.opsli.common.thread;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Maps;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ExecutorService;
/**
* 线
*
* @author
* @date 2021/8/27 17:00
*/
@Slf4j
public final class SyncProcessSingleExecutor {
private static final Map<String, ExecutorService> EXECUTOR_MAP = Maps.newConcurrentMap();
private static final String KEY = "def";
/**
*
* @param r
*/
public static synchronized void execute(Runnable r){
execute(KEY, r);
}
/**
*
* @param key Key
* @param r
*/
public static synchronized void execute(String key, Runnable r){
if(null == r){
return;
}
ExecutorService executorService = EXECUTOR_MAP.get(key);
if(null == executorService){
executorService = ThreadUtil.newSingleExecutor();
EXECUTOR_MAP.put(key, executorService);
}
executorService.execute(new TaskWrapper(r));
}
/**
* Task <br>
* Executor <br>
*/
private static class TaskWrapper implements Runnable {
private final Runnable gift;
public TaskWrapper(final Runnable target) {
this.gift = target;
}
@Override
public void run() {
// 捕获异常,避免在 Executor 里面被吞掉了
if (gift != null) {
try {
gift.run();
} catch (Exception e) {
String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage());
log.error(errMsg, e);
}
}
}
}
private SyncProcessSingleExecutor(){}
}
Loading…
Cancel
Save