diff --git a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessQueue.java b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/refuse/AsyncProcessQueueReFuse.java similarity index 89% rename from opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessQueue.java rename to opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/refuse/AsyncProcessQueueReFuse.java index 1053624d..3e867c5e 100644 --- a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessQueue.java +++ b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/refuse/AsyncProcessQueueReFuse.java @@ -1,4 +1,4 @@ -package org.opsli.common.thread; +package org.opsli.common.thread.refuse; import cn.hutool.core.util.StrUtil; @@ -10,7 +10,7 @@ import lombok.extern.slf4j.Slf4j; * @Description: 自定义线程有界队列 */ @Slf4j -public class AsyncProcessQueue { +public class AsyncProcessQueueReFuse { // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ /** @@ -48,6 +48,6 @@ public class AsyncProcessQueue { * @return */ public static boolean execute(final Runnable task) { - return AsyncProcessor.executeTask(new TaskWrapper(task)); + return AsyncProcessorReFuse.executeTask(new TaskWrapper(task)); } } diff --git a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessor.java b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/refuse/AsyncProcessorReFuse.java similarity index 94% rename from opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessor.java rename to opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/refuse/AsyncProcessorReFuse.java index 4dfb728c..cee89b93 100644 --- a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/AsyncProcessor.java +++ b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/refuse/AsyncProcessorReFuse.java @@ -1,4 +1,4 @@ -package org.opsli.common.thread; +package org.opsli.common.thread.refuse; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.concurrent.BasicThreadFactory; @@ -8,10 +8,10 @@ import java.util.concurrent.*; /** * @Author: 一枝花算不算浪漫 * @CreateTime: 2020-10-08 10:24 - * @Description: 自定义线程执行器 + * @Description: 自定义线程执行器 - 线程超时自动拒绝 */ @Slf4j -public class AsyncProcessor { +public class AsyncProcessorReFuse { /** * 默认最大并发数
@@ -21,7 +21,7 @@ public class AsyncProcessor { /** * 线程池名称格式 */ - private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-%d"; + private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-Refuse-%d"; /** * 线程工厂名称 @@ -90,7 +90,7 @@ public class AsyncProcessor { /** * 此类型无法实例化 */ - private AsyncProcessor() { + private AsyncProcessorReFuse() { } /** diff --git a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/wait/AsyncProcessQueueWait.java b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/wait/AsyncProcessQueueWait.java new file mode 100644 index 00000000..3ae3cac4 --- /dev/null +++ b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/wait/AsyncProcessQueueWait.java @@ -0,0 +1,64 @@ +package org.opsli.common.thread.wait; + + +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @Author: 一枝花算不算浪漫 + * @CreateTime: 2020-10-08 10:24 + * @Description: 自定义线程有界队列 - 等待线程执行完毕不拒绝 + */ +@Slf4j +public class AsyncProcessQueueWait { + + // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + /** + * Task 包装类
+ * 此类型的意义是记录可能会被 Executor 吃掉的异常
+ */ + public static class TaskWrapper implements Runnable { + + private final Runnable gift; + private final CountDownLatch latch; + private final AtomicInteger count; + + public TaskWrapper(final Runnable target, final AtomicInteger count, final CountDownLatch latch) { + this.gift = target; + this.count = count; + this.latch = latch; + } + + @Override + public void run() { + // 捕获异常,避免在 Executor 里面被吞掉了 + if (gift != null) { + try { + gift.run(); + } catch (Exception e) { + String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage()); + log.error(errMsg, e); + } finally { + // 标示已执行 + count.decrementAndGet(); + latch.countDown(); + } + } + } + } + + // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + + /** + * 执行指定的任务 + * + * @param task + * @return + */ + public static boolean execute(final Runnable task, final AtomicInteger count, final CountDownLatch latch) { + return AsyncProcessorWait.executeTask(new TaskWrapper(task, count, latch)); + } +} diff --git a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/wait/AsyncProcessWaitExecutor.java b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/wait/AsyncProcessWaitExecutor.java new file mode 100644 index 00000000..06c8ae90 --- /dev/null +++ b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/wait/AsyncProcessWaitExecutor.java @@ -0,0 +1,91 @@ +/** + * Copyright 2020 OPSLI 快速开发平台 https://www.opsli.com + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.opsli.common.thread.wait; + +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @Author: Parker + * @CreateTime: 2020-12-10 10:36 + * @Description: 多线程锁执行器 + * + * 用于当前方法中复杂业务多线程处理,等待线程执行完毕后 获得统一结果 + * + */ +@Slf4j +public class AsyncProcessWaitExecutor { + + /** 线程初始值 */ + private final int initVal; + /** 任务执行计数器 */ + private AtomicInteger count; + /** 门闩 线程锁 */ + private CountDownLatch latch; + + public AsyncProcessWaitExecutor(final int initVal){ + this.initVal = initVal; + if(this.initVal > 0){ + // 计数器 + count = new AtomicInteger(this.initVal); + latch = new CountDownLatch(this.initVal); + } + } + + /** + * 执行 + * @param task 任务 + */ + public void execute(final Runnable task){ + if(this.initVal > 0){ + // 多线程执行任务 + boolean execute = AsyncProcessQueueWait.execute(task, count, latch); + // 执行任务被拒绝 门闩减1 计数器不动 End + if(!execute){ + latch.countDown(); + } + } + } + + + /** + * 线程锁 等待查询结果 结果完成后继续执行 + */ + public void await(){ + if(this.initVal > 0){ + // 线程锁 等待查询结果 结果完成后继续执行 + try { + latch.await(); + }catch (Exception e){ + log.error(e.getMessage(), e); + } + } + } + + /** + * 线程锁 等待查询结果 结果完成后继续执行 + */ + public boolean isSuccess(){ + if(this.initVal > 0){ + return count.get() == 0; + } + return true; + } + + +} diff --git a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/wait/AsyncProcessorWait.java b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/wait/AsyncProcessorWait.java new file mode 100644 index 00000000..43175140 --- /dev/null +++ b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/wait/AsyncProcessorWait.java @@ -0,0 +1,128 @@ +package org.opsli.common.thread.wait; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; + +import java.util.concurrent.*; + +/** + * @Author: 一枝花算不算浪漫 + * @CreateTime: 2020-10-08 10:24 + * @Description: 自定义线程执行器 - 等待线程执行完毕不拒绝 + */ +@Slf4j +public class AsyncProcessorWait { + + /** + * 默认最大并发数
+ */ + private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2; + + /** + * 线程池名称格式 + */ + private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-Wait-%d"; + + /** + * 线程工厂名称 + */ + private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder() + .namingPattern(THREAD_POOL_NAME) + .daemon(true).build(); + + /** + * 默认队列大小 + */ + private static final int DEFAULT_SIZE = 500; + + /** + * 默认线程等待时间 秒 + */ + private static final int DEFAULT_WAIT_TIME = 99999; + + /** + * 默认线程存活时间 + */ + private static final long DEFAULT_KEEP_ALIVE = 60L; + + /**NewEntryServiceImpl.java:689 + * Executor + */ + private static final ExecutorService EXECUTOR; + + /** + * 执行队列 + */ + private static final BlockingQueue EXECUTOR_QUEUE = new ArrayBlockingQueue<>(DEFAULT_SIZE); + + static { + // 创建 Executor + // 此处默认最大值改为处理器数量的 4 倍 + try { + EXECUTOR = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE, + TimeUnit.SECONDS, EXECUTOR_QUEUE, FACTORY); + // 这里不会自动关闭线程, 当线程超过阈值时 抛异常 + // 关闭事件的挂钩 + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("AsyncProcessor shutting down."); + + EXECUTOR.shutdown(); + + try { + // 等待1秒执行关闭 + if (!EXECUTOR.awaitTermination(DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) { + log.error("AsyncProcessor shutdown immediately due to wait timeout."); + EXECUTOR.shutdownNow(); + } + } catch (InterruptedException e) { + log.error("AsyncProcessor shutdown interrupted."); + EXECUTOR.shutdownNow(); + } + + log.info("AsyncProcessor shutdown complete."); + })); + } catch (Exception e) { + log.error("AsyncProcessor init error.", e); + throw new ExceptionInInitializerError(e); + } + } + + /** + * 此类型无法实例化 + */ + private AsyncProcessorWait() { + } + + /** + * 执行任务,不管是否成功
+ * 其实也就是包装以后的 {@link } 方法 + * + * @param task + * @return + */ + public static boolean executeTask(Runnable task) { + try { + EXECUTOR.execute(task); + } catch (RejectedExecutionException e) { + log.error("Task executing was rejected.", e); + return false; + } + return true; + } + + /** + * 提交任务,并可以在稍后获取其执行情况
+ * 当提交失败时,会抛出 {@link } + * + * @param task + * @return + */ + public static Future submitTask(Callable task) { + try { + return EXECUTOR.submit(task); + } catch (RejectedExecutionException e) { + log.error("Task executing was rejected.", e); + throw new UnsupportedOperationException("Unable to submit the task, rejected.", e); + } + } +} diff --git a/opsli-base-support/opsli-core/src/main/java/org/opsli/core/thread/LogsThreadPool.java b/opsli-base-support/opsli-core/src/main/java/org/opsli/core/thread/LogsThreadPool.java index 067f523e..8647657e 100644 --- a/opsli-base-support/opsli-core/src/main/java/org/opsli/core/thread/LogsThreadPool.java +++ b/opsli-base-support/opsli-core/src/main/java/org/opsli/core/thread/LogsThreadPool.java @@ -4,7 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.opsli.api.base.result.ResultVo; import org.opsli.api.web.system.logs.LogsApi; import org.opsli.api.wrapper.system.logs.LogsModel; -import org.opsli.common.thread.AsyncProcessQueue; +import org.opsli.common.thread.refuse.AsyncProcessQueueReFuse; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -32,7 +32,7 @@ public class LogsThreadPool { return; } - AsyncProcessQueue.execute(()->{ + AsyncProcessQueueReFuse.execute(()->{ // 存储临时 token ResultVo ret = logsApi.insert(logsModel); if(!ret.isSuccess()){ @@ -43,6 +43,7 @@ public class LogsThreadPool { // ======================== + @Autowired public void setLogsApi(LogsApi logsApi) { LogsThreadPool.logsApi = logsApi;