From 209856a174bc93c3901682f261bf441a2a80a83a Mon Sep 17 00:00:00 2001 From: lianyiwuming Date: Fri, 10 Mar 2023 22:10:22 +0800 Subject: [PATCH] refactor of BlockingQueueTypeEnum (#1032) Co-authored-by: hongdan.qin --- .../support/BlockingQueueTypeEnum.java | 254 +++++++++++++----- .../web/exception/NotSupportedException.java | 12 + 2 files changed, 200 insertions(+), 66 deletions(-) create mode 100644 hippo4j-common/src/main/java/cn/hippo4j/common/web/exception/NotSupportedException.java diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/BlockingQueueTypeEnum.java b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/BlockingQueueTypeEnum.java index 0333f699..30bb3920 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/BlockingQueueTypeEnum.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/BlockingQueueTypeEnum.java @@ -18,10 +18,12 @@ package cn.hippo4j.common.executor.support; import cn.hippo4j.common.spi.DynamicThreadPoolServiceLoader; +import cn.hippo4j.common.web.exception.NotSupportedException; import lombok.Getter; -import java.util.Arrays; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; @@ -31,7 +33,9 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.SynchronousQueue; -import java.util.stream.Stream; +import java.util.function.Predicate; + +import static cn.hippo4j.common.web.exception.ErrorCodeEnum.SERVICE_ERROR; /** * Blocking queue type enum. @@ -41,37 +45,72 @@ public enum BlockingQueueTypeEnum { /** * {@link java.util.concurrent.ArrayBlockingQueue} */ - ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQueue"), + ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQueue") { + @Override + BlockingQueue of(Integer capacity) { + return new ArrayBlockingQueue<>(capacity); + } + }, /** * {@link java.util.concurrent.LinkedBlockingQueue} */ - LINKED_BLOCKING_QUEUE(2, "LinkedBlockingQueue"), + LINKED_BLOCKING_QUEUE(2, "LinkedBlockingQueue") { + @Override + BlockingQueue of(Integer capacity) { + return new LinkedBlockingQueue<>(capacity); + } + }, /** * {@link java.util.concurrent.LinkedBlockingDeque} */ - LINKED_BLOCKING_DEQUE(3, "LinkedBlockingDeque"), + LINKED_BLOCKING_DEQUE(3, "LinkedBlockingDeque") { + @Override + BlockingQueue of() { + return new LinkedBlockingDeque<>(); + } + }, /** * {@link java.util.concurrent.SynchronousQueue} */ - SYNCHRONOUS_QUEUE(4, "SynchronousQueue"), + SYNCHRONOUS_QUEUE(4, "SynchronousQueue") { + @Override + BlockingQueue of() { + return new SynchronousQueue<>(); + } + }, /** * {@link java.util.concurrent.LinkedTransferQueue} */ - LINKED_TRANSFER_QUEUE(5, "LinkedTransferQueue"), + LINKED_TRANSFER_QUEUE(5, "LinkedTransferQueue") { + @Override + BlockingQueue of() { + return new LinkedTransferQueue<>(); + } + }, /** * {@link java.util.concurrent.PriorityBlockingQueue} */ - PRIORITY_BLOCKING_QUEUE(6, "PriorityBlockingQueue"), + PRIORITY_BLOCKING_QUEUE(6, "PriorityBlockingQueue") { + @Override + BlockingQueue of(Integer capacity) { + return new PriorityBlockingQueue<>(capacity); + } + }, /** * {@link ResizableCapacityLinkedBlockingQueue} */ - RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockingQueue"); + RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockingQueue") { + @Override + BlockingQueue of(Integer capacity) { + return new ResizableCapacityLinkedBlockingQueue<>(capacity); + } + }; @Getter private Integer type; @@ -84,80 +123,163 @@ public enum BlockingQueueTypeEnum { this.name = name; } - private static final int DEFAULT_CAPACITY = 1024; + private static Map typeToEnumMap; + private static Map nameToEnumMap; static { - DynamicThreadPoolServiceLoader.register(CustomBlockingQueue.class); + final BlockingQueueTypeEnum[] values = BlockingQueueTypeEnum.values(); + typeToEnumMap = new HashMap<>(values.length); + nameToEnumMap = new HashMap<>(values.length); + for (BlockingQueueTypeEnum value : values) { + typeToEnumMap.put(value.type, value); + nameToEnumMap.put(value.name, value); + } } - public static BlockingQueue createBlockingQueue(String blockingQueueName, Integer capacity) { - BlockingQueue blockingQueue = null; - BlockingQueueTypeEnum queueTypeEnum = Stream.of(BlockingQueueTypeEnum.values()) - .filter(each -> Objects.equals(each.name, blockingQueueName)) - .findFirst() - .orElse(null); - if (queueTypeEnum != null) { - blockingQueue = createBlockingQueue(queueTypeEnum.type, capacity); - if (Objects.equals(blockingQueue.getClass().getSimpleName(), blockingQueueName)) { - return blockingQueue; - } + /** + * Create the specified implement of BlockingQueue with init capacity. + * Abstract method, depends on sub override + * + * @param capacity the capacity of the queue + * @param the class of the objects in the BlockingQueue + * @return a BlockingQueue view of the specified T + */ + BlockingQueue of(Integer capacity) { + throw new NotSupportedException("该队列必须有界", SERVICE_ERROR); + } + + /** + * Create the specified implement of BlockingQueue,has no capacity limit. + * Abstract method, depends on sub override + * + * @param the class of the objects in the BlockingQueue + * @return a BlockingQueue view of the specified T + * @throws NotSupportedException + */ + BlockingQueue of() { + throw new NotSupportedException("该队列不支持有界", SERVICE_ERROR); + } + + /** + * Creates a BlockingQueue with the given {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name} + * and capacity. + * + * @param blockingQueueName {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name} + * @param capacity the capacity of the BlockingQueue + * @param the class of the objects in the BlockingQueue + * @return a BlockingQueue view of the specified T + */ + private static BlockingQueue of(String blockingQueueName, Integer capacity) { + final BlockingQueueTypeEnum typeEnum = nameToEnumMap.get(blockingQueueName); + if (typeEnum == null) { + return null; } - Collection customBlockingQueues = DynamicThreadPoolServiceLoader - .getSingletonServiceInstances(CustomBlockingQueue.class); - blockingQueue = Optional.ofNullable(blockingQueue) - .orElseGet( - () -> customBlockingQueues.stream() - .filter(each -> Objects.equals(blockingQueueName, each.getName())) - .map(each -> each.generateBlockingQueue()) - .findFirst() - .orElseGet(() -> { - int temCapacity = capacity; - if (capacity == null || capacity <= 0) { - temCapacity = DEFAULT_CAPACITY; - } - return new LinkedBlockingQueue(temCapacity); - })); - return blockingQueue; + return Objects.isNull(capacity) ? typeEnum.of() : typeEnum.of(capacity); } - public static BlockingQueue createBlockingQueue(int type, Integer capacity) { - BlockingQueue blockingQueue = null; - if (Objects.equals(type, ARRAY_BLOCKING_QUEUE.type)) { - blockingQueue = new ArrayBlockingQueue(capacity); - } else if (Objects.equals(type, LINKED_BLOCKING_QUEUE.type)) { - blockingQueue = new LinkedBlockingQueue(capacity); - } else if (Objects.equals(type, LINKED_BLOCKING_DEQUE.type)) { - blockingQueue = new LinkedBlockingDeque(capacity); - } else if (Objects.equals(type, SYNCHRONOUS_QUEUE.type)) { - blockingQueue = new SynchronousQueue(); - } else if (Objects.equals(type, LINKED_TRANSFER_QUEUE.type)) { - blockingQueue = new LinkedTransferQueue(); - } else if (Objects.equals(type, PRIORITY_BLOCKING_QUEUE.type)) { - blockingQueue = new PriorityBlockingQueue(capacity); - } else if (Objects.equals(type, RESIZABLE_LINKED_BLOCKING_QUEUE.type)) { - blockingQueue = new ResizableCapacityLinkedBlockingQueue(capacity); + /** + * Creates a BlockingQueue with the given {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type} + * and capacity. + * + * @param type {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type} + * @param capacity the capacity of the BlockingQueue + * @param the class of the objects in the BlockingQueue + * @return a BlockingQueue view of the specified T + */ + private static BlockingQueue of(int type, Integer capacity) { + final BlockingQueueTypeEnum typeEnum = typeToEnumMap.get(type); + if (typeEnum == null) { + return null; } + return Objects.isNull(capacity) ? typeEnum.of() : typeEnum.of(capacity); + } + + private static final int DEFAULT_CAPACITY = 1024; + + static { + DynamicThreadPoolServiceLoader.register(CustomBlockingQueue.class); + } + + + private static BlockingQueue customOrDefaultQueue(Integer capacity, Predicate predicate) { Collection customBlockingQueues = DynamicThreadPoolServiceLoader .getSingletonServiceInstances(CustomBlockingQueue.class); - blockingQueue = Optional.ofNullable(blockingQueue).orElseGet(() -> customBlockingQueues.stream() - .filter(each -> Objects.equals(type, each.getType())) + + return customBlockingQueues.stream() + .filter(predicate) .map(each -> each.generateBlockingQueue()) .findFirst() - .orElse(new LinkedBlockingQueue(capacity))); - return blockingQueue; + .orElseGet(() -> { + int temCapacity = capacity; + if (capacity == null || capacity <= 0) { + temCapacity = DEFAULT_CAPACITY; + } + return new LinkedBlockingQueue(temCapacity); + }); + } + + /** + * Creates a BlockingQueue with the given {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name} + * and capacity. if can't find the blockingQueueName with {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name}, + * create custom or default BlockingQueue {@link BlockingQueueTypeEnum#customOrDefaultQueue BlockingQueueTypeEnum.customOrDefaultQueue}. + * + * @param blockingQueueName {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name} + * @param capacity the capacity of the BlockingQueue + * @param the class of the objects in the BlockingQueue + * @return a BlockingQueue view of the specified T + */ + public static BlockingQueue createBlockingQueue(String blockingQueueName, Integer capacity) { + final BlockingQueue of = of(blockingQueueName, capacity); + if (of != null) { + return of; + } + + return customOrDefaultQueue(capacity, + (customeQueue) -> Objects.equals(customeQueue.getName(), blockingQueueName)); } + /** + * Creates a BlockingQueue with the given {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type} + * and capacity. if can't find the blockingQueueName with {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type}, + * create custom or default BlockingQueue {@link BlockingQueueTypeEnum#customOrDefaultQueue BlockingQueueTypeEnum.customOrDefaultQueue}. + * + * @param type {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type} + * @param capacity the capacity of the BlockingQueue + * @param the class of the objects in the BlockingQueue + * @return a BlockingQueue view of the specified T + */ + public static BlockingQueue createBlockingQueue(int type, Integer capacity) { + final BlockingQueue of = of(type, capacity); + if (of != null) { + return of; + } + + return customOrDefaultQueue(capacity, + (customeQueue) -> Objects.equals(customeQueue.getType(), type)); + } + + /** + * Map {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type } to {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name } + * or "" if can't mapping. + * + * @param type {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type} + * @return {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name } or "". + */ public static String getBlockingQueueNameByType(int type) { - Optional queueTypeEnum = Arrays.stream(BlockingQueueTypeEnum.values()) - .filter(each -> each.type == type) - .findFirst(); - return queueTypeEnum.map(each -> each.name).orElse(""); + return Optional.ofNullable(typeToEnumMap.get(type)) + .map(value -> value.getName()) + .orElse(""); } + /** + * find {@link BlockingQueueTypeEnum} by {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name } + * or {@link BlockingQueueTypeEnum#LINKED_BLOCKING_QUEUE} if can't mapping. + * + * @param name {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name } + * @return enum {@link BlockingQueueTypeEnum} + */ public static BlockingQueueTypeEnum getBlockingQueueTypeEnumByName(String name) { - Optional queueTypeEnum = Arrays.stream(BlockingQueueTypeEnum.values()) - .filter(each -> each.name.equals(name)) - .findFirst(); - return queueTypeEnum.orElse(LINKED_BLOCKING_QUEUE); + return Optional.ofNullable(nameToEnumMap.get(name)) + .orElse(LINKED_BLOCKING_QUEUE); } } diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/web/exception/NotSupportedException.java b/hippo4j-common/src/main/java/cn/hippo4j/common/web/exception/NotSupportedException.java new file mode 100644 index 00000000..261b5b45 --- /dev/null +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/web/exception/NotSupportedException.java @@ -0,0 +1,12 @@ +package cn.hippo4j.common.web.exception; + +/** + * This exception is thrown when a context implementation does not support the operation being invoked. + */ +public class NotSupportedException extends AbstractException { + + public NotSupportedException(String message, ErrorCode errorCode) { + super(message, null, errorCode); + } + +}