优化 BlockingQueueTypeEnum

pull/1027/head
hongdan.qin 3 years ago
parent 585c8f6178
commit 39ad08ded3

@ -18,20 +18,14 @@
package cn.hippo4j.common.executor.support;
import cn.hippo4j.common.spi.DynamicThreadPoolServiceLoader;
import cn.hippo4j.common.web.exception.NotSupportedException;
import lombok.Getter;
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.stream.Stream;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Predicate;
import static cn.hippo4j.common.web.exception.ErrorCodeEnum.SERVICE_ERROR;
/**
* Blocking queue type enum.
@ -41,37 +35,72 @@ public enum BlockingQueueTypeEnum {
/**
* {@link java.util.concurrent.ArrayBlockingQueue}
*/
ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQueue"),
ARRAY_BLOCKING_QUEUE(1, "ArrayBlockingQueue") {
@Override
<T> BlockingQueue<T> of(Integer capacity) {
return new ArrayBlockingQueue<>(capacity);
}
},
/**
* {@link java.util.concurrent.LinkedBlockingQueue}
*/
LINKED_BLOCKING_QUEUE(2, "LinkedBlockingQueue"),
LINKED_BLOCKING_QUEUE(2, "LinkedBlockingQueue") {
@Override
<T> BlockingQueue<T> of(Integer capacity) {
return new LinkedBlockingQueue<>(capacity);
}
},
/**
* {@link java.util.concurrent.LinkedBlockingDeque}
*/
LINKED_BLOCKING_DEQUE(3, "LinkedBlockingDeque"),
LINKED_BLOCKING_DEQUE(3, "LinkedBlockingDeque") {
@Override
<T> BlockingQueue<T> of() {
return new LinkedBlockingDeque<>();
}
},
/**
* {@link java.util.concurrent.SynchronousQueue}
*/
SYNCHRONOUS_QUEUE(4, "SynchronousQueue"),
SYNCHRONOUS_QUEUE(4, "SynchronousQueue") {
@Override
<T> BlockingQueue<T> of() {
return new SynchronousQueue<>();
}
},
/**
* {@link java.util.concurrent.LinkedTransferQueue}
*/
LINKED_TRANSFER_QUEUE(5, "LinkedTransferQueue"),
LINKED_TRANSFER_QUEUE(5, "LinkedTransferQueue") {
@Override
<T> BlockingQueue<T> of() {
return new LinkedTransferQueue<>();
}
},
/**
* {@link java.util.concurrent.PriorityBlockingQueue}
*/
PRIORITY_BLOCKING_QUEUE(6, "PriorityBlockingQueue"),
PRIORITY_BLOCKING_QUEUE(6, "PriorityBlockingQueue") {
@Override
<T> BlockingQueue<T> of(Integer capacity) {
return new PriorityBlockingQueue<>(capacity);
}
},
/**
* {@link ResizableCapacityLinkedBlockingQueue}
*/
RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockingQueue");
RESIZABLE_LINKED_BLOCKING_QUEUE(9, "ResizableCapacityLinkedBlockingQueue") {
@Override
<T> BlockingQueue<T> of(Integer capacity) {
return new ResizableCapacityLinkedBlockingQueue<>(capacity);
}
};
@Getter
private Integer type;
@ -84,67 +113,110 @@ public enum BlockingQueueTypeEnum {
this.name = name;
}
private static final int DEFAULT_CAPACITY = 1024;
private static Map<Integer, BlockingQueueTypeEnum> typeToEnumMap;
private static Map<String, BlockingQueueTypeEnum> nameToEnumMap;
static {
DynamicThreadPoolServiceLoader.register(CustomBlockingQueue.class);
final BlockingQueueTypeEnum[] values = BlockingQueueTypeEnum.values();
typeToEnumMap = new HashMap<>(values.length);
nameToEnumMap = new HashMap<>(values.length);
for (BlockingQueueTypeEnum value : values) {
typeToEnumMap.put(value.type, value);
nameToEnumMap.put(value.name, value);
}
}
public static BlockingQueue createBlockingQueue(String blockingQueueName, Integer capacity) {
BlockingQueue blockingQueue = null;
BlockingQueueTypeEnum queueTypeEnum = Stream.of(BlockingQueueTypeEnum.values())
.filter(each -> Objects.equals(each.name, blockingQueueName))
.findFirst()
.orElse(null);
if (queueTypeEnum != null) {
blockingQueue = createBlockingQueue(queueTypeEnum.type, capacity);
if (Objects.equals(blockingQueue.getClass().getSimpleName(), blockingQueueName)) {
return blockingQueue;
}
/**
*
*
* @param capacity
* @return
*/
<T> BlockingQueue<T> of(Integer capacity) {
throw new NotSupportedException("该队列必须有界", SERVICE_ERROR);
}
/**
*
*
* @return
*/
<T> BlockingQueue<T> of() {
throw new NotSupportedException("该队列不支持有界", SERVICE_ERROR);
}
private static <T> BlockingQueue<T> of(String blockingQueueName, Integer capacity) {
final BlockingQueueTypeEnum typeEnum = nameToEnumMap.get(blockingQueueName);
if (typeEnum == null) {
return null;
}
Collection<CustomBlockingQueue> customBlockingQueues = DynamicThreadPoolServiceLoader
.getSingletonServiceInstances(CustomBlockingQueue.class);
blockingQueue = Optional.ofNullable(blockingQueue)
.orElseGet(
() -> customBlockingQueues.stream()
.filter(each -> Objects.equals(blockingQueueName, each.getName()))
.map(each -> each.generateBlockingQueue())
.findFirst()
.orElseGet(() -> {
int temCapacity = capacity;
if (capacity == null || capacity <= 0) {
temCapacity = DEFAULT_CAPACITY;
}
return new LinkedBlockingQueue(temCapacity);
}));
return blockingQueue;
return Objects.isNull(capacity) ? typeEnum.of() : typeEnum.of(capacity);
}
public static BlockingQueue createBlockingQueue(int type, Integer capacity) {
BlockingQueue blockingQueue = null;
if (Objects.equals(type, ARRAY_BLOCKING_QUEUE.type)) {
blockingQueue = new ArrayBlockingQueue(capacity);
} else if (Objects.equals(type, LINKED_BLOCKING_QUEUE.type)) {
blockingQueue = new LinkedBlockingQueue(capacity);
} else if (Objects.equals(type, LINKED_BLOCKING_DEQUE.type)) {
blockingQueue = new LinkedBlockingDeque(capacity);
} else if (Objects.equals(type, SYNCHRONOUS_QUEUE.type)) {
blockingQueue = new SynchronousQueue();
} else if (Objects.equals(type, LINKED_TRANSFER_QUEUE.type)) {
blockingQueue = new LinkedTransferQueue();
} else if (Objects.equals(type, PRIORITY_BLOCKING_QUEUE.type)) {
blockingQueue = new PriorityBlockingQueue(capacity);
} else if (Objects.equals(type, RESIZABLE_LINKED_BLOCKING_QUEUE.type)) {
blockingQueue = new ResizableCapacityLinkedBlockingQueue(capacity);
private static <T> BlockingQueue<T> of(int type, Integer capacity) {
final BlockingQueueTypeEnum typeEnum = typeToEnumMap.get(type);
if (typeEnum == null) {
return null;
}
return Objects.isNull(capacity) ? typeEnum.of() : typeEnum.of(capacity);
}
private static final int DEFAULT_CAPACITY = 1024;
static {
DynamicThreadPoolServiceLoader.register(CustomBlockingQueue.class);
}
private static <T> BlockingQueue<T> customOrDefaultQueue(Integer capacity, Predicate<CustomBlockingQueue> predicate) {
Collection<CustomBlockingQueue> customBlockingQueues = DynamicThreadPoolServiceLoader
.getSingletonServiceInstances(CustomBlockingQueue.class);
blockingQueue = Optional.ofNullable(blockingQueue).orElseGet(() -> customBlockingQueues.stream()
.filter(each -> Objects.equals(type, each.getType()))
return customBlockingQueues.stream()
.filter(predicate)
.map(each -> each.generateBlockingQueue())
.findFirst()
.orElse(new LinkedBlockingQueue(capacity)));
return blockingQueue;
.orElseGet(() -> {
int temCapacity = capacity;
if (capacity == null || capacity <= 0) {
temCapacity = DEFAULT_CAPACITY;
}
return new LinkedBlockingQueue<T>(temCapacity);
});
}
/**
*
*
* @param blockingQueueName {@linkplain BlockingQueueTypeEnum#name}
* @param capacity . LinkedBlockingQueue
* @return LinkedBlockingQueue
*/
public static <T> BlockingQueue<T> createBlockingQueue(String blockingQueueName, Integer capacity) {
final BlockingQueue<T> of = of(blockingQueueName, capacity);
if (of != null) {
return of;
}
return customOrDefaultQueue(capacity,
(customeQueue) -> Objects.equals(customeQueue.getName(), blockingQueueName));
}
/**
*
*
* @param type {@linkplain BlockingQueueTypeEnum#type}
* @param capacity . LinkedBlockingQueue
* @return LinkedBlockingQueue
*/
public static <T> BlockingQueue<T> createBlockingQueue(int type, Integer capacity) {
final BlockingQueue<T> of = of(type, capacity);
if (of != null) {
return of;
}
return customOrDefaultQueue(capacity,
(customeQueue) -> Objects.equals(customeQueue.getType(), type));
}
public static String getBlockingQueueNameByType(int type) {

@ -22,7 +22,7 @@ import java.util.concurrent.BlockingQueue;
/**
* Custom blocking-queue.
*/
public interface CustomBlockingQueue {
public interface CustomBlockingQueue<T> {
/**
* Gets the custom blocking queue type.
@ -45,5 +45,5 @@ public interface CustomBlockingQueue {
*
* @return
*/
BlockingQueue generateBlockingQueue();
BlockingQueue<T> generateBlockingQueue();
}

@ -0,0 +1,26 @@
package cn.hippo4j.common.executor.support;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author hongdan.qin
* @date 2022/12/6 18:09
*/
public class MyArrayBlockingQueue implements CustomBlockingQueue<Runnable> {
@Override
public Integer getType() {
return null;
}
@Override
public String getName() {
return this.getClass().getSimpleName();
}
@Override
public BlockingQueue<Runnable> generateBlockingQueue() {
return new LinkedBlockingQueue<>(20);
}
}

@ -17,23 +17,31 @@
package cn.hippo4j.common.spi;
import cn.hippo4j.common.spi.annotation.SingletonSPI;
import cn.hippo4j.common.web.exception.NotSupportedException;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import cn.hippo4j.common.spi.annotation.SingletonSPI;
import static cn.hippo4j.common.web.exception.ErrorCodeEnum.SERVICE_ERROR;
/**
* Dynamic thread-pool service loader.
*/
public class DynamicThreadPoolServiceLoader {
private static final Map<Class<?>, Collection<Object>> SERVICES = new ConcurrentHashMap<>();
/**
*
* key : SPI interface
* value : keys subtype instance collection
*/
private static final Map<Class<?>, Collection<?>> SERVICES = new ConcurrentHashMap<>();
private DynamicThreadPoolServiceLoader() {
throw new NotSupportedException("不支持实例化", SERVICE_ERROR);
}
/**
* Register.
@ -53,8 +61,8 @@ public class DynamicThreadPoolServiceLoader {
* @param <T>
* @return
*/
private static <T> Collection<Object> load(final Class<T> serviceInterface) {
Collection<Object> result = new LinkedList<>();
private static <T> Collection<T> load(final Class<T> serviceInterface) {
Collection<T> result = new LinkedList<>();
for (T each : ServiceLoader.load(serviceInterface)) {
result.add(each);
}
@ -63,7 +71,7 @@ public class DynamicThreadPoolServiceLoader {
/**
* Get Service instances
*
*
* @param serviceClass serviceClass
* @param <T>
* @return

@ -0,0 +1,13 @@
package cn.hippo4j.common.web.exception;
/**
* @author hongdan.qin
* @date 2022/12/5 19:42
*/
public class NotSupportedException extends AbstractException {
public NotSupportedException(String message, ErrorCode errorCode) {
super(message, null, errorCode);
}
}

@ -0,0 +1,119 @@
package cn.hippo4j.common.toolkit;
import cn.hippo4j.common.executor.support.BlockingQueueTypeEnum;
import cn.hippo4j.common.spi.TestInterfaceSPI;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.ServiceLoader;
import java.util.concurrent.BlockingQueue;
/**
* @author hongdan.qin
* @date 2022/12/5 20:01
*/
public class BlockingQueueTypeEnumTest {
@Test
void unboundQueue() {
final BlockingQueue arrayBlockingQueue = BlockingQueueTypeEnum.createBlockingQueue("ArrayBlockingQueue", 10);
System.out.println(arrayBlockingQueue);
}
@Test
void wildGeneric() {
List<?> ll = new ArrayList<String>();
System.out.println(ll.get(0));
// 无法编译
// ll.add("dd");
}
public static List<?> ll() {
return null;
}
@Test
void classLoader() {
for (TestInterfaceSPI spi : ServiceLoader.load(TestInterfaceSPI.class)) {
System.out.println(spi);
}
}
@Test
void test202212061813() {
final BlockingQueue<Runnable> arrayBlockingQueue = BlockingQueueTypeEnum.createBlockingQueue("MyArrayBlockingQueue", null);
System.out.println(arrayBlockingQueue);
// arrayBlockingQueue.add("dd");
BlockingQueue workQueue = BlockingQueueTypeEnum.createBlockingQueue("MyArrayBlockingQueue", 20);
workQueue.add("dd");
System.out.println(workQueue);
BlockingQueue<Runnable> workQueue1 = BlockingQueueTypeEnum.createBlockingQueue("LinkedBlockingQueue", 20);
workQueue1.add(new Runnable() {
@Override
public void run() {
}
});
System.out.println(workQueue1);
}
interface O<T, R> {
R say(T content);
}
abstract class S<T, R> implements O<T, R> {
public abstract R say(T content);
public <WT, WR> WR write(WT content) {
return null;
}
}
class C<T, R extends CharSequence> extends S<T, R> {
@Override
public R say(T content) {
return null;
}
@Override
public <WT1, WR1> WR1 write(WT1 content) {
return super.write(content);
}
}
class D<T, R> extends S<T, R> {
@Override
public R say(T content) {
return null;
}
// can't be compile
// @Override
// public String write(String content) {
// return super.write(content);
// }
// @Override
public <WT extends String, WR extends String> WR write(WT content) {
return super.write(content);
}
}
@Test
void test() {
D<String, String> d = new D<>();
S<String, String> l = d;
d.write("ddd");
l.write(Integer.valueOf(2));
l.write("ddd");
}
@Test
void test202212071347(){
final List<String> ls = Arrays.asList("sd", "2");
System.out.println(ls);
}
}

@ -0,0 +1,22 @@
package cn.hippo4j.common.toolkit;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
/**
* @author hongdan.qin
* @date 2022/12/7 14:43
*/
public class GenericeTest {
public void test() {
final List<String> ls = Arrays.asList("sd", "2");
System.out.println(ls.get(0));
}
@Test
public void test1() {
System.out.println(System.getProperty("java.class.path"));
}
}
Loading…
Cancel
Save