refactor: 优化异步线程池执行器

pull/9/head
Carina 3 years ago
parent e12e7dd30e
commit 3164d646e7

@ -0,0 +1,104 @@
package org.opsli.common.thread;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线
*
* @author Parker
* @date 2020-10-08 10:24
*/
@Slf4j
public final class AsyncProcessCoordinator {
/**
* Task <br>
* Executor <br>
*/
public static class TaskWrapper implements Runnable {
private final Runnable gift;
private final CountDownLatch latch;
private final AtomicInteger count;
public TaskWrapper(final Runnable target) {
this.gift = target;
this.count = null;
this.latch = null;
}
public TaskWrapper(final Runnable target, final AsyncProcessExecutorByWait.AsyncWaitLock lock) {
if(lock == null){
this.gift = null;
this.count = null;
this.latch = null;
return;
}
this.gift = target;
this.count = lock.getCount();
this.latch = lock.getLatch();
}
@Override
public void run() {
// 捕获异常,避免在 Executor 里面被吞掉了
if (gift == null) {
return;
}
try {
// 执行任务
gift.run();
if(count != null){
// 标示已执行
count.decrementAndGet();
}
} catch (Exception e) {
String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage());
log.error(errMsg, e);
} finally {
if(latch != null){
latch.countDown();
}
}
}
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
*
*/
private AsyncProcessCoordinator(){}
/**
*
*
* @param task
* @return boolean
*/
protected static boolean execute(final Runnable task) {
return AsyncProcessor.executeTask(new TaskWrapper(task));
}
/**
*
*
* @param task
* @return boolean
*/
protected static boolean execute(final Runnable task, final AsyncProcessExecutorByWait.AsyncWaitLock lock) {
boolean execute = AsyncProcessor.executeTask(new TaskWrapper(task, lock));
// 执行任务被拒绝 门闩减1 计数器不动 End
if(!execute){
lock.getLatch().countDown();
}
return execute;
}
}

@ -0,0 +1,25 @@
package org.opsli.common.thread;
/**
*
*
* @author Parker
* @date 202171513:43:37
*/
public interface AsyncProcessExecutor {
/**
*
* @param task
* @return AsyncProcessExecutor
*/
AsyncProcessExecutor put(final Runnable task);
/**
*
* @return boolean
*/
boolean execute();
}

@ -0,0 +1,70 @@
/**
* Copyright 2020 OPSLI https://www.opsli.com
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.opsli.common.thread;
import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
/**
* 线
*
* @author Parker
* @date 2020-12-10 10:36
*/
@Slf4j
public class AsyncProcessExecutorByNormal implements AsyncProcessExecutor{
/** 任务队列 */
private final List<Runnable> taskList;
public AsyncProcessExecutorByNormal(){
taskList = new ArrayList<>();
}
/**
*
* @param task
*/
@Override
public AsyncProcessExecutorByNormal put(final Runnable task){
taskList.add(task);
return this;
}
/**
* 线
*
* @return boolean
*/
@Override
public boolean execute(){
if(CollUtil.isEmpty(this.taskList)){
return true;
}
for (Runnable task : this.taskList) {
// 多线程执行任务
AsyncProcessCoordinator.execute(task);
}
// 返回执行结果
return true;
}
}

