From 39ad08ded36d58a27b7d6ff1424927f156650523 Mon Sep 17 00:00:00 2001 From: "hongdan.qin" Date: Thu, 8 Dec 2022 16:17:12 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=20BlockingQueueTypeEnum?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/BlockingQueueTypeEnum.java | 208 ++++++++++++------ .../executor/support/CustomBlockingQueue.java | 4 +- .../support/MyArrayBlockingQueue.java | 26 +++ .../spi/DynamicThreadPoolServiceLoader.java | 28 ++- .../web/exception/NotSupportedException.java | 13 ++ .../toolkit/BlockingQueueTypeEnumTest.java | 119 ++++++++++ .../hippo4j/common/toolkit/GenericeTest.java | 22 ++ ...ommon.executor.support.CustomBlockingQueue | 1 + 8 files changed, 341 insertions(+), 80 deletions(-) create mode 100644 hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/MyArrayBlockingQueue.java create mode 100644 hippo4j-common/src/main/java/cn/hippo4j/common/web/exception/NotSupportedException.java create mode 100644 hippo4j-common/src/test/java/cn/hippo4j/common/toolkit/BlockingQueueTypeEnumTest.java create mode 100644 hippo4j-common/src/test/java/cn/hippo4j/common/toolkit/GenericeTest.java create mode 100644 hippo4j-common/src/test/resources/META-INF/services/cn.hippo4j.common.executor.support.CustomBlockingQueue diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/BlockingQueueTypeEnum.java b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/BlockingQueueTypeEnum.java index 0333f699..3b44b5d3 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/BlockingQueueTypeEnum.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/BlockingQueueTypeEnum.java @@ -18,20 +18,14 @@ package cn.hippo4j.common.executor.support; import cn.hippo4j.common.spi.DynamicThreadPoolServiceLoader; +import cn.hippo4j.common.web.exception.NotSupportedException; import lombok.Getter; -import java.util.Arrays; -import java.util.Collection; -import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.LinkedTransferQueue; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.SynchronousQueue; -import java.util.stream.Stream; +import java.util.*; +import java.util.concurrent.*; +import java.util.function.Predicate; + +import static cn.hippo4j.common.web.exception.ErrorCodeEnum.SERVICE_ERROR; /** * Blocking queue type enum. @@ -41,37 +35,72 @@ public enum BlockingQueueTypeEnum { /** * {@link java.util.concurrent.ArrayBlockingQueue} */ - ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQueue"), + ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQueue") { + @Override + BlockingQueue of(Integer capacity) { + return new ArrayBlockingQueue<>(capacity); + } + }, /** * {@link java.util.concurrent.LinkedBlockingQueue} */ - LINKED_BLOCKING_QUEUE(2, "LinkedBlockingQueue"), + LINKED_BLOCKING_QUEUE(2, "LinkedBlockingQueue") { + @Override + BlockingQueue of(Integer capacity) { + return new LinkedBlockingQueue<>(capacity); + } + }, /** * {@link java.util.concurrent.LinkedBlockingDeque} */ - LINKED_BLOCKING_DEQUE(3, "LinkedBlockingDeque"), + LINKED_BLOCKING_DEQUE(3, "LinkedBlockingDeque") { + @Override + BlockingQueue of() { + return new LinkedBlockingDeque<>(); + } + }, /** * {@link java.util.concurrent.SynchronousQueue} */ - SYNCHRONOUS_QUEUE(4, "SynchronousQueue"), + SYNCHRONOUS_QUEUE(4, "SynchronousQueue") { + @Override + BlockingQueue of() { + return new SynchronousQueue<>(); + } + }, /** * {@link java.util.concurrent.LinkedTransferQueue} */ - LINKED_TRANSFER_QUEUE(5, "LinkedTransferQueue"), + LINKED_TRANSFER_QUEUE(5, "LinkedTransferQueue") { + @Override + BlockingQueue of() { + return new LinkedTransferQueue<>(); + } + }, /** * {@link java.util.concurrent.PriorityBlockingQueue} */ - PRIORITY_BLOCKING_QUEUE(6, "PriorityBlockingQueue"), + PRIORITY_BLOCKING_QUEUE(6, "PriorityBlockingQueue") { + @Override + BlockingQueue of(Integer capacity) { + return new PriorityBlockingQueue<>(capacity); + } + }, /** * {@link ResizableCapacityLinkedBlockingQueue} */ - RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockingQueue"); + RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockingQueue") { + @Override + BlockingQueue of(Integer capacity) { + return new ResizableCapacityLinkedBlockingQueue<>(capacity); + } + }; @Getter private Integer type; @@ -84,67 +113,110 @@ public enum BlockingQueueTypeEnum { this.name = name; } - private static final int DEFAULT_CAPACITY = 1024; + private static Map typeToEnumMap; + private static Map nameToEnumMap; static { - DynamicThreadPoolServiceLoader.register(CustomBlockingQueue.class); + final BlockingQueueTypeEnum[] values = BlockingQueueTypeEnum.values(); + typeToEnumMap = new HashMap<>(values.length); + nameToEnumMap = new HashMap<>(values.length); + for (BlockingQueueTypeEnum value : values) { + typeToEnumMap.put(value.type, value); + nameToEnumMap.put(value.name, value); + } } - public static BlockingQueue createBlockingQueue(String blockingQueueName, Integer capacity) { - BlockingQueue blockingQueue = null; - BlockingQueueTypeEnum queueTypeEnum = Stream.of(BlockingQueueTypeEnum.values()) - .filter(each -> Objects.equals(each.name, blockingQueueName)) - .findFirst() - .orElse(null); - if (queueTypeEnum != null) { - blockingQueue = createBlockingQueue(queueTypeEnum.type, capacity); - if (Objects.equals(blockingQueue.getClass().getSimpleName(), blockingQueueName)) { - return blockingQueue; - } + /** + * 子类按需实现,默认不支持该实例化方式 + * + * @param capacity + * @return + */ + BlockingQueue of(Integer capacity) { + throw new NotSupportedException("该队列必须有界", SERVICE_ERROR); + } + + /** + * 子类按需实现,默认不支持该实例化方式 + * + * @return + */ + BlockingQueue of() { + throw new NotSupportedException("该队列不支持有界", SERVICE_ERROR); + } + + private static BlockingQueue of(String blockingQueueName, Integer capacity) { + final BlockingQueueTypeEnum typeEnum = nameToEnumMap.get(blockingQueueName); + if (typeEnum == null) { + return null; } - Collection customBlockingQueues = DynamicThreadPoolServiceLoader - .getSingletonServiceInstances(CustomBlockingQueue.class); - blockingQueue = Optional.ofNullable(blockingQueue) - .orElseGet( - () -> customBlockingQueues.stream() - .filter(each -> Objects.equals(blockingQueueName, each.getName())) - .map(each -> each.generateBlockingQueue()) - .findFirst() - .orElseGet(() -> { - int temCapacity = capacity; - if (capacity == null || capacity <= 0) { - temCapacity = DEFAULT_CAPACITY; - } - return new LinkedBlockingQueue(temCapacity); - })); - return blockingQueue; + return Objects.isNull(capacity) ? typeEnum.of() : typeEnum.of(capacity); } - public static BlockingQueue createBlockingQueue(int type, Integer capacity) { - BlockingQueue blockingQueue = null; - if (Objects.equals(type, ARRAY_BLOCKING_QUEUE.type)) { - blockingQueue = new ArrayBlockingQueue(capacity); - } else if (Objects.equals(type, LINKED_BLOCKING_QUEUE.type)) { - blockingQueue = new LinkedBlockingQueue(capacity); - } else if (Objects.equals(type, LINKED_BLOCKING_DEQUE.type)) { - blockingQueue = new LinkedBlockingDeque(capacity); - } else if (Objects.equals(type, SYNCHRONOUS_QUEUE.type)) { - blockingQueue = new SynchronousQueue(); - } else if (Objects.equals(type, LINKED_TRANSFER_QUEUE.type)) { - blockingQueue = new LinkedTransferQueue(); - } else if (Objects.equals(type, PRIORITY_BLOCKING_QUEUE.type)) { - blockingQueue = new PriorityBlockingQueue(capacity); - } else if (Objects.equals(type, RESIZABLE_LINKED_BLOCKING_QUEUE.type)) { - blockingQueue = new ResizableCapacityLinkedBlockingQueue(capacity); + private static BlockingQueue of(int type, Integer capacity) { + final BlockingQueueTypeEnum typeEnum = typeToEnumMap.get(type); + if (typeEnum == null) { + return null; } + return Objects.isNull(capacity) ? typeEnum.of() : typeEnum.of(capacity); + } + + + private static final int DEFAULT_CAPACITY = 1024; + + static { + DynamicThreadPoolServiceLoader.register(CustomBlockingQueue.class); + } + + private static BlockingQueue customOrDefaultQueue(Integer capacity, Predicate predicate) { Collection customBlockingQueues = DynamicThreadPoolServiceLoader .getSingletonServiceInstances(CustomBlockingQueue.class); - blockingQueue = Optional.ofNullable(blockingQueue).orElseGet(() -> customBlockingQueues.stream() - .filter(each -> Objects.equals(type, each.getType())) + + return customBlockingQueues.stream() + .filter(predicate) .map(each -> each.generateBlockingQueue()) .findFirst() - .orElse(new LinkedBlockingQueue(capacity))); - return blockingQueue; + .orElseGet(() -> { + int temCapacity = capacity; + if (capacity == null || capacity <= 0) { + temCapacity = DEFAULT_CAPACITY; + } + return new LinkedBlockingQueue(temCapacity); + }); + } + + /** + * 根据队列名创建对应的队列 + * + * @param blockingQueueName {@linkplain BlockingQueueTypeEnum#name} 队列值 + * @param capacity 队列大小,如果不支持 队列名 对应队列. 则是默认队列 LinkedBlockingQueue 的大小 + * @return 返回要求的队列,或者 LinkedBlockingQueue 队列 + */ + public static BlockingQueue createBlockingQueue(String blockingQueueName, Integer capacity) { + final BlockingQueue of = of(blockingQueueName, capacity); + if (of != null) { + return of; + } + + return customOrDefaultQueue(capacity, + (customeQueue) -> Objects.equals(customeQueue.getName(), blockingQueueName)); + } + + /** + * 根据队列名创建对应的队列 + * + * @param type {@linkplain BlockingQueueTypeEnum#type} 队列值 + * @param capacity 队列大小,如果不支持 队列名 对应队列. 则是默认队列 LinkedBlockingQueue 的大小 + * @return 返回要求的队列,或者 LinkedBlockingQueue 队列 + */ + public static BlockingQueue createBlockingQueue(int type, Integer capacity) { + final BlockingQueue of = of(type, capacity); + if (of != null) { + return of; + } + + return customOrDefaultQueue(capacity, + (customeQueue) -> Objects.equals(customeQueue.getType(), type)); } public static String getBlockingQueueNameByType(int type) { diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/CustomBlockingQueue.java b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/CustomBlockingQueue.java index 6800b06a..52631e05 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/CustomBlockingQueue.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/CustomBlockingQueue.java @@ -22,7 +22,7 @@ import java.util.concurrent.BlockingQueue; /** * Custom blocking-queue. */ -public interface CustomBlockingQueue { +public interface CustomBlockingQueue { /** * Gets the custom blocking queue type. @@ -45,5 +45,5 @@ public interface CustomBlockingQueue { * * @return */ - BlockingQueue generateBlockingQueue(); + BlockingQueue generateBlockingQueue(); } diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/MyArrayBlockingQueue.java b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/MyArrayBlockingQueue.java new file mode 100644 index 00000000..37b9fd75 --- /dev/null +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/MyArrayBlockingQueue.java @@ -0,0 +1,26 @@ +package cn.hippo4j.common.executor.support; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * @author hongdan.qin + * @date 2022/12/6 18:09 + */ +public class MyArrayBlockingQueue implements CustomBlockingQueue { + + @Override + public Integer getType() { + return null; + } + + @Override + public String getName() { + return this.getClass().getSimpleName(); + } + + @Override + public BlockingQueue generateBlockingQueue() { + return new LinkedBlockingQueue<>(20); + } +} diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/spi/DynamicThreadPoolServiceLoader.java b/hippo4j-common/src/main/java/cn/hippo4j/common/spi/DynamicThreadPoolServiceLoader.java index da118e98..02e5a642 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/spi/DynamicThreadPoolServiceLoader.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/spi/DynamicThreadPoolServiceLoader.java @@ -17,23 +17,31 @@ package cn.hippo4j.common.spi; +import cn.hippo4j.common.spi.annotation.SingletonSPI; +import cn.hippo4j.common.web.exception.NotSupportedException; + import java.lang.reflect.InvocationTargetException; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedList; -import java.util.Map; -import java.util.ServiceLoader; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import cn.hippo4j.common.spi.annotation.SingletonSPI; +import static cn.hippo4j.common.web.exception.ErrorCodeEnum.SERVICE_ERROR; /** * Dynamic thread-pool service loader. */ public class DynamicThreadPoolServiceLoader { - private static final Map, Collection> SERVICES = new ConcurrentHashMap<>(); + /** + * 类型安全的异构容器。 + * key : SPI interface + * value : key‘s subtype instance collection + */ + private static final Map, Collection> SERVICES = new ConcurrentHashMap<>(); + + private DynamicThreadPoolServiceLoader() { + throw new NotSupportedException("不支持实例化", SERVICE_ERROR); + } /** * Register. @@ -53,8 +61,8 @@ public class DynamicThreadPoolServiceLoader { * @param * @return */ - private static Collection load(final Class serviceInterface) { - Collection result = new LinkedList<>(); + private static Collection load(final Class serviceInterface) { + Collection result = new LinkedList<>(); for (T each : ServiceLoader.load(serviceInterface)) { result.add(each); } @@ -63,7 +71,7 @@ public class DynamicThreadPoolServiceLoader { /** * Get Service instances - * + * * @param serviceClass serviceClass * @param * @return diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/web/exception/NotSupportedException.java b/hippo4j-common/src/main/java/cn/hippo4j/common/web/exception/NotSupportedException.java new file mode 100644 index 00000000..4352973e --- /dev/null +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/web/exception/NotSupportedException.java @@ -0,0 +1,13 @@ +package cn.hippo4j.common.web.exception; + +/** + * @author hongdan.qin + * @date 2022/12/5 19:42 + */ +public class NotSupportedException extends AbstractException { + + public NotSupportedException(String message, ErrorCode errorCode) { + super(message, null, errorCode); + } + +} diff --git a/hippo4j-common/src/test/java/cn/hippo4j/common/toolkit/BlockingQueueTypeEnumTest.java b/hippo4j-common/src/test/java/cn/hippo4j/common/toolkit/BlockingQueueTypeEnumTest.java new file mode 100644 index 00000000..71ddab96 --- /dev/null +++ b/hippo4j-common/src/test/java/cn/hippo4j/common/toolkit/BlockingQueueTypeEnumTest.java @@ -0,0 +1,119 @@ +package cn.hippo4j.common.toolkit; + +import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum; +import cn.hippo4j.common.spi.TestInterfaceSPI; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.ServiceLoader; +import java.util.concurrent.BlockingQueue; + +/** + * @author hongdan.qin + * @date 2022/12/5 20:01 + */ +public class BlockingQueueTypeEnumTest { + @Test + void unboundQueue() { + final BlockingQueue arrayBlockingQueue = BlockingQueueTypeEnum.createBlockingQueue("ArrayBlockingQueue", 10); + System.out.println(arrayBlockingQueue); + } + + @Test + void wildGeneric() { + List ll = new ArrayList(); + System.out.println(ll.get(0)); +// 无法编译 +// ll.add("dd"); + } + + public static List ll() { + return null; + } + + @Test + void classLoader() { + for (TestInterfaceSPI spi : ServiceLoader.load(TestInterfaceSPI.class)) { + System.out.println(spi); + } + } + + @Test + void test202212061813() { + final BlockingQueue arrayBlockingQueue = BlockingQueueTypeEnum.createBlockingQueue("MyArrayBlockingQueue", null); + System.out.println(arrayBlockingQueue); +// arrayBlockingQueue.add("dd"); + + BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue("MyArrayBlockingQueue", 20); + workQueue.add("dd"); + System.out.println(workQueue); + + BlockingQueue workQueue1 = BlockingQueueTypeEnum.createBlockingQueue("LinkedBlockingQueue", 20); + workQueue1.add(new Runnable() { + @Override + public void run() { + } + }); + + System.out.println(workQueue1); + } + + interface O { + R say(T content); + } + + abstract class S implements O { + public abstract R say(T content); + + public WR write(WT content) { + return null; + } + } + + class C extends S { + @Override + public R say(T content) { + return null; + } + + @Override + public WR1 write(WT1 content) { + return super.write(content); + } + } + + class D extends S { + @Override + public R say(T content) { + return null; + } +// can't be compile +// @Override +// public String write(String content) { +// return super.write(content); +// } + + // @Override + public WR write(WT content) { + return super.write(content); + } + } + + @Test + void test() { + D d = new D<>(); + S l = d; + d.write("ddd"); + l.write(Integer.valueOf(2)); + l.write("ddd"); + } + + @Test + void test202212071347(){ + final List ls = Arrays.asList("sd", "2"); + System.out.println(ls); + } + +} diff --git a/hippo4j-common/src/test/java/cn/hippo4j/common/toolkit/GenericeTest.java b/hippo4j-common/src/test/java/cn/hippo4j/common/toolkit/GenericeTest.java new file mode 100644 index 00000000..78085fb4 --- /dev/null +++ b/hippo4j-common/src/test/java/cn/hippo4j/common/toolkit/GenericeTest.java @@ -0,0 +1,22 @@ +package cn.hippo4j.common.toolkit; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +/** + * @author hongdan.qin + * @date 2022/12/7 14:43 + */ +public class GenericeTest { + public void test() { + final List ls = Arrays.asList("sd", "2"); + System.out.println(ls.get(0)); + } + + @Test + public void test1() { + System.out.println(System.getProperty("java.class.path")); + } +} diff --git a/hippo4j-common/src/test/resources/META-INF/services/cn.hippo4j.common.executor.support.CustomBlockingQueue b/hippo4j-common/src/test/resources/META-INF/services/cn.hippo4j.common.executor.support.CustomBlockingQueue new file mode 100644 index 00000000..7630fad0 --- /dev/null +++ b/hippo4j-common/src/test/resources/META-INF/services/cn.hippo4j.common.executor.support.CustomBlockingQueue @@ -0,0 +1 @@ +cn.hippo4j.common.executor.support.MyArrayBlockingQueue \ No newline at end of file