perf: 优化线程池处理工具

pull/9/head
Carina 3 years ago
parent 66f151ccb8
commit c2138f379c

@ -16,10 +16,7 @@
package org.opsli.api.web.gentest.carinfo; package org.opsli.api.web.gentest.carinfo;
import org.opsli.api.base.result.ResultVo; import org.opsli.api.base.result.ResultVo;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.multipart.MultipartHttpServletRequest; import org.springframework.web.multipart.MultipartHttpServletRequest;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -44,13 +41,15 @@ public interface TestCarRestApi {
String TITLE = "汽车信息管理"; String TITLE = "汽车信息管理";
/** 子标题 */ /** 子标题 */
String SUB_TITLE = "汽车信息"; String SUB_TITLE = "汽车信息";
/** URL 前缀 */
String URL_PREFIX = "/gentest/carinfo/{version}";
/** /**
* *
* @param model * @param model
* @return ResultVo * @return ResultVo
*/ */
@GetMapping("/get") @GetMapping(URL_PREFIX + "/get")
ResultVo<TestCarModel> get(TestCarModel model); ResultVo<TestCarModel> get(TestCarModel model);
/** /**
@ -60,7 +59,7 @@ public interface TestCarRestApi {
* @param request request * @param request request
* @return ResultVo * @return ResultVo
*/ */
@GetMapping("/findPage") @GetMapping(URL_PREFIX + "/findPage")
ResultVo<?> findPage( ResultVo<?> findPage(
@RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo, @RequestParam(name = "pageNo", defaultValue = "1") Integer pageNo,
@RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize, @RequestParam(name = "pageSize", defaultValue = "10") Integer pageSize,
@ -72,7 +71,7 @@ public interface TestCarRestApi {
* @param model * @param model
* @return ResultVo * @return ResultVo
*/ */
@PostMapping("/insert") @PostMapping(URL_PREFIX + "/insert")
ResultVo<?> insert(@RequestBody TestCarModel model); ResultVo<?> insert(@RequestBody TestCarModel model);
/** /**
@ -80,7 +79,7 @@ public interface TestCarRestApi {
* @param model * @param model
* @return ResultVo * @return ResultVo
*/ */
@PostMapping("/update") @PostMapping(URL_PREFIX + "/update")
ResultVo<?> update(@RequestBody TestCarModel model); ResultVo<?> update(@RequestBody TestCarModel model);
/** /**
@ -88,7 +87,7 @@ public interface TestCarRestApi {
* @param id ID * @param id ID
* @return ResultVo * @return ResultVo
*/ */
@PostMapping("/del") @PostMapping(URL_PREFIX + "/del")
ResultVo<?> del(String id); ResultVo<?> del(String id);
/** /**
@ -96,7 +95,7 @@ public interface TestCarRestApi {
* @param ids ID * @param ids ID
* @return ResultVo * @return ResultVo
*/ */
@PostMapping("/delAll") @PostMapping(URL_PREFIX + "/delAll")
ResultVo<?> delAll(String ids); ResultVo<?> delAll(String ids);
/** /**
@ -112,7 +111,7 @@ public interface TestCarRestApi {
* @param request request * @param request request
* @param response response * @param response response
*/ */
@GetMapping("/exportExcel") @GetMapping(URL_PREFIX + "/exportExcel")
void exportExcel(HttpServletRequest request, HttpServletResponse response); void exportExcel(HttpServletRequest request, HttpServletResponse response);
/** /**
@ -120,14 +119,14 @@ public interface TestCarRestApi {
* @param request request * @param request request
* @return ResultVo * @return ResultVo
*/ */
@PostMapping("/importExcel") @PostMapping(URL_PREFIX + "/importExcel")
ResultVo<?> importExcel(MultipartHttpServletRequest request); ResultVo<?> importExcel(MultipartHttpServletRequest request);
/** /**
* Excel * Excel
* @param response response * @param response response
*/ */
@GetMapping("/importExcel/template") @GetMapping(URL_PREFIX + "/importExcel/template")
void importTemplate(HttpServletResponse response); void importTemplate(HttpServletResponse response);
} }

@ -0,0 +1,18 @@
package org.opsli.common.annotation;
import java.lang.annotation.*;
/**
* 1
*
* @author Parker
* @date 2021102712:36:55
*/
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ApiVersion {
int value() default 1;
}

@ -1,3 +1,18 @@
/**
* Copyright 2020 OPSLI https://www.opsli.com
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.opsli.common.thread; package org.opsli.common.thread;
import java.util.function.Function; import java.util.function.Function;
@ -24,11 +39,4 @@ public interface AsyncProcessExecutor {
*/ */
boolean execute(); boolean execute();
/**
*
* @param callback
* @return boolean
*/
boolean executeErrorCallback(Function<Runnable, Void> callback);
} }

