feat: 线程池阻塞队列 SPI 方式集成.

pull/161/head
chen.ma 3 years ago
parent 3501e5fd55
commit fbed405dd9

@ -1,52 +0,0 @@
package io.dynamic.threadpool.common.enums;
/**
*
*
* @author chen.ma
* @date 2021/6/25 12:30
*/
public enum QueueTypeEnum {
/**
* {@link java.util.concurrent.ArrayBlockingQueue}
*/
ARRAY_BLOCKING_QUEUE(1),
/**
* {@link java.util.concurrent.LinkedBlockingQueue}
*/
LINKED_BLOCKING_QUEUE(2),
/**
* {@link java.util.concurrent.LinkedBlockingDeque}
*/
LINKED_BLOCKING_DEQUE(3),
/**
* {@link java.util.concurrent.SynchronousQueue}
*/
SYNCHRONOUS_QUEUE(4),
/**
* {@link java.util.concurrent.LinkedTransferQueue}
*/
LINKED_TRANSFER_QUEUE(5),
/**
* {@link java.util.concurrent.PriorityBlockingQueue}
*/
PRIORITY_BLOCKING_QUEUE(6),
/**
* {@link "io.dynamic.threadpool.starter.core.ResizableCapacityLinkedBlockIngQueue"}
*/
RESIZABLE_LINKED_BLOCKING_QUEUE(9);
public Integer type;
QueueTypeEnum(int type) {
this.type = type;
}
}

@ -1,7 +1,7 @@
package io.dynamic.threadpool.starter.adapter;
import io.dynamic.threadpool.common.config.ApplicationContextHolder;
import io.dynamic.threadpool.common.enums.QueueTypeEnum;
import io.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import io.dynamic.threadpool.starter.operation.ThreadPoolOperation;
import io.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;

@ -1,6 +1,6 @@
package io.dynamic.threadpool.starter.common;
import io.dynamic.threadpool.common.enums.QueueTypeEnum;
import io.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import io.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import io.dynamic.threadpool.starter.toolkit.thread.RejectedPolicies;

@ -1,6 +1,6 @@
package io.dynamic.threadpool.starter.handler;
import io.dynamic.threadpool.common.enums.QueueTypeEnum;
import io.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import io.dynamic.threadpool.starter.core.ResizableCapacityLinkedBlockIngQueue;
import lombok.extern.slf4j.Slf4j;

