diff --git a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessCoordinator.java b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessCoordinator.java deleted file mode 100644 index 6eb48ad8..00000000 --- a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessCoordinator.java +++ /dev/null @@ -1,104 +0,0 @@ -package org.opsli.common.thread; - - -import cn.hutool.core.util.StrUtil; -import lombok.extern.slf4j.Slf4j; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * 线程池执行器调用者 - * - * @author Parker - * @date 2020-10-08 10:24 - */ -@Slf4j -public final class AsyncProcessCoordinator { - - /** - * Task 包装类
- * 此类型的意义是记录可能会被 Executor 吃掉的异常
- */ - public static class TaskWrapper implements Runnable { - - private final Runnable gift; - private final CountDownLatch latch; - private final AtomicInteger count; - - public TaskWrapper(final Runnable target) { - this.gift = target; - this.count = null; - this.latch = null; - } - - public TaskWrapper(final Runnable target, final AsyncProcessExecutorByWait.AsyncWaitLock lock) { - if(lock == null){ - this.gift = null; - this.count = null; - this.latch = null; - return; - } - - this.gift = target; - this.count = lock.getCount(); - this.latch = lock.getLatch(); - } - - @Override - public void run() { - // 捕获异常,避免在 Executor 里面被吞掉了 - if (gift == null) { - return; - } - - try { - // 执行任务 - gift.run(); - - if(count != null){ - // 标示已执行 - count.decrementAndGet(); - } - } catch (Exception e) { - String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage()); - log.error(errMsg, e); - } finally { - if(latch != null){ - latch.countDown(); - } - } - } - } - - // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - /** - * 此类型无法实例化 - */ - private AsyncProcessCoordinator(){} - - /** - * 执行指定的任务 - * - * @param task 任务 - * @return boolean - */ - protected static boolean execute(final Runnable task) { - return AsyncProcessor.executeTask(new TaskWrapper(task)); - } - - /** - * 执行指定的任务 - * - * @param task 任务 - * @return boolean - */ - protected static boolean execute(final Runnable task, final AsyncProcessExecutorByWait.AsyncWaitLock lock) { - boolean execute = AsyncProcessor.executeTask(new TaskWrapper(task, lock)); - // 执行任务被拒绝 门闩减1 计数器不动 End - if(!execute){ - lock.getLatch().countDown(); - } - return execute; - } -} diff --git a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutor.java b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutor.java index f5ff9618..87520b06 100644 --- a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutor.java +++ b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutor.java @@ -1,5 +1,7 @@ package org.opsli.common.thread; +import java.util.function.Function; + /** * 异步进程 执行器 * @@ -22,4 +24,11 @@ public interface AsyncProcessExecutor { */ boolean execute(); + /** + * 执行异常回调 + * @param callback 回调 + * @return boolean + */ + boolean executeErrorCallback(Function callback); + } diff --git a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutorByNormal.java b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutorByNormal.java index a31b5a7a..d14d7648 100644 --- a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutorByNormal.java +++ b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutorByNormal.java @@ -16,10 +16,13 @@ package org.opsli.common.thread; import cn.hutool.core.collection.CollUtil; +import com.google.common.collect.Maps; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.function.Function; /** * 多线程锁执行器 正常处理 @@ -30,11 +33,35 @@ import java.util.List; @Slf4j public class AsyncProcessExecutorByNormal implements AsyncProcessExecutor{ + /** 线程池字典 */ + private static final Map EXECUTOR_MAP = Maps.newConcurrentMap(); + + /** 线程Key */ + private final String key; + /** 任务队列 */ private final List taskList; + /** 执行器 */ + private final AsyncProcessor processor; + + /** + * 构造函数 + */ public AsyncProcessExecutorByNormal(){ + this.key = "def"; taskList = new ArrayList<>(); + processor = AsyncProcessExecutorByNormal.getProcessor(this.key); + } + + /** + * 构造函数 + * @param key 线程池唯一Key + */ + public AsyncProcessExecutorByNormal(String key){ + this.key = key; + taskList = new ArrayList<>(); + processor = AsyncProcessExecutorByNormal.getProcessor(this.key); } /** @@ -60,11 +87,57 @@ public class AsyncProcessExecutorByNormal implements AsyncProcessExecutor{ for (Runnable task : this.taskList) { // 多线程执行任务 - AsyncProcessCoordinator.execute(task); + this.execute(task); } // 返回执行结果 return true; } + @Override + public boolean executeErrorCallback(Function callback) { + + if(CollUtil.isEmpty(this.taskList)){ + return true; + } + + for (Runnable task : this.taskList) { + // 多线程执行任务 + boolean execute = this.execute(task); + // 执行任务被拒绝 门闩减1 计数器不动 End + if(!execute){ + // 线程池失败后 返回该 Runnable + callback.apply(task); + } + } + + return false; + } + + // ==================================== + + /** + * 执行指定的任务 + * + * @param task 任务 + * @return boolean + */ + private boolean execute(final Runnable task) { + return processor.executeTask(task); + } + + /** + * 获得执行器 + * @param key Key + * @return AsyncProcessor + */ + private synchronized static AsyncProcessor getProcessor(String key){ + AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key); + if(null == asyncProcessor){ + asyncProcessor = new AsyncProcessor(key); + EXECUTOR_MAP.put(key, asyncProcessor); + } + return asyncProcessor; + } + } diff --git a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutorByWait.java b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutorByWait.java index e4e62db0..e8528713 100644 --- a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutorByWait.java +++ b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutorByWait.java @@ -16,31 +16,60 @@ package org.opsli.common.thread; import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; +import com.google.common.collect.Maps; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; /** * 多线程锁执行器 * 用于当前方法中复杂业务多线程处理,等待线程执行完毕后 获得统一结果 * - * @author Parker + * @author 周鹏程 * @date 2020-12-10 10:36 */ @Slf4j -public class AsyncProcessExecutorByWait implements AsyncProcessExecutor{ +public class AsyncProcessExecutorByWait implements AsyncProcessExecutor { + + /** 线程池字典 */ + private static final Map EXECUTOR_MAP = Maps.newConcurrentMap(); + + /** 线程Key */ + private final String key; /** 任务队列 */ private final List taskList; + /** 执行器 */ + private final AsyncProcessor processor; + + /** + * 构造函数 + */ public AsyncProcessExecutorByWait(){ + this.key = "def"; + taskList = new ArrayList<>(); + processor = AsyncProcessExecutorByWait.getProcessor(this.key); + } + + /** + * 构造函数 + * @param key 线程池唯一Key + */ + public AsyncProcessExecutorByWait(String key){ + this.key = key; taskList = new ArrayList<>(); + processor = AsyncProcessExecutorByWait.getProcessor(this.key); } + /** * 执行 * @param task 任务 @@ -53,8 +82,6 @@ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor{ /** * 执行 线程锁 等待查询结果 结果完成后继续执行 - * - * @return boolean 最终直接结果 */ @Override public boolean execute(){ @@ -64,9 +91,51 @@ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor{ // 锁 AsyncWaitLock lock = new AsyncWaitLock(this.taskList.size()); + + for (Runnable task : this.taskList) { + // 多线程执行任务 + boolean execute = this.execute(task, lock); + // 执行任务被拒绝 门闩减1 计数器不动 End + if(!execute){ + lock.getLatch().countDown(); + } + } + + // 线程锁 等待查询结果 结果完成后继续执行 + try { + lock.getLatch().await(); + }catch (Exception e){ + log.error(e.getMessage(), e); + }finally { + this.taskList.clear(); + } + + // 返回执行结果 + return lock.getCount().get() == 0; + } + + /** + * 执行 线程锁 等待查询结果 结果完成后继续执行 + */ + @Override + public boolean executeErrorCallback(Function callback){ + if(CollUtil.isEmpty(this.taskList)){ + return true; + } + + // 锁 + AsyncWaitLock lock = new AsyncWaitLock(this.taskList.size()); + for (Runnable task : this.taskList) { // 多线程执行任务 - AsyncProcessCoordinator.execute(task, lock); + boolean execute = this.execute(task, lock); + // 执行任务被拒绝 门闩减1 计数器不动 End + if(!execute){ + // 线程池失败后 返回该 Runnable + callback.apply(task); + + lock.getLatch().countDown(); + } } // 线程锁 等待查询结果 结果完成后继续执行 @@ -74,13 +143,40 @@ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor{ lock.getLatch().await(); }catch (Exception e){ log.error(e.getMessage(), e); + }finally { + this.taskList.clear(); } // 返回执行结果 return lock.getCount().get() == 0; } - // ======================================== + + /** + * 执行指定的任务 + * + * @param task 任务 + * @return boolean + */ + private boolean execute(final Runnable task, final AsyncWaitLock lock) { + return processor.executeTask(new TaskWrapper(task, lock)); + } + + /** + * 获得执行器 + * @param key Key + * @return AsyncProcessor + */ + private synchronized static AsyncProcessor getProcessor(String key){ + AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key); + if(null == asyncProcessor){ + asyncProcessor = new AsyncProcessor(key); + EXECUTOR_MAP.put(key, asyncProcessor); + } + return asyncProcessor; + } + + // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /** * 线程锁对象 @@ -105,4 +201,51 @@ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor{ } } + /** + * Task 包装类
+ * 此类型的意义是记录可能会被 Executor 吃掉的异常
+ */ + public static class TaskWrapper implements Runnable { + + private final Runnable gift; + private final CountDownLatch latch; + private final AtomicInteger count; + + public TaskWrapper(final Runnable target) { + this.gift = target; + this.count = null; + this.latch = null; + } + + public TaskWrapper(final Runnable target, final AsyncWaitLock lock) { + if (lock == null) { + this.gift = null; + this.count = null; + this.latch = null; + return; + } + + this.gift = target; + this.count = lock.getCount(); + this.latch = lock.getLatch(); + } + + @Override + public void run() { + // 捕获异常,避免在 Executor 里面被吞掉了 + if (gift != null) { + try { + gift.run(); + // 标示已执行 + count.decrementAndGet(); + } catch (Exception e) { + String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage()); + log.error(errMsg, e); + } finally { + latch.countDown(); + } + } + } + } + } diff --git a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutorFactory.java b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutorFactory.java index dd57cbfe..ca05231e 100644 --- a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutorFactory.java +++ b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessExecutorFactory.java @@ -16,6 +16,15 @@ public final class AsyncProcessExecutorFactory { return new AsyncProcessExecutorByWait(); } + /** + * 创建等待执行器 + * @param key KEY + * @return AsyncProcessExecutor + */ + public static AsyncProcessExecutor createWaitExecutor(String key){ + return new AsyncProcessExecutorByWait(key); + } + /** * 创建正常执行器 * @return AsyncProcessExecutor @@ -24,6 +33,15 @@ public final class AsyncProcessExecutorFactory { return new AsyncProcessExecutorByNormal(); } + /** + * 创建正常执行器 + * @param key KEY + * @return AsyncProcessExecutor + */ + public static AsyncProcessExecutor createNormalExecutor(String key){ + return new AsyncProcessExecutorByNormal(key); + } + // ===================== private AsyncProcessExecutorFactory(){} diff --git a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessor.java b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessor.java index ca7771ae..df3f83c9 100644 --- a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessor.java +++ b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessor.java @@ -1,35 +1,30 @@ package org.opsli.common.thread; +import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import java.util.concurrent.*; /** - * 线程池实际处理者 + * 自定义线程执行器 - 等待线程执行完毕不拒绝 * - * @author Parker + * @author 周鹏程 * @date 2020-10-08 10:24 */ @Slf4j -public final class AsyncProcessor { +public class AsyncProcessor { /** - * 默认最大并发数 + * 默认最大并发数
*/ private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2; /** * 线程池名称格式 */ - private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-%d"; - - /** - * 线程工厂名称 - */ - private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder() - .namingPattern(THREAD_POOL_NAME) - .daemon(true).build(); + private static final String THREAD_POOL_NAME = "AsyncProcessPool-{}-%d"; /** * 默认队列大小 @@ -37,7 +32,7 @@ public final class AsyncProcessor { private static final int DEFAULT_SIZE = 1024; /** - * 默认线程池等待时间 秒 + * 默认线程池关闭等待时间 秒 */ private static final int DEFAULT_WAIT_TIME = 10; @@ -47,81 +42,71 @@ public final class AsyncProcessor { private static final long DEFAULT_KEEP_ALIVE = 60L; /** - * Executor + * 执行器 */ - private static final ExecutorService EXECUTOR; + private ExecutorService execute; + /** - * 执行队列 + * 初始化 */ - private static final BlockingQueue EXECUTOR_QUEUE = new ArrayBlockingQueue<>(DEFAULT_SIZE); + public AsyncProcessor(String key){ + if(StringUtils.isBlank(key)){ + return; + } + + // 线程工厂名称 + String formatThreadPoolName = StrUtil.format(THREAD_POOL_NAME, key); + BasicThreadFactory basicThreadFactory = new BasicThreadFactory.Builder() + .namingPattern(formatThreadPoolName) + .daemon(true).build(); - static { // 创建 Executor // 此处默认最大值改为处理器数量的 4 倍 try { - EXECUTOR = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, - DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE, - TimeUnit.SECONDS, EXECUTOR_QUEUE, FACTORY); + // 执行队列 + BlockingQueue executorQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE); + execute = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE, + TimeUnit.SECONDS, executorQueue, basicThreadFactory); + // 这里不会自动关闭线程, 当线程超过阈值时 抛异常 + // 关闭事件的挂钩 + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("AsyncProcessorWait 异步处理器关闭"); + + execute.shutdown(); + + try { + // 等待1秒执行关闭 + if (!execute.awaitTermination(DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) { + log.error("AsyncProcessorWait 由于等待超时,异步处理器立即关闭"); + execute.shutdownNow(); + } + } catch (InterruptedException e) { + log.error("AsyncProcessorWait 异步处理器关闭中断"); + execute.shutdownNow(); + } - // 主动关闭执行器 - autoCloseProcess(); + log.info("AsyncProcessorWait 异步处理器关闭完成"); + })); } catch (Exception e) { - log.error("AsyncProcessor 异步处理器初始化错误", e); + log.error("AsyncProcessorWait 异步处理器初始化错误", e); throw new ExceptionInInitializerError(e); } } - // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - - /** - * 此类型无法实例化 - */ - private AsyncProcessor() {} - - /** - * 主动关闭执行器 - */ - private static void autoCloseProcess() { - if(AsyncProcessor.EXECUTOR == null){ - return; - } - - // 这里不会自动关闭线程, 当线程超过阈值时 抛异常 - // 关闭事件的挂钩 - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - log.info("AsyncProcessor 异步处理器关闭"); - - AsyncProcessor.EXECUTOR.shutdown(); - try { - // 等待1秒执行关闭 - if (!AsyncProcessor.EXECUTOR.awaitTermination(AsyncProcessor.DEFAULT_WAIT_TIME, - TimeUnit.SECONDS)) { - log.error("AsyncProcessor 由于等待超时,异步处理器立即关闭"); - AsyncProcessor.EXECUTOR.shutdownNow(); - } - } catch (InterruptedException e) { - log.error("AsyncProcessor 异步处理器关闭中断"); - AsyncProcessor.EXECUTOR.shutdownNow(); - } - - log.info("AsyncProcessor 异步处理器关闭完成"); - })); - } - /** * 执行任务,不管是否成功
* 其实也就是包装以后的 {@link } 方法 * - * @param task 任务 - * @return boolean + * @param task + * @return */ - protected static boolean executeTask(Runnable task) { + public boolean executeTask(Runnable task) { try { - EXECUTOR.execute(task); + execute.execute(task); } catch (RejectedExecutionException e) { - log.error("AsyncProcessor 执行任务被拒绝", e); + log.error("AsyncProcessorWait 执行任务被拒绝", e); return false; } return true; @@ -134,12 +119,12 @@ public final class AsyncProcessor { * @param task 任务 * @return Future */ - protected static Future submitTask(Callable task) { + public Future submitTask(Callable task) { try { - return EXECUTOR.submit(task); + return execute.submit(task); } catch (RejectedExecutionException e) { - log.error("AsyncProcessor 执行任务被拒绝", e); - throw new UnsupportedOperationException("AsyncProcessor 无法提交任务,已被拒绝", e); + log.error("AsyncProcessorWait 执行任务被拒绝", e); + throw new UnsupportedOperationException("AsyncProcessorWait 无法提交任务,已被拒绝", e); } } } diff --git a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/SyncProcessSingleExecutor.java b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/SyncProcessSingleExecutor.java new file mode 100644 index 00000000..dadda2a2 --- /dev/null +++ b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/SyncProcessSingleExecutor.java @@ -0,0 +1,80 @@ +package org.opsli.common.thread; + +import cn.hutool.core.thread.ThreadUtil; +import cn.hutool.core.util.StrUtil; +import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; + +import java.util.Map; +import java.util.concurrent.ExecutorService; + +/** + * 单线程池 + * + * @author 周鹏程 + * @date 2021/8/27 17:00 + */ +@Slf4j +public final class SyncProcessSingleExecutor { + + private static final Map EXECUTOR_MAP = Maps.newConcurrentMap(); + + private static final String KEY = "def"; + + /** + * 执行器 + * @param r 任务 + */ + public static synchronized void execute(Runnable r){ + execute(KEY, r); + } + + + /** + * 执行器 + * @param key 唯一Key + * @param r 任务 + */ + public static synchronized void execute(String key, Runnable r){ + if(null == r){ + return; + } + + ExecutorService executorService = EXECUTOR_MAP.get(key); + if(null == executorService){ + executorService = ThreadUtil.newSingleExecutor(); + EXECUTOR_MAP.put(key, executorService); + } + + executorService.execute(new TaskWrapper(r)); + } + + + /** + * Task 包装类
+ * 此类型的意义是记录可能会被 Executor 吃掉的异常
+ */ + private static class TaskWrapper implements Runnable { + + private final Runnable gift; + + public TaskWrapper(final Runnable target) { + this.gift = target; + } + + @Override + public void run() { + // 捕获异常,避免在 Executor 里面被吞掉了 + if (gift != null) { + try { + gift.run(); + } catch (Exception e) { + String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage()); + log.error(errMsg, e); + } + } + } + } + + private SyncProcessSingleExecutor(){} +}