@ -94,26 +94,6 @@ public class AsyncProcessExecutorByNormal implements AsyncProcessExecutor{
return true; return true;
} }
@Override
public boolean executeErrorCallback(Function<Runnable, Void> callback) {
if(CollUtil.isEmpty(this.taskList)){
return true;
}
for (Runnable task : this.taskList) {
// 多线程执行任务
boolean execute = this.execute(task);
// 执行任务被拒绝 门闩减1 计数器不动 End
if(!execute){
// 线程池失败后 返回该 Runnable
callback.apply(task);
}
}
return false;
}
// ==================================== // ====================================
/** /**
@ -134,7 +114,8 @@ public class AsyncProcessExecutorByNormal implements AsyncProcessExecutor{
private synchronized static AsyncProcessor getProcessor(String key){ private synchronized static AsyncProcessor getProcessor(String key){
AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key); AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key);
if(null == asyncProcessor){ if(null == asyncProcessor){
asyncProcessor = new AsyncProcessor(key); asyncProcessor = new AsyncProcessor();
asyncProcessor.init(key);
EXECUTOR_MAP.put(key, asyncProcessor); EXECUTOR_MAP.put(key, asyncProcessor);
} }
return asyncProcessor; return asyncProcessor;

@ -16,21 +16,21 @@
package org.opsli.common.thread; package org.opsli.common.thread;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
/** /**
* 线 * 线
* 线线 * 线线
* 202111214:07:54 线
* *
* @author * @author
* @date 2020-12-10 10:36 * @date 2020-12-10 10:36
@ -44,19 +44,23 @@ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor {
/** 线程Key */ /** 线程Key */
private final String key; private final String key;
/** 任务执行计数器 */
private AtomicInteger count;
/** 任务队列 */ /** 任务队列 */
private final List<Runnable> taskList; private final List<Callable<Object>> taskList;
/** 执行器 */ /** 执行器 */
private final AsyncProcessor processor; private final AsyncProcessor processor;
/** /**
* *
*/ */
public AsyncProcessExecutorByWait(){ public AsyncProcessExecutorByWait(){
this.key = "def"; this.key = "def";
taskList = new ArrayList<>(); taskList = new ArrayList<>();
processor = AsyncProcessExecutorByWait.getProcessor(this.key); processor = getProcessor(this.key);
} }
/** /**
@ -66,17 +70,18 @@ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor {
public AsyncProcessExecutorByWait(String key){ public AsyncProcessExecutorByWait(String key){
this.key = key; this.key = key;
taskList = new ArrayList<>(); taskList = new ArrayList<>();
processor = AsyncProcessExecutorByWait.getProcessor(this.key); processor = getProcessor(this.key);
} }
/** /**
* *
* Runnable Callable
* @param task * @param task
*/ */
@Override @Override
public AsyncProcessExecutorByWait put(final Runnable task){ public AsyncProcessExecutor put(final Runnable task){
taskList.add(task); taskList.add(Executors.callable(task));
return this; return this;
} }
@ -89,58 +94,25 @@ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor {
return true; return true;
} }
// 锁 // 初始化锁参数
AsyncWaitLock lock = new AsyncWaitLock(this.taskList.size()); count = new AtomicInteger(this.taskList.size());
// 门闩 线程锁
for (Runnable task : this.taskList) { CountDownLatch latch = new CountDownLatch(this.taskList.size());
// 多线程执行任务
boolean execute = this.execute(task, lock);
// 执行任务被拒绝 门闩减1 计数器不动 End
if(!execute){
lock.getLatch().countDown();
}
}
// 线程锁 等待查询结果 结果完成后继续执行
try {
lock.getLatch().await();
}catch (Exception e){
log.error(e.getMessage(), e);
}finally {
this.taskList.clear();
}
// 返回执行结果
return lock.getCount().get() == 0;
}
/**
* 线
*/
@Override
public boolean executeErrorCallback(Function<Runnable, Void> callback){
if(CollUtil.isEmpty(this.taskList)){
return true;
}
// 锁
AsyncWaitLock lock = new AsyncWaitLock(this.taskList.size());
for (Runnable task : this.taskList) {
// 多线程执行任务
boolean execute = this.execute(task, lock);
// 执行任务被拒绝 门闩减1 计数器不动 End
if(!execute){
// 线程池失败后 返回该 Runnable
callback.apply(task);
lock.getLatch().countDown(); for (Callable<Object> task : this.taskList) {
} // 回调减 门闩
processor.executeTaskAndCallback(task, (result)->{
if(result.getSuccess()){
count.decrementAndGet();
}
latch.countDown();
return null;
});
} }
// 线程锁 等待查询结果 结果完成后继续执行 // 线程锁 等待查询结果 结果完成后继续执行
try { try {
lock.getLatch().await(); latch.await();
}catch (Exception e){ }catch (Exception e){
log.error(e.getMessage(), e); log.error(e.getMessage(), e);
}finally { }finally {
@ -148,18 +120,7 @@ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor {
} }
// 返回执行结果 // 返回执行结果
return lock.getCount().get() == 0; return count.get() == 0;
}
/**
*
*
* @param task
* @return boolean
*/
private boolean execute(final Runnable task, final AsyncWaitLock lock) {
return processor.executeTask(new TaskWrapper(task, lock));
} }
/** /**
@ -170,82 +131,11 @@ public class AsyncProcessExecutorByWait implements AsyncProcessExecutor {
private synchronized static AsyncProcessor getProcessor(String key){ private synchronized static AsyncProcessor getProcessor(String key){
AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key); AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key);
if(null == asyncProcessor){ if(null == asyncProcessor){
asyncProcessor = new AsyncProcessor(key); asyncProcessor = new AsyncProcessor();
asyncProcessor.init(key);
EXECUTOR_MAP.put(key, asyncProcessor); EXECUTOR_MAP.put(key, asyncProcessor);
} }
return asyncProcessor; return asyncProcessor;
} }
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
* 线
*
* @author Parker
* @date 2020-10-08 10:24
*/
@Getter
public static class AsyncWaitLock {
/** 门闩 */
private final CountDownLatch latch;
/** 计数器 */
private final AtomicInteger count;
public AsyncWaitLock(int count){
// 初始化锁参数
this.count = new AtomicInteger(count);
// 门闩 线程锁
this.latch = new CountDownLatch(count);
}
}
/**
* Task <br>
* Executor <br>
*/
public static class TaskWrapper implements Runnable {
private final Runnable gift;
private final CountDownLatch latch;
private final AtomicInteger count;
public TaskWrapper(final Runnable target) {
this.gift = target;
this.count = null;
this.latch = null;
}
public TaskWrapper(final Runnable target, final AsyncWaitLock lock) {
if (lock == null) {
this.gift = null;
this.count = null;
this.latch = null;
return;
}
this.gift = target;
this.count = lock.getCount();
this.latch = lock.getLatch();
}
@Override
public void run() {
// 捕获异常,避免在 Executor 里面被吞掉了
if (gift != null) {
try {
gift.run();
// 标示已执行
count.decrementAndGet();
} catch (Exception e) {
String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage());
log.error(errMsg, e);
} finally {
latch.countDown();
}
}
}
}
} }

