优化多线程锁等待执行器,消除线程死锁操作隐患

v1.4.1
hiparker 5 years ago
parent 48c2a93182
commit 8ce987ffb0

@ -15,8 +15,11 @@
*/ */
package org.opsli.common.thread.wait; package org.opsli.common.thread.wait;
import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -31,28 +34,37 @@ import java.util.concurrent.atomic.AtomicInteger;
@Slf4j @Slf4j
public class AsyncProcessWaitExecutor { public class AsyncProcessWaitExecutor {
/** 线程初始值 */
private final int initVal;
/** 任务执行计数器 */ /** 任务执行计数器 */
private AtomicInteger count; private AtomicInteger count;
/** 门闩 线程锁 */ /** 任务队列 */
private CountDownLatch latch; private final List<Runnable> taskList;
public AsyncProcessWaitExecutor(final int initVal){ public AsyncProcessWaitExecutor(){
this.initVal = initVal; taskList = new ArrayList<>();
if(this.initVal > 0){
// 计数器
count = new AtomicInteger(this.initVal);
latch = new CountDownLatch(this.initVal);
}
} }
/** /**
* *
* @param task * @param task
*/ */
public void execute(final Runnable task){ public void put(final Runnable task){
if(this.initVal > 0){ taskList.add(task);
}
/**
* 线
*/
public void execute(){
if(CollUtil.isEmpty(this.taskList)){
return;
}
// 初始化锁参数
count = new AtomicInteger(this.taskList.size());
// 门闩 线程锁
CountDownLatch latch = new CountDownLatch(this.taskList.size());
for (Runnable task : this.taskList) {
// 多线程执行任务 // 多线程执行任务
boolean execute = AsyncProcessQueueWait.execute(task, count, latch); boolean execute = AsyncProcessQueueWait.execute(task, count, latch);
// 执行任务被拒绝 门闩减1 计数器不动 End // 执行任务被拒绝 门闩减1 计数器不动 End
@ -60,14 +72,7 @@ public class AsyncProcessWaitExecutor {
latch.countDown(); latch.countDown();
} }
} }
}
/**
* 线
*/
public void await(){
if(this.initVal > 0){
// 线程锁 等待查询结果 结果完成后继续执行 // 线程锁 等待查询结果 结果完成后继续执行
try { try {
latch.await(); latch.await();
@ -75,17 +80,15 @@ public class AsyncProcessWaitExecutor {
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} }
} }
}
/** /**
* 线 * 线
*/ */
public boolean isSuccess(){ public boolean isSuccess(){
if(this.initVal > 0){ if(CollUtil.isEmpty(this.taskList)){
return count.get() == 0;
}
return true; return true;
} }
return count.get() == 0;
}
} }

@ -8,7 +8,7 @@ spring:
#redis 配置 #redis 配置
redis: redis:
database: 0 database: 0
host: 127.0.0.1 host: 10.0.0.254
password: '123456' password: '123456'
port: 6379 port: 6379
@ -50,7 +50,7 @@ redisson:
server: server:
enable: true enable: true
type: standalone type: standalone
address: "127.0.0.1:6379" address: "10.0.0.254:6379"
password: 123456 password: 123456
database: 0 database: 0

Loading…
Cancel
Save