@ -13,9 +13,10 @@
* License for the specific language governing permissions and limitations under * License for the specific language governing permissions and limitations under
* the License. * the License.
*/ */
package org.opsli.common.thread.wait; package org.opsli.common.thread;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList; import java.util.ArrayList;
@ -31,14 +32,12 @@ import java.util.concurrent.atomic.AtomicInteger;
* @date 2020-12-10 10:36 * @date 2020-12-10 10:36
*/ */
@Slf4j @Slf4j
public class AsyncProcessWaitExecutor { public class AsyncProcessExecutorByWait implements AsyncProcessExecutor{
/** 任务执行计数器 */
private AtomicInteger count;
/** 任务队列 */ /** 任务队列 */
private final List<Runnable> taskList; private final List<Runnable> taskList;
public AsyncProcessWaitExecutor(){ public AsyncProcessExecutorByWait(){
taskList = new ArrayList<>(); taskList = new ArrayList<>();
} }
@ -46,48 +45,64 @@ public class AsyncProcessWaitExecutor {
* *
* @param task * @param task
*/ */
public void put(final Runnable task){ @Override
public AsyncProcessExecutorByWait put(final Runnable task){
taskList.add(task); taskList.add(task);
return this;
} }
/** /**
* 线 * 线
*
* @return boolean
*/ */
public void execute(){ @Override
public boolean execute(){
if(CollUtil.isEmpty(this.taskList)){ if(CollUtil.isEmpty(this.taskList)){
return; return true;
} }
// 初始化锁参数 // 锁
count = new AtomicInteger(this.taskList.size()); AsyncWaitLock lock = new AsyncWaitLock(this.taskList.size());
// 门闩 线程锁
CountDownLatch latch = new CountDownLatch(this.taskList.size());
for (Runnable task : this.taskList) { for (Runnable task : this.taskList) {
// 多线程执行任务 // 多线程执行任务
boolean execute = AsyncProcessQueueWait.execute(task, count, latch); AsyncProcessCoordinator.execute(task, lock);
// 执行任务被拒绝 门闩减1 计数器不动 End
if(!execute){
latch.countDown();
}
} }
// 线程锁 等待查询结果 结果完成后继续执行 // 线程锁 等待查询结果 结果完成后继续执行
try { try {
latch.await(); lock.getLatch().await();
}catch (Exception e){ }catch (Exception e){
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
} }
// 返回执行结果
return lock.getCount().get() == 0;
} }
// ========================================
/** /**
* 线 * 线
*
* @author Parker
* @date 2020-10-08 10:24
*/ */
public boolean isSuccess(){ @Getter
if(CollUtil.isEmpty(this.taskList)){ public static class AsyncWaitLock {
return true;
/** 门闩 */
private final CountDownLatch latch;
/** 计数器 */
private final AtomicInteger count;
public AsyncWaitLock(int count){
// 初始化锁参数
this.count = new AtomicInteger(count);
// 门闩 线程锁
this.latch = new CountDownLatch(count);
} }
return count.get() == 0;
} }
} }

@ -0,0 +1,31 @@
package org.opsli.common.thread;
/**
*
*
* @author Parker
* @date 202171513:43:37
*/
public final class AsyncProcessExecutorFactory {
/**
*
* @return AsyncProcessExecutor
*/
public static AsyncProcessExecutor createWaitExecutor(){
return new AsyncProcessExecutorByWait();
}
/**
*
* @return AsyncProcessExecutor
*/
public static AsyncProcessExecutor createNormalExecutor(){
return new AsyncProcessExecutorByNormal();
}
// =====================
private AsyncProcessExecutorFactory(){}
}

@ -1,4 +1,4 @@
package org.opsli.common.thread.refuse; package org.opsli.common.thread;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.apache.commons.lang3.concurrent.BasicThreadFactory;
@ -6,23 +6,23 @@ import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
* 线 - 线 * 线
* *
* @author * @author Parker
* @date 2020-10-08 10:24 * @date 2020-10-08 10:24
*/ */
@Slf4j @Slf4j
public class AsyncProcessorReFuse { public final class AsyncProcessor {
/** /**
* <br> *
*/ */
private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2; private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
/** /**
* 线 * 线
*/ */
private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-Refuse-%d"; private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-%d";
/** /**
* 线 * 线
@ -34,10 +34,10 @@ public class AsyncProcessorReFuse {
/** /**
* *
*/ */
private static final int DEFAULT_SIZE = 500; private static final int DEFAULT_SIZE = 1024;
/** /**
* 线 * 线
*/ */
private static final int DEFAULT_WAIT_TIME = 10; private static final int DEFAULT_WAIT_TIME = 10;
@ -46,7 +46,7 @@ public class AsyncProcessorReFuse {
*/ */
private static final long DEFAULT_KEEP_ALIVE = 60L; private static final long DEFAULT_KEEP_ALIVE = 60L;
/**NewEntryServiceImpl.java:689 /**
* Executor * Executor
*/ */
private static final ExecutorService EXECUTOR; private static final ExecutorService EXECUTOR;
@ -60,40 +60,56 @@ public class AsyncProcessorReFuse {
// 创建 Executor // 创建 Executor
// 此处默认最大值改为处理器数量的 4 倍 // 此处默认最大值改为处理器数量的 4 倍
try { try {
EXECUTOR = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE, EXECUTOR = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT,
DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
TimeUnit.SECONDS, EXECUTOR_QUEUE, FACTORY); TimeUnit.SECONDS, EXECUTOR_QUEUE, FACTORY);
// 关闭事件的挂钩 // 主动关闭执行器
Runtime.getRuntime().addShutdownHook(new Thread(() -> { autoCloseProcess();
log.info("AsyncProcessorReFuse 异步处理器关闭");
EXECUTOR.shutdown();
try {
// 等待1秒执行关闭
if (!EXECUTOR.awaitTermination(DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) {
log.error("AsyncProcessorReFuse 由于等待超时,异步处理器立即关闭");
EXECUTOR.shutdownNow();
}
} catch (InterruptedException e) {
log.error("AsyncProcessorReFuse 异步处理器关闭中断");
EXECUTOR.shutdownNow();
}
log.info("AsyncProcessorReFuse 异步处理器关闭完成");
}));
} catch (Exception e) { } catch (Exception e) {
log.error("AsyncProcessorReFuse 异步处理器初始化错误", e); log.error("AsyncProcessor 异步处理器初始化错误", e);
throw new ExceptionInInitializerError(e); throw new ExceptionInInitializerError(e);
} }
} }
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/** /**
* *
*/ */
private AsyncProcessorReFuse() { private AsyncProcessor() {}
/**
*
*/
private static void autoCloseProcess() {
if(AsyncProcessor.EXECUTOR == null){
return;
}
// 这里不会自动关闭线程, 当线程超过阈值时 抛异常
// 关闭事件的挂钩
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("AsyncProcessor 异步处理器关闭");
AsyncProcessor.EXECUTOR.shutdown();
try {
// 等待1秒执行关闭
if (!AsyncProcessor.EXECUTOR.awaitTermination(AsyncProcessor.DEFAULT_WAIT_TIME,
TimeUnit.SECONDS)) {
log.error("AsyncProcessor 由于等待超时,异步处理器立即关闭");
AsyncProcessor.EXECUTOR.shutdownNow();
}
} catch (InterruptedException e) {
log.error("AsyncProcessor 异步处理器关闭中断");
AsyncProcessor.EXECUTOR.shutdownNow();
}
log.info("AsyncProcessor 异步处理器关闭完成");
}));
} }
/** /**
* <br> * <br>
* {@link } * {@link }
@ -101,11 +117,11 @@ public class AsyncProcessorReFuse {
* @param task * @param task
* @return boolean * @return boolean
*/ */
public static boolean executeTask(Runnable task) { protected static boolean executeTask(Runnable task) {
try { try {
EXECUTOR.execute(task); EXECUTOR.execute(task);
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
log.error("AsyncProcessorReFuse 执行任务被拒绝", e); log.error("AsyncProcessor 执行任务被拒绝", e);
return false; return false;
} }
return true; return true;
@ -116,14 +132,14 @@ public class AsyncProcessorReFuse {
* {@link } * {@link }
* *
* @param task * @param task
* @return <T> * @return Future<T>
*/ */
public static <T> Future<T> submitTask(Callable<T> task) { protected static <T> Future<T> submitTask(Callable<T> task) {
try { try {
return EXECUTOR.submit(task); return EXECUTOR.submit(task);
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
log.error("AsyncProcessorReFuse 执行任务被拒绝", e); log.error("AsyncProcessor 执行任务被拒绝", e);
throw new UnsupportedOperationException("AsyncProcessorReFuse 无法提交任务,已被拒绝", e); throw new UnsupportedOperationException("AsyncProcessor 无法提交任务,已被拒绝", e);
} }
} }
} }

@ -0,0 +1,20 @@
package org.opsli.common.thread;
import cn.hutool.core.thread.ThreadUtil;
public class Test {
public static void main(String[] args) {
AsyncProcessExecutor executor = new AsyncProcessExecutorByNormal();
for (int i = 0; i < 10000; i++) {
int finalI = i;
executor.put(()->{
ThreadUtil.sleep(1000);
System.out.println(finalI);
});
}
boolean execute = executor.execute();
System.out.println(execute);
}
}

@ -1,54 +0,0 @@
package org.opsli.common.thread.refuse;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
/**
* 线
*
* @author
* @date 2020-10-08 10:24
*/
@Slf4j
public class AsyncProcessQueueReFuse {
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
* Task <br>
* Executor <br>
*/
public static class TaskWrapper implements Runnable {
private final Runnable gift;
public TaskWrapper(final Runnable target) {
this.gift = target;
}
@Override
public void run() {
// 捕获异常,避免在 Executor 里面被吞掉了
if (gift != null) {
try {
gift.run();
} catch (Exception e) {
String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage());
log.error(errMsg, e);
}
}
}
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
*
*
* @param task
* @return boolean
*/
public static boolean execute(final Runnable task) {
return AsyncProcessorReFuse.executeTask(new TaskWrapper(task));
}
}

@ -1,65 +0,0 @@
package org.opsli.common.thread.wait;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线 - 线
*
* @author
* @date 2020-10-08 10:24
*/
@Slf4j
public class AsyncProcessQueueWait {
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
* Task <br>
* Executor <br>
*/
public static class TaskWrapper implements Runnable {
private final Runnable gift;
private final CountDownLatch latch;
private final AtomicInteger count;
public TaskWrapper(final Runnable target, final AtomicInteger count, final CountDownLatch latch) {
this.gift = target;
this.count = count;
this.latch = latch;
}
@Override
public void run() {
// 捕获异常,避免在 Executor 里面被吞掉了
if (gift != null) {
try {
gift.run();
// 标示已执行
count.decrementAndGet();
} catch (Exception e) {
String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage());
log.error(errMsg, e);
} finally {
latch.countDown();
}
}
}
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
*
*
* @param task
* @return boolean
*/
public static boolean execute(final Runnable task, final AtomicInteger count, final CountDownLatch latch) {
return AsyncProcessorWait.executeTask(new TaskWrapper(task, count, latch));
}
}

@ -1,129 +0,0 @@
package org.opsli.common.thread.wait;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import java.util.concurrent.*;
/**
* 线 - 线
*
* @author
* @date 2020-10-08 10:24
*/
@Slf4j
public class AsyncProcessorWait {
/**
* <br>
*/
private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
/**
* 线
*/
private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-Wait-%d";
/**
* 线
*/
private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder()
.namingPattern(THREAD_POOL_NAME)
.daemon(true).build();
/**
*
*/
private static final int DEFAULT_SIZE = 500;
/**
* 线
*/
private static final int DEFAULT_WAIT_TIME = 99999;
/**
* 线
*/
private static final long DEFAULT_KEEP_ALIVE = 60L;
/**NewEntryServiceImpl.java:689
* Executor
*/
private static final ExecutorService EXECUTOR;
/**
*
*/
private static final BlockingQueue<Runnable> EXECUTOR_QUEUE = new ArrayBlockingQueue<>(DEFAULT_SIZE);
static {
// 创建 Executor
// 此处默认最大值改为处理器数量的 4 倍
try {
EXECUTOR = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
TimeUnit.SECONDS, EXECUTOR_QUEUE, FACTORY);
// 这里不会自动关闭线程, 当线程超过阈值时 抛异常
// 关闭事件的挂钩
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("AsyncProcessorWait 异步处理器关闭");
EXECUTOR.shutdown();
try {
// 等待1秒执行关闭
if (!EXECUTOR.awaitTermination(DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) {
log.error("AsyncProcessorWait 由于等待超时,异步处理器立即关闭");
EXECUTOR.shutdownNow();
}
} catch (InterruptedException e) {
log.error("AsyncProcessorWait 异步处理器关闭中断");
EXECUTOR.shutdownNow();
}
log.info("AsyncProcessorWait 异步处理器关闭完成");
}));
} catch (Exception e) {
log.error("AsyncProcessorWait 异步处理器初始化错误", e);
throw new ExceptionInInitializerError(e);
}
}
/**
*
*/
private AsyncProcessorWait() {
}
/**
* <br>
* {@link }
*
* @param task
* @return
*/
public static boolean executeTask(Runnable task) {
try {
EXECUTOR.execute(task);
} catch (RejectedExecutionException e) {
log.error("AsyncProcessorWait 执行任务被拒绝", e);
return false;
}
return true;
}
/**
* <br>
* {@link }
*
* @param task
* @return
*/
public static <T> Future<T> submitTask(Callable<T> task) {
try {
return EXECUTOR.submit(task);
} catch (RejectedExecutionException e) {
log.error("AsyncProcessorWait 执行任务被拒绝", e);
throw new UnsupportedOperationException("AsyncProcessorWait 无法提交任务,已被拒绝", e);
}
}
}