@ -0,0 +1,214 @@
/**
* Copyright 2020 OPSLI https://www.opsli.com
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.opsli.common.thread;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 线
* 线线
*
* @author
* @date 2020-12-10 10:36
*/
@Slf4j
@Deprecated
public class AsyncProcessExecutorOldByWait implements AsyncProcessExecutor {
/** 线程池字典 */
private static final Map<String, AsyncProcessor> EXECUTOR_MAP = Maps.newConcurrentMap();
/** 线程Key */
private final String key;
/** 任务队列 */
private final List<Runnable> taskList;
/** 执行器 */
private final AsyncProcessor processor;
/**
*
*/
public AsyncProcessExecutorOldByWait(){
this.key = "def";
taskList = new ArrayList<>();
processor = getProcessor(this.key);
}
/**
*
* @param key 线Key
*/
public AsyncProcessExecutorOldByWait(String key){
this.key = key;
taskList = new ArrayList<>();
processor = getProcessor(this.key);
}
/**
*
* @param task
*/
@Override
public AsyncProcessExecutor put(final Runnable task){
taskList.add(task);
return this;
}
/**
* 线
*/
@Override
public boolean execute(){
if(CollUtil.isEmpty(this.taskList)){
return true;
}
// 锁
AsyncWaitLock lock = new AsyncWaitLock(this.taskList.size());
for (Runnable task : this.taskList) {
// 多线程执行任务
boolean execute = this.execute(task, lock);
// 执行任务被拒绝 门闩减1 计数器不动 End
if(!execute){
lock.getLatch().countDown();
}
}
// 线程锁 等待查询结果 结果完成后继续执行
try {
lock.getLatch().await();
}catch (Exception e){
log.error(e.getMessage(), e);
}finally {
this.taskList.clear();
}
// 返回执行结果
return lock.getCount().get() == 0;
}
/**
*
*
* @param task
* @return boolean
*/
private boolean execute(final Runnable task, final AsyncWaitLock lock) {
return processor.executeTask(new TaskWrapper(task, lock));
}
/**
*
* @param key Key
* @return AsyncProcessor
*/
private synchronized static AsyncProcessor getProcessor(String key){
AsyncProcessor asyncProcessor = EXECUTOR_MAP.get(key);
if(null == asyncProcessor){
asyncProcessor = new AsyncProcessor();
asyncProcessor.init(key);
EXECUTOR_MAP.put(key, asyncProcessor);
}
return asyncProcessor;
}
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
/**
* 线
*
* @author Parker
* @date 2020-10-08 10:24
*/
@Getter
public static class AsyncWaitLock {
/** 门闩 */
private final CountDownLatch latch;
/** 计数器 */
private final AtomicInteger count;
public AsyncWaitLock(int count){
// 初始化锁参数
this.count = new AtomicInteger(count);
// 门闩 线程锁
this.latch = new CountDownLatch(count);
}
}
/**
* Task <br>
* Executor <br>
*/
public static class TaskWrapper implements Runnable {
private final Runnable gift;
private final CountDownLatch latch;
private final AtomicInteger count;
public TaskWrapper(final Runnable target) {
this.gift = target;
this.count = null;
this.latch = null;
}
public TaskWrapper(final Runnable target, final AsyncWaitLock lock) {
if (lock == null) {
this.gift = null;
this.count = null;
this.latch = null;
return;
}
this.gift = target;
this.count = lock.getCount();
this.latch = lock.getLatch();
}
@Override
public void run() {
// 捕获异常,避免在 Executor 里面被吞掉了
if (gift != null) {
try {
gift.run();
// 标示已执行
count.decrementAndGet();
} catch (Exception e) {
String errMsg = StrUtil.format("线程池-包装的目标执行异常: {}", e.getMessage());
log.error(errMsg, e);
} finally {
latch.countDown();
}
}
}
}
}