@ -9,9 +9,9 @@ import io.dynamic.threadpool.starter.banner.DynamicThreadPoolBanner;
import io.dynamic.threadpool.starter.common.CommonThreadPool;
import io.dynamic.threadpool.starter.config.DynamicThreadPoolProperties;
import io.dynamic.threadpool.starter.core.GlobalThreadPoolManage;
import io.dynamic.threadpool.starter.toolkit.thread.QueueTypeEnum;
import io.dynamic.threadpool.starter.remote.HttpAgent;
import io.dynamic.threadpool.starter.remote.ServerHttpAgent;
import io.dynamic.threadpool.starter.toolkit.BlockingQueueUtil;
import io.dynamic.threadpool.starter.toolkit.thread.ThreadPoolBuilder;
import io.dynamic.threadpool.starter.wrap.CustomThreadPoolExecutor;
import io.dynamic.threadpool.starter.wrap.DynamicThreadPoolWrap;
@ -61,7 +61,7 @@ public class ThreadPoolRunListener {
result = httpAgent.httpGet(Constants.CONFIG_CONTROLLER_PATH, null, queryStrMap, 3000L);
if (result.isSuccess() && result.getData() != null && (ppi = JSON.toJavaObject((JSON) result.getData(), PoolParameterInfo.class)) != null) {
// 使用相关参数创建线程池
BlockingQueue workQueue = BlockingQueueUtil.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());
BlockingQueue workQueue = QueueTypeEnum.createBlockingQueue(ppi.getQueueType(), ppi.getCapacity());
ThreadPoolExecutor poolExecutor = ThreadPoolBuilder.builder()
.isCustomPool(true)
.poolThreadSize(ppi.getCoreSize(), ppi.getMaxSize())

@ -0,0 +1,27 @@
package io.dynamic.threadpool.starter.spi.queue;
import java.util.concurrent.BlockingQueue;
/**
*
*
* @author chen.ma
* @date 2021/7/11 00:51
*/
public interface CustomBlockingQueue {
/**
*
*
* @return
*/
Integer getType();
/**
*
*
* @return
*/
BlockingQueue generateBlockingQueue();
}

@ -1,38 +0,0 @@
package io.dynamic.threadpool.starter.toolkit;
import io.dynamic.threadpool.common.enums.QueueTypeEnum;
import io.dynamic.threadpool.starter.core.ResizableCapacityLinkedBlockIngQueue;
import java.util.Objects;
import java.util.concurrent.*;
/**
*
*
* @author chen.ma
* @date 2021/6/20 16:50
*/
public class BlockingQueueUtil {
public static BlockingQueue createBlockingQueue(Integer type, Integer capacity) {
BlockingQueue blockingQueue = null;
if (Objects.equals(type, QueueTypeEnum.ARRAY_BLOCKING_QUEUE.type)) {
blockingQueue = new ArrayBlockingQueue(capacity);
} else if (Objects.equals(type, QueueTypeEnum.LINKED_BLOCKING_QUEUE.type)) {
blockingQueue = new LinkedBlockingQueue(capacity);
} else if (Objects.equals(type, QueueTypeEnum.LINKED_BLOCKING_DEQUE.type)) {
blockingQueue = new LinkedBlockingDeque(capacity);
} else if (Objects.equals(type, QueueTypeEnum.SYNCHRONOUS_QUEUE.type)) {
blockingQueue = new SynchronousQueue();
} else if (Objects.equals(type, QueueTypeEnum.LINKED_TRANSFER_QUEUE.type)) {
blockingQueue = new LinkedTransferQueue();
} else if (Objects.equals(type, QueueTypeEnum.PRIORITY_BLOCKING_QUEUE.type)) {
blockingQueue = new PriorityBlockingQueue(capacity);
} else if (Objects.equals(type, QueueTypeEnum.RESIZABLE_LINKED_BLOCKING_QUEUE.type)) {
blockingQueue = new ResizableCapacityLinkedBlockIngQueue(capacity);
} else {
throw new IllegalArgumentException("未找到类型匹配的阻塞队列.");
}
return blockingQueue;
}
}

@ -0,0 +1,93 @@
package io.dynamic.threadpool.starter.toolkit.thread;
import io.dynamic.threadpool.starter.core.ResizableCapacityLinkedBlockIngQueue;
import io.dynamic.threadpool.starter.spi.DynamicTpServiceLoader;
import io.dynamic.threadpool.starter.spi.queue.CustomBlockingQueue;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.*;
/**
*
*
* @author chen.ma
* @date 2021/6/25 12:30
*/
public enum QueueTypeEnum {
/**
* {@link java.util.concurrent.ArrayBlockingQueue}
*/
ARRAY_BLOCKING_QUEUE(1),
/**
* {@link java.util.concurrent.LinkedBlockingQueue}
*/
LINKED_BLOCKING_QUEUE(2),
/**
* {@link java.util.concurrent.LinkedBlockingDeque}
*/
LINKED_BLOCKING_DEQUE(3),
/**
* {@link java.util.concurrent.SynchronousQueue}
*/
SYNCHRONOUS_QUEUE(4),
/**
* {@link java.util.concurrent.LinkedTransferQueue}
*/
LINKED_TRANSFER_QUEUE(5),
/**
* {@link java.util.concurrent.PriorityBlockingQueue}
*/
PRIORITY_BLOCKING_QUEUE(6),
/**
* {@link "io.dynamic.threadpool.starter.core.ResizableCapacityLinkedBlockIngQueue"}
*/
RESIZABLE_LINKED_BLOCKING_QUEUE(9);
public Integer type;
QueueTypeEnum(int type) {
this.type = type;
}
static {
DynamicTpServiceLoader.register(CustomBlockingQueue.class);
}
public static BlockingQueue createBlockingQueue(Integer type, Integer capacity) {
BlockingQueue blockingQueue = null;
if (Objects.equals(type, ARRAY_BLOCKING_QUEUE.type)) {
blockingQueue = new ArrayBlockingQueue(capacity);
} else if (Objects.equals(type, LINKED_BLOCKING_QUEUE.type)) {
blockingQueue = new LinkedBlockingQueue(capacity);
} else if (Objects.equals(type, LINKED_BLOCKING_DEQUE.type)) {
blockingQueue = new LinkedBlockingDeque(capacity);
} else if (Objects.equals(type, SYNCHRONOUS_QUEUE.type)) {
blockingQueue = new SynchronousQueue();
} else if (Objects.equals(type, LINKED_TRANSFER_QUEUE.type)) {
blockingQueue = new LinkedTransferQueue();
} else if (Objects.equals(type, PRIORITY_BLOCKING_QUEUE.type)) {
blockingQueue = new PriorityBlockingQueue(capacity);
} else if (Objects.equals(type, RESIZABLE_LINKED_BLOCKING_QUEUE.type)) {
blockingQueue = new ResizableCapacityLinkedBlockIngQueue(capacity);
}
Collection<CustomBlockingQueue> customBlockingQueues = DynamicTpServiceLoader
.getSingletonServiceInstances(CustomBlockingQueue.class);
blockingQueue = Optional.ofNullable(blockingQueue).orElseGet(() -> customBlockingQueues.stream()
.filter(each -> Objects.equals(type, each.getType()))
.map(each -> each.generateBlockingQueue())
.findFirst()
.orElse(new LinkedBlockingQueue(capacity)));
return blockingQueue;
}
}

@ -1,10 +1,8 @@
package io.dynamic.threadpool.starter.toolkit.thread;
import io.dynamic.threadpool.common.enums.QueueTypeEnum;
import io.dynamic.threadpool.common.toolkit.Assert;
import io.dynamic.threadpool.starter.builder.Builder;
import io.dynamic.threadpool.starter.toolkit.BlockingQueueUtil;
import java.math.BigDecimal;
import java.util.concurrent.*;
@ -239,7 +237,7 @@ public class ThreadPoolBuilder implements Builder<ThreadPoolExecutor> {
// 快速消费线程池内置指定线程池
if (!builder.isFastPool) {
if (builder.workQueue == null) {
builder.workQueue = BlockingQueueUtil.createBlockingQueue(builder.queueType.type, builder.capacity);
builder.workQueue = QueueTypeEnum.createBlockingQueue(builder.queueType.type, builder.capacity);
}
initParam.setWorkQueue(builder.workQueue);
}

Loading…
Cancel
Save