From 8ce987ffb07e846e5daf44c9e5302f9d80b9c11c Mon Sep 17 00:00:00 2001 From: hiparker Date: Thu, 8 Apr 2021 12:55:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=A4=9A=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E9=94=81=E7=AD=89=E5=BE=85=E6=89=A7=E8=A1=8C=E5=99=A8=EF=BC=8C?= =?UTF-8?q?=E6=B6=88=E9=99=A4=E7=BA=BF=E7=A8=8B=E6=AD=BB=E9=94=81=E6=93=8D?= =?UTF-8?q?=E4=BD=9C=E9=9A=90=E6=82=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../thread/wait/AsyncProcessWaitExecutor.java | 63 ++++++++++--------- .../src/main/resources/application-dev.yaml | 4 +- 2 files changed, 35 insertions(+), 32 deletions(-) diff --git a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/wait/AsyncProcessWaitExecutor.java b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/wait/AsyncProcessWaitExecutor.java index 06c8ae90..8713fe33 100644 --- a/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/wait/AsyncProcessWaitExecutor.java +++ b/opsli-base-support/opsli-common/src/main/java/org/opsli/common/thread/wait/AsyncProcessWaitExecutor.java @@ -15,8 +15,11 @@ */ package org.opsli.common.thread.wait; +import cn.hutool.core.collection.CollUtil; import lombok.extern.slf4j.Slf4j; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; @@ -31,28 +34,37 @@ import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class AsyncProcessWaitExecutor { - /** 线程初始值 */ - private final int initVal; /** 任务执行计数器 */ private AtomicInteger count; - /** 门闩 线程锁 */ - private CountDownLatch latch; + /** 任务队列 */ + private final List taskList; - public AsyncProcessWaitExecutor(final int initVal){ - this.initVal = initVal; - if(this.initVal > 0){ - // 计数器 - count = new AtomicInteger(this.initVal); - latch = new CountDownLatch(this.initVal); - } + public AsyncProcessWaitExecutor(){ + taskList = new ArrayList<>(); } /** * 执行 * @param task 任务 */ - public void execute(final Runnable task){ - if(this.initVal > 0){ + public void put(final Runnable task){ + taskList.add(task); + } + + /** + * 执行 线程锁 等待查询结果 结果完成后继续执行 + */ + public void execute(){ + if(CollUtil.isEmpty(this.taskList)){ + return; + } + + // 初始化锁参数 + count = new AtomicInteger(this.taskList.size()); + // 门闩 线程锁 + CountDownLatch latch = new CountDownLatch(this.taskList.size()); + + for (Runnable task : this.taskList) { // 多线程执行任务 boolean execute = AsyncProcessQueueWait.execute(task, count, latch); // 执行任务被拒绝 门闩减1 计数器不动 End @@ -60,20 +72,12 @@ public class AsyncProcessWaitExecutor { latch.countDown(); } } - } - - /** - * 线程锁 等待查询结果 结果完成后继续执行 - */ - public void await(){ - if(this.initVal > 0){ - // 线程锁 等待查询结果 结果完成后继续执行 - try { - latch.await(); - }catch (Exception e){ - log.error(e.getMessage(), e); - } + // 线程锁 等待查询结果 结果完成后继续执行 + try { + latch.await(); + }catch (Exception e){ + log.error(e.getMessage(), e); } } @@ -81,11 +85,10 @@ public class AsyncProcessWaitExecutor { * 线程锁 等待查询结果 结果完成后继续执行 */ public boolean isSuccess(){ - if(this.initVal > 0){ - return count.get() == 0; + if(CollUtil.isEmpty(this.taskList)){ + return true; } - return true; + return count.get() == 0; } - } diff --git a/opsli-starter/src/main/resources/application-dev.yaml b/opsli-starter/src/main/resources/application-dev.yaml index ecc69659..324918ac 100644 --- a/opsli-starter/src/main/resources/application-dev.yaml +++ b/opsli-starter/src/main/resources/application-dev.yaml @@ -8,7 +8,7 @@ spring: #redis 配置 redis: database: 0 - host: 127.0.0.1 + host: 10.0.0.254 password: '123456' port: 6379 @@ -50,7 +50,7 @@ redisson: server: enable: true type: standalone - address: "127.0.0.1:6379" + address: "10.0.0.254:6379" password: 123456 database: 0