@ -21,7 +21,8 @@ import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.RateLimiter;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.opsli.common.thread.refuse.AsyncProcessQueueReFuse; import org.opsli.common.thread.AsyncProcessExecutor;
import org.opsli.common.thread.AsyncProcessExecutorFactory;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import java.time.Duration; import java.time.Duration;
@ -184,12 +185,14 @@ public final class RateLimiterUtil {
public static void main(String[] args) { public static void main(String[] args) {
int count = 500; int count = 500;
RateLimiterUtil.removeIp("127.0.0.1"); RateLimiterUtil.removeIp("127.0.0.1");
AsyncProcessExecutor normalExecutor = AsyncProcessExecutorFactory.createNormalExecutor();
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
AsyncProcessQueueReFuse.execute(()->{ normalExecutor.put(()->{
boolean enter = RateLimiterUtil.enter("127.0.0.1","/api/v1", 2d); boolean enter = RateLimiterUtil.enter("127.0.0.1","/api/v1", 2d);
System.out.println(enter); System.out.println(enter);
}); });
} }
normalExecutor.execute();
} }
} }

@ -166,20 +166,19 @@ public class WebQueryBuilder<T extends BaseEntity> implements QueryBuilder<T>{
// 如果没有排序 默认按照 修改时间倒叙排序 // 如果没有排序 默认按照 修改时间倒叙排序
if(orderCount == 0){ if(orderCount == 0){
if(StringUtils.isNotEmpty(this.defaultOrderField)){ if(StringUtils.isNotEmpty(this.defaultOrderField)){
String key = this.defaultOrderField; String key = StringUtils.isNotEmpty(conf.get(this.defaultOrderField))
String keyStr = null; ?conf.get(this.defaultOrderField)
if(conf != null){ :this.defaultOrderField;
keyStr = conf.get(key); // 如果Key 与 默认Key 想等且Entity 不包含这个字段 则不进行排序
} if(StringUtils.equals(key, this.defaultOrderField)) {
// 如果Entity 不包含这个字段 则不进行排序
if(!ReflectUtil.hasField(entityClazz, key)) {
return queryWrapper;
}
// 检测 Conf 配置中是否已经指定该配置
if(StringUtils.isNotEmpty(keyStr)){
key = keyStr;
}else{
// 转换驼峰 为 数据库下划线字段 // 转换驼峰 为 数据库下划线字段
key = FieldUtil.humpToUnderline(key); key = FieldUtil.humpToUnderline(key);
} }
queryWrapper.orderByDesc(key); queryWrapper.orderByDesc(key);
} }
} }
@ -302,4 +301,4 @@ public class WebQueryBuilder<T extends BaseEntity> implements QueryBuilder<T>{
return ORDER.equals(handle); return ORDER.equals(handle);
} }
} }
} }