@ -1,11 +1,16 @@
package org.opsli.common.thread; package org.opsli.common.thread;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.google.common.util.concurrent.*;
import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.opsli.common.thread.ThreadPoolFactory;
import java.util.concurrent.*; import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
/** /**
* 线 - 线 * 线 - 线
@ -16,20 +21,10 @@ import java.util.concurrent.*;
@Slf4j @Slf4j
public class AsyncProcessor { public class AsyncProcessor {
/**
* <br>
*/
private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
/** /**
* 线 * 线
*/ */
private static final String THREAD_POOL_NAME = "AsyncProcessPool-{}-%d"; private static final String THREAD_POOL_NAME = "AsyncProcessorWaitPool-{}-%d";
/**
*
*/
private static final int DEFAULT_SIZE = 1024;
/** /**
* 线 * 线
@ -37,59 +32,51 @@ public class AsyncProcessor {
private static final int DEFAULT_WAIT_TIME = 10; private static final int DEFAULT_WAIT_TIME = 10;
/** /**
* 线 * 线
*/ */
private static final long DEFAULT_KEEP_ALIVE = 60L; private ListeningExecutorService execute;
/**
*
*/
private ExecutorService execute;
/** /**
* *
* @param key 线
*/ */
public AsyncProcessor(String key){ public void init(String key){
if(StringUtils.isBlank(key)){ if(StringUtils.isBlank(key)){
return; return;
} }
// 线程工厂名称 // 线程工厂名称
String formatThreadPoolName = StrUtil.format(THREAD_POOL_NAME, key); String formatThreadPoolName = StrUtil.format(THREAD_POOL_NAME, key);
BasicThreadFactory basicThreadFactory = new BasicThreadFactory.Builder()
.namingPattern(formatThreadPoolName)
.daemon(true).build();
// 创建 Executor // 创建 Executor
// 此处默认最大值改为处理器数量的 4 倍 // 此处默认最大值改为处理器数量的 4 倍
try { try {
// 执行队列 // 监听执行器
BlockingQueue<Runnable> executorQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE); execute = MoreExecutors.listeningDecorator(
execute = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE, ThreadPoolFactory.createDefThreadPool(formatThreadPoolName));
TimeUnit.SECONDS, executorQueue, basicThreadFactory);
// 这里不会自动关闭线程, 当线程超过阈值时 抛异常 // 这里不会自动关闭线程, 当线程超过阈值时 抛异常
// 关闭事件的挂钩 // 关闭事件的挂钩
Runtime.getRuntime().addShutdownHook(new Thread(() -> { Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("AsyncProcessorWait 异步处理器关闭"); log.info("ProcessorWait 异步处理器关闭");
execute.shutdown(); execute.shutdown();
try { try {
// 等待1秒执行关闭 // 等待1秒执行关闭
if (!execute.awaitTermination(DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) { if (!execute.awaitTermination(DEFAULT_WAIT_TIME, TimeUnit.SECONDS)) {
log.error("AsyncProcessorWait 由于等待超时,异步处理器立即关闭"); log.error("ProcessorWait 由于等待超时,异步处理器立即关闭");
execute.shutdownNow(); execute.shutdownNow();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("AsyncProcessorWait 异步处理器关闭中断"); log.error("ProcessorWait 异步处理器关闭中断");
execute.shutdownNow(); execute.shutdownNow();
} }
log.info("AsyncProcessorWait 异步处理器关闭完成"); log.info("ProcessorWait 异步处理器关闭完成");
})); }));
} catch (Exception e) { } catch (Exception e) {
log.error("AsyncProcessorWait 异步处理器初始化错误", e); log.error("ProcessorWait 异步处理器初始化错误", e);
throw new ExceptionInInitializerError(e); throw new ExceptionInInitializerError(e);
} }
} }
@ -99,8 +86,8 @@ public class AsyncProcessor {
* <br> * <br>
* {@link } * {@link }
* *
* @param task * @param task
* @return * @return boolean
*/ */
public boolean executeTask(Runnable task) { public boolean executeTask(Runnable task) {
try { try {
@ -117,14 +104,43 @@ public class AsyncProcessor {
* {@link } * {@link }
* *
* @param task * @param task
* @return Future<T>
*/ */
public <T> Future<T> submitTask(Callable<T> task) { public <T> void executeTaskAndCallback(Callable<T> task, Function<CallbackResult<T>, Void> callback) {
try { ListenableFuture<T> future = execute.submit(task);
return execute.submit(task); Futures.addCallback(future, new FutureCallback<T>() {
} catch (RejectedExecutionException e) { @Override
log.error("AsyncProcessorWait 执行任务被拒绝", e); public void onSuccess(T result) {
throw new UnsupportedOperationException("AsyncProcessorWait 无法提交任务,已被拒绝", e); CallbackResult<T> callbackResult = new CallbackResult<>();
} callbackResult.setSuccess(true);
callbackResult.setResult(result);
// 线程池失败后 返回该 Runnable
callback.apply(callbackResult);
}
@Override
public void onFailure(Throwable t) {
log.error("线程名称:{} - 执行异常信息:{}", Thread.currentThread().getName(), t.getMessage());
CallbackResult<T> callbackResult = new CallbackResult<>();
callbackResult.setSuccess(false);
callback.apply(callbackResult);
}
}, execute);
}
// =================
/**
*
* @param <T>
*/
@Data
public static class CallbackResult<T>{
/** 状态 */
private Boolean success;
/** 结果 */
private T result;
} }
} }

@ -0,0 +1,101 @@
package org.opsli.common.thread;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 线
*
* @author Parker
* @date 2021/11/2 10:48
*/
public final class ThreadPoolFactory {
/**
* <br>
*/
private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;
/**
* 线
*/
private static final long DEFAULT_KEEP_ALIVE = 60L;
/**
*
*/
private static final int DEFAULT_SIZE = 1024;
/**
* 线
*/
private static final String DEFAULT_THREAD_POOL_NAME = "ProcessPool-{}-%d";
/**
* 线
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor createDefThreadPool(){
return createInitThreadPool(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
TimeUnit.SECONDS, DEFAULT_SIZE, DEFAULT_THREAD_POOL_NAME, new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* 线
*
* @param poolName 线
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor createDefThreadPool(String poolName){
return createInitThreadPool(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
TimeUnit.SECONDS, DEFAULT_SIZE, poolName, new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* 线
*
* @param maxConcurrent 线
* @param poolName 线
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor createDefThreadPool(int maxConcurrent, String poolName){
return createInitThreadPool(maxConcurrent, maxConcurrent * 4, DEFAULT_KEEP_ALIVE,
TimeUnit.SECONDS, DEFAULT_SIZE, poolName, new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
* 线
* @param coreConcurrent 线
* @param maxConcurrent 线
* @param keepAlive 线
* @param timeUnit 线
* @param queueSize
* @param poolName 线
* @param handler
* @return ThreadPoolExecutor
*/
public static ThreadPoolExecutor createInitThreadPool(final int coreConcurrent,
final int maxConcurrent,
final long keepAlive,
final TimeUnit timeUnit,
final int queueSize,
final String poolName,
final RejectedExecutionHandler handler
){
return new ThreadPoolExecutor(coreConcurrent, maxConcurrent, keepAlive, timeUnit,
new LinkedBlockingDeque<>(queueSize),
new ThreadFactoryBuilder().setNameFormat(poolName).build(),
handler
);
}
private ThreadPoolFactory(){}
}

@ -0,0 +1,44 @@
package org.opsli.core.api;
import org.opsli.common.annotation.ApiVersion;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.util.Assert;
import org.springframework.web.servlet.mvc.condition.RequestCondition;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping;
import java.lang.reflect.Method;
import java.util.Objects;
/**
* RequestMappingHandlerMapping
*
* @author Parker
* @date 2021102712:40:45
*/
public class ApiRequestMappingHandlerMapping extends RequestMappingHandlerMapping {
@Override
protected RequestCondition<?> getCustomTypeCondition(Class<?> handlerType) {
// 扫描类上的 @ApiVersion
ApiVersion apiVersion = AnnotationUtils.findAnnotation(handlerType, ApiVersion.class);
return createRequestCondition(apiVersion);
}
@Override
protected RequestCondition<?> getCustomMethodCondition(Method method) {
// 扫描方法上的 @ApiVersion
ApiVersion apiVersion = AnnotationUtils.findAnnotation(method, ApiVersion.class);
return createRequestCondition(apiVersion);
}
private RequestCondition<ApiVersionCondition> createRequestCondition(ApiVersion apiVersion) {
if (Objects.isNull(apiVersion)) {
return null;
}
int value = apiVersion.value();
Assert.isTrue(value >= 1, "Api Version Must be greater than or equal to 1");
return new ApiVersionCondition(value);
}
}

@ -0,0 +1,63 @@
package org.opsli.core.api;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.servlet.mvc.condition.RequestCondition;
import javax.servlet.http.HttpServletRequest;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* API
*
*
* @author Parker
* @date 2021102712:39:09
*/
@Data
@Slf4j
public class ApiVersionCondition implements RequestCondition<ApiVersionCondition> {
/**
* : api/v[1-n]/fun
*/
private final static Pattern VERSION_PREFIX = Pattern.compile("/v(\\d+)/");
private int apiVersion;
ApiVersionCondition(int apiVersion) {
this.apiVersion = apiVersion;
}
/**
* @ApiVersion > @ApiVersion
*/
@Override
public ApiVersionCondition combine(ApiVersionCondition other) {
return new ApiVersionCondition(other.getApiVersion());
}
/**
* ApiVersionCondition
*/
@Override
public ApiVersionCondition getMatchingCondition(HttpServletRequest request) {
Matcher m = VERSION_PREFIX.matcher(request.getRequestURI());
if (m.find()) {
int version = Integer.parseInt(m.group(1));
if (version >= getApiVersion()) {
return this;
}
}
return null;
}
/**
* ApiVersionCondition
*/
@Override
public int compareTo(ApiVersionCondition other, HttpServletRequest request) {
return other.getApiVersion() - getApiVersion();
}
}

@ -17,7 +17,9 @@ package org.opsli.core.autoconfigure.conf;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.opsli.common.annotation.ApiRestController; import org.opsli.common.annotation.ApiRestController;
import org.opsli.core.api.ApiRequestMappingHandlerMapping;
import org.opsli.core.autoconfigure.properties.ApiPathProperties; import org.opsli.core.autoconfigure.properties.ApiPathProperties;
import org.springframework.boot.autoconfigure.web.servlet.WebMvcRegistrations;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
import org.springframework.web.cors.CorsConfiguration; import org.springframework.web.cors.CorsConfiguration;
@ -25,6 +27,7 @@ import org.springframework.web.cors.UrlBasedCorsConfigurationSource;
import org.springframework.web.filter.CorsFilter; import org.springframework.web.filter.CorsFilter;
import org.springframework.web.servlet.config.annotation.PathMatchConfigurer; import org.springframework.web.servlet.config.annotation.PathMatchConfigurer;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping;
import javax.annotation.Resource; import javax.annotation.Resource;
@ -36,11 +39,20 @@ import javax.annotation.Resource;
*/ */
@Slf4j @Slf4j
@Configuration @Configuration
public class SpringWebMvcConfig implements WebMvcConfigurer { public class SpringWebMvcConfig implements WebMvcConfigurer, WebMvcRegistrations {
@Resource @Resource
private ApiPathProperties apiPathProperties; private ApiPathProperties apiPathProperties;
/**
* RequestMappingHandlerMapping
* @return RequestMappingHandlerMapping
*/
@Override
public RequestMappingHandlerMapping getRequestMappingHandlerMapping() {
return new ApiRequestMappingHandlerMapping();
}
/** /**
* ApiRestController * ApiRestController
* @param configurer * @param configurer

Loading…
Cancel
Save