refactor of BlockingQueueTypeEnum (#1032)

Co-authored-by: hongdan.qin <hongdan.qin@dmall.com>
pull/1098/head
lianyiwuming 1 year ago committed by GitHub
parent b64c78d114
commit 209856a174
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -18,10 +18,12 @@
package cn.hippo4j.common.executor.support;
import cn.hippo4j.common.spi.DynamicThreadPoolServiceLoader;
import cn.hippo4j.common.web.exception.NotSupportedException;
import lombok.Getter;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
@ -31,7 +33,9 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.stream.Stream;
import java.util.function.Predicate;
import static cn.hippo4j.common.web.exception.ErrorCodeEnum.SERVICE_ERROR;
/**
* Blocking queue type enum.
@ -41,37 +45,72 @@ public enum BlockingQueueTypeEnum {
/**
* {@link java.util.concurrent.ArrayBlockingQueue}
*/
ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQueue"),
ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQueue") {
@Override
<T> BlockingQueue<T> of(Integer capacity) {
return new ArrayBlockingQueue<>(capacity);
}
},
/**
* {@link java.util.concurrent.LinkedBlockingQueue}
*/
LINKED_BLOCKING_QUEUE(2, "LinkedBlockingQueue"),
LINKED_BLOCKING_QUEUE(2, "LinkedBlockingQueue") {
@Override
<T> BlockingQueue<T> of(Integer capacity) {
return new LinkedBlockingQueue<>(capacity);
}
},
/**
* {@link java.util.concurrent.LinkedBlockingDeque}
*/
LINKED_BLOCKING_DEQUE(3, "LinkedBlockingDeque"),
LINKED_BLOCKING_DEQUE(3, "LinkedBlockingDeque") {
@Override
<T> BlockingQueue<T> of() {
return new LinkedBlockingDeque<>();
}
},
/**
* {@link java.util.concurrent.SynchronousQueue}
*/
SYNCHRONOUS_QUEUE(4, "SynchronousQueue"),
SYNCHRONOUS_QUEUE(4, "SynchronousQueue") {
@Override
<T> BlockingQueue<T> of() {
return new SynchronousQueue<>();
}
},
/**
* {@link java.util.concurrent.LinkedTransferQueue}
*/
LINKED_TRANSFER_QUEUE(5, "LinkedTransferQueue"),
LINKED_TRANSFER_QUEUE(5, "LinkedTransferQueue") {
@Override
<T> BlockingQueue<T> of() {
return new LinkedTransferQueue<>();
}
},
/**
* {@link java.util.concurrent.PriorityBlockingQueue}
*/
PRIORITY_BLOCKING_QUEUE(6, "PriorityBlockingQueue"),
PRIORITY_BLOCKING_QUEUE(6, "PriorityBlockingQueue") {
@Override
<T> BlockingQueue<T> of(Integer capacity) {
return new PriorityBlockingQueue<>(capacity);
}
},
/**
* {@link ResizableCapacityLinkedBlockingQueue}
*/
RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockingQueue");
RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockingQueue") {
@Override
<T> BlockingQueue<T> of(Integer capacity) {
return new ResizableCapacityLinkedBlockingQueue<>(capacity);
}
};
@Getter
private Integer type;
@ -84,80 +123,163 @@ public enum BlockingQueueTypeEnum {
this.name = name;
}
private static final int DEFAULT_CAPACITY = 1024;
private static Map<Integer, BlockingQueueTypeEnum> typeToEnumMap;
private static Map<String, BlockingQueueTypeEnum> nameToEnumMap;
static {
DynamicThreadPoolServiceLoader.register(CustomBlockingQueue.class);
final BlockingQueueTypeEnum[] values = BlockingQueueTypeEnum.values();
typeToEnumMap = new HashMap<>(values.length);
nameToEnumMap = new HashMap<>(values.length);
for (BlockingQueueTypeEnum value : values) {
typeToEnumMap.put(value.type, value);
nameToEnumMap.put(value.name, value);
}
}
public static BlockingQueue createBlockingQueue(String blockingQueueName, Integer capacity) {
BlockingQueue blockingQueue = null;
BlockingQueueTypeEnum queueTypeEnum = Stream.of(BlockingQueueTypeEnum.values())
.filter(each -> Objects.equals(each.name, blockingQueueName))
.findFirst()
.orElse(null);
if (queueTypeEnum != null) {
blockingQueue = createBlockingQueue(queueTypeEnum.type, capacity);
if (Objects.equals(blockingQueue.getClass().getSimpleName(), blockingQueueName)) {
return blockingQueue;
}
/**
* Create the specified implement of BlockingQueue with init capacity.
* Abstract method, depends on sub override
*
* @param capacity the capacity of the queue
* @param <T> the class of the objects in the BlockingQueue
* @return a BlockingQueue view of the specified T
*/
<T> BlockingQueue<T> of(Integer capacity) {
throw new NotSupportedException("该队列必须有界", SERVICE_ERROR);
}
/**
* Create the specified implement of BlockingQueue,has no capacity limit.
* Abstract method, depends on sub override
*
* @param <T> the class of the objects in the BlockingQueue
* @return a BlockingQueue view of the specified T
* @throws NotSupportedException
*/
<T> BlockingQueue<T> of() {
throw new NotSupportedException("该队列不支持有界", SERVICE_ERROR);
}
/**
* Creates a BlockingQueue with the given {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name}
* and capacity.
*
* @param blockingQueueName {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name}
* @param capacity the capacity of the BlockingQueue
* @param <T> the class of the objects in the BlockingQueue
* @return a BlockingQueue view of the specified T
*/
private static <T> BlockingQueue<T> of(String blockingQueueName, Integer capacity) {
final BlockingQueueTypeEnum typeEnum = nameToEnumMap.get(blockingQueueName);
if (typeEnum == null) {
return null;
}
Collection<CustomBlockingQueue> customBlockingQueues = DynamicThreadPoolServiceLoader
.getSingletonServiceInstances(CustomBlockingQueue.class);
blockingQueue = Optional.ofNullable(blockingQueue)
.orElseGet(
() -> customBlockingQueues.stream()
.filter(each -> Objects.equals(blockingQueueName, each.getName()))
.map(each -> each.generateBlockingQueue())
.findFirst()
.orElseGet(() -> {
int temCapacity = capacity;
if (capacity == null || capacity <= 0) {
temCapacity = DEFAULT_CAPACITY;
}
return new LinkedBlockingQueue(temCapacity);
}));
return blockingQueue;
return Objects.isNull(capacity) ? typeEnum.of() : typeEnum.of(capacity);
}
public static BlockingQueue createBlockingQueue(int type, Integer capacity) {
BlockingQueue blockingQueue = null;
if (Objects.equals(type, ARRAY_BLOCKING_QUEUE.type)) {
blockingQueue = new ArrayBlockingQueue(capacity);
} else if (Objects.equals(type, LINKED_BLOCKING_QUEUE.type)) {
blockingQueue = new LinkedBlockingQueue(capacity);
} else if (Objects.equals(type, LINKED_BLOCKING_DEQUE.type)) {
blockingQueue = new LinkedBlockingDeque(capacity);
} else if (Objects.equals(type, SYNCHRONOUS_QUEUE.type)) {
blockingQueue = new SynchronousQueue();
} else if (Objects.equals(type, LINKED_TRANSFER_QUEUE.type)) {
blockingQueue = new LinkedTransferQueue();
} else if (Objects.equals(type, PRIORITY_BLOCKING_QUEUE.type)) {
blockingQueue = new PriorityBlockingQueue(capacity);
} else if (Objects.equals(type, RESIZABLE_LINKED_BLOCKING_QUEUE.type)) {
blockingQueue = new ResizableCapacityLinkedBlockingQueue(capacity);
/**
* Creates a BlockingQueue with the given {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type}
* and capacity.
*
* @param type {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type}
* @param capacity the capacity of the BlockingQueue
* @param <T> the class of the objects in the BlockingQueue
* @return a BlockingQueue view of the specified T
*/
private static <T> BlockingQueue<T> of(int type, Integer capacity) {
final BlockingQueueTypeEnum typeEnum = typeToEnumMap.get(type);
if (typeEnum == null) {
return null;
}
return Objects.isNull(capacity) ? typeEnum.of() : typeEnum.of(capacity);
}
private static final int DEFAULT_CAPACITY = 1024;
static {
DynamicThreadPoolServiceLoader.register(CustomBlockingQueue.class);
}
private static <T> BlockingQueue<T> customOrDefaultQueue(Integer capacity, Predicate<CustomBlockingQueue> predicate) {
Collection<CustomBlockingQueue> customBlockingQueues = DynamicThreadPoolServiceLoader
.getSingletonServiceInstances(CustomBlockingQueue.class);
blockingQueue = Optional.ofNullable(blockingQueue).orElseGet(() -> customBlockingQueues.stream()
.filter(each -> Objects.equals(type, each.getType()))
return customBlockingQueues.stream()
.filter(predicate)
.map(each -> each.generateBlockingQueue())
.findFirst()
.orElse(new LinkedBlockingQueue(capacity)));
return blockingQueue;
.orElseGet(() -> {
int temCapacity = capacity;
if (capacity == null || capacity <= 0) {
temCapacity = DEFAULT_CAPACITY;
}
return new LinkedBlockingQueue<T>(temCapacity);
});
}
/**
* Creates a BlockingQueue with the given {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name}
* and capacity. if can't find the blockingQueueName with {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name},
* create custom or default BlockingQueue {@link BlockingQueueTypeEnum#customOrDefaultQueue BlockingQueueTypeEnum.customOrDefaultQueue}.
*
* @param blockingQueueName {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name}
* @param capacity the capacity of the BlockingQueue
* @param <T> the class of the objects in the BlockingQueue
* @return a BlockingQueue view of the specified T
*/
public static <T> BlockingQueue<T> createBlockingQueue(String blockingQueueName, Integer capacity) {
final BlockingQueue<T> of = of(blockingQueueName, capacity);
if (of != null) {
return of;
}
return customOrDefaultQueue(capacity,
(customeQueue) -> Objects.equals(customeQueue.getName(), blockingQueueName));
}
/**
* Creates a BlockingQueue with the given {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type}
* and capacity. if can't find the blockingQueueName with {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type},
* create custom or default BlockingQueue {@link BlockingQueueTypeEnum#customOrDefaultQueue BlockingQueueTypeEnum.customOrDefaultQueue}.
*
* @param type {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type}
* @param capacity the capacity of the BlockingQueue
* @param <T> the class of the objects in the BlockingQueue
* @return a BlockingQueue view of the specified T
*/
public static <T> BlockingQueue<T> createBlockingQueue(int type, Integer capacity) {
final BlockingQueue<T> of = of(type, capacity);
if (of != null) {
return of;
}
return customOrDefaultQueue(capacity,
(customeQueue) -> Objects.equals(customeQueue.getType(), type));
}
/**
* Map {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type } to {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name }
* or "" if can't mapping.
*
* @param type {@link BlockingQueueTypeEnum#type BlockingQueueTypeEnum.type}
* @return {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name } or "".
*/
public static String getBlockingQueueNameByType(int type) {
Optional<BlockingQueueTypeEnum> queueTypeEnum = Arrays.stream(BlockingQueueTypeEnum.values())
.filter(each -> each.type == type)
.findFirst();
return queueTypeEnum.map(each -> each.name).orElse("");
return Optional.ofNullable(typeToEnumMap.get(type))
.map(value -> value.getName())
.orElse("");
}
/**
* find {@link BlockingQueueTypeEnum} by {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name }
* or {@link BlockingQueueTypeEnum#LINKED_BLOCKING_QUEUE} if can't mapping.
*
* @param name {@link BlockingQueueTypeEnum#name BlockingQueueTypeEnum.name }
* @return enum {@link BlockingQueueTypeEnum}
*/
public static BlockingQueueTypeEnum getBlockingQueueTypeEnumByName(String name) {
Optional<BlockingQueueTypeEnum> queueTypeEnum = Arrays.stream(BlockingQueueTypeEnum.values())
.filter(each -> each.name.equals(name))
.findFirst();
return queueTypeEnum.orElse(LINKED_BLOCKING_QUEUE);
return Optional.ofNullable(nameToEnumMap.get(name))
.orElse(LINKED_BLOCKING_QUEUE);
}
}

@ -0,0 +1,12 @@
package cn.hippo4j.common.web.exception;
/**
* This exception is thrown when a context implementation does not support the operation being invoked.
*/
public class NotSupportedException extends AbstractException {
public NotSupportedException(String message, ErrorCode errorCode) {
super(message, null, errorCode);
}
}
Loading…
Cancel
Save