@ -4,7 +4,8 @@ import lombok.extern.slf4j.Slf4j;
import org.opsli.api.base.result.ResultVo; import org.opsli.api.base.result.ResultVo;
import org.opsli.api.web.system.logs.LogsApi; import org.opsli.api.web.system.logs.LogsApi;
import org.opsli.api.wrapper.system.logs.LogsModel; import org.opsli.api.wrapper.system.logs.LogsModel;
import org.opsli.common.thread.refuse.AsyncProcessQueueReFuse; import org.opsli.common.thread.AsyncProcessExecutor;
import org.opsli.common.thread.AsyncProcessExecutorFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -31,13 +32,15 @@ public class LogsThreadPool {
return; return;
} }
AsyncProcessQueueReFuse.execute(()->{ AsyncProcessExecutor normalExecutor = AsyncProcessExecutorFactory.createNormalExecutor();
normalExecutor.put(()->{
// 存储临时 token // 存储临时 token
ResultVo<?> ret = logsApi.insert(logsModel); ResultVo<?> ret = logsApi.insert(logsModel);
if(!ret.isSuccess()){ if(!ret.isSuccess()){
log.error(ret.getMsg()); log.error(ret.getMsg());
} }
}); });
normalExecutor.execute();
} }

@ -29,12 +29,13 @@ import org.opsli.api.wrapper.system.user.UserModel;
import org.opsli.common.annotation.ApiCryptoAsymmetric; import org.opsli.common.annotation.ApiCryptoAsymmetric;
import org.opsli.common.annotation.Limiter; import org.opsli.common.annotation.Limiter;
import org.opsli.common.enums.DictType; import org.opsli.common.enums.DictType;
import org.opsli.common.thread.AsyncProcessExecutor;
import org.opsli.common.thread.AsyncProcessExecutorFactory;
import org.opsli.core.utils.ValidatorUtil; import org.opsli.core.utils.ValidatorUtil;
import org.opsli.core.api.TokenThreadLocal; import org.opsli.core.api.TokenThreadLocal;
import org.opsli.common.enums.AlertType; import org.opsli.common.enums.AlertType;
import org.opsli.common.enums.OptionsType; import org.opsli.common.enums.OptionsType;
import org.opsli.common.exception.TokenException; import org.opsli.common.exception.TokenException;
import org.opsli.common.thread.refuse.AsyncProcessQueueReFuse;
import org.opsli.common.utils.IPUtil; import org.opsli.common.utils.IPUtil;
import org.opsli.core.msg.TokenMsg; import org.opsli.core.msg.TokenMsg;
import org.opsli.core.utils.*; import org.opsli.core.utils.*;
@ -149,13 +150,15 @@ public class LoginRestController {
//生成token并保存到Redis //生成token并保存到Redis
ResultVo<UserTokenUtil.TokenRet> resultVo = UserTokenUtil.createToken(user); ResultVo<UserTokenUtil.TokenRet> resultVo = UserTokenUtil.createToken(user);
if(resultVo.isSuccess()){ if(resultVo.isSuccess()){
AsyncProcessExecutor normalExecutor = AsyncProcessExecutorFactory.createNormalExecutor();
// 异步保存IP // 异步保存IP
AsyncProcessQueueReFuse.execute(()->{ normalExecutor.put(()->{
// 保存用户最后登录IP // 保存用户最后登录IP
String clientIpAddress = IPUtil.getClientIdBySingle(request); String clientIpAddress = IPUtil.getClientIdBySingle(request);
user.setLoginIp(clientIpAddress); user.setLoginIp(clientIpAddress);
iUserService.updateLoginIp(user); iUserService.updateLoginIp(user);
}); });
normalExecutor.execute();
} }
return resultVo; return resultVo;
} }

@ -1,7 +1,8 @@
import cn.hutool.http.HttpUtil; import cn.hutool.http.HttpUtil;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.junit.Test; import org.junit.Test;
import org.opsli.common.thread.refuse.AsyncProcessQueueReFuse; import org.opsli.common.thread.AsyncProcessExecutor;
import org.opsli.common.thread.AsyncProcessExecutorFactory;
import java.util.Map; import java.util.Map;
@ -15,14 +16,16 @@ public class LimiterTest {
@Test @Test
public void test(){ public void test(){
AsyncProcessExecutor normalExecutor = AsyncProcessExecutorFactory.createNormalExecutor();
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
AsyncProcessQueueReFuse.execute(()->{ normalExecutor.put(()->{
Map<String,Object> map = Maps.newHashMap(); Map<String,Object> map = Maps.newHashMap();
map.put("username","demo"); map.put("username","demo");
String ret = HttpUtil.get("http://127.0.0.1:8080/opsli-boot/system/slipCount", map); String ret = HttpUtil.get("http://127.0.0.1:8080/opsli-boot/system/slipCount", map);
System.out.println(ret); System.out.println(ret);
}); });
} }
normalExecutor.execute();
} }
} }

Loading…
Cancel
Save