|
|
@ -4,6 +4,7 @@ import cn.hutool.core.util.ReflectUtil;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
|
|
|
|
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
/**
|
|
|
|
* Resizable capacity linkedBlockIngQueue.
|
|
|
|
* Resizable capacity linkedBlockIngQueue.
|
|
|
@ -20,13 +21,24 @@ public class ResizableCapacityLinkedBlockIngQueue<E> extends LinkedBlockingQueue
|
|
|
|
|
|
|
|
|
|
|
|
public synchronized boolean setCapacity(Integer capacity) {
|
|
|
|
public synchronized boolean setCapacity(Integer capacity) {
|
|
|
|
boolean successFlag = true;
|
|
|
|
boolean successFlag = true;
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
|
|
|
* TODO:后续考虑切换 Rabbitmq VariableLinkedBlockingQueue
|
|
|
|
|
|
|
|
*/
|
|
|
|
try {
|
|
|
|
try {
|
|
|
|
|
|
|
|
int oldCapacity = (int) ReflectUtil.getFieldValue(this, "capacity");
|
|
|
|
|
|
|
|
AtomicInteger count = (AtomicInteger) ReflectUtil.getFieldValue(this, "count");
|
|
|
|
|
|
|
|
int size = count.get();
|
|
|
|
|
|
|
|
|
|
|
|
ReflectUtil.setFieldValue(this, "capacity", capacity);
|
|
|
|
ReflectUtil.setFieldValue(this, "capacity", capacity);
|
|
|
|
|
|
|
|
if (capacity > size && size >= oldCapacity) {
|
|
|
|
|
|
|
|
ReflectUtil.invoke(this, "signalNotFull");
|
|
|
|
|
|
|
|
}
|
|
|
|
} catch (Exception ex) {
|
|
|
|
} catch (Exception ex) {
|
|
|
|
// ignore
|
|
|
|
// ignore
|
|
|
|
log.error("Dynamic modification of blocking queue size failed.", ex);
|
|
|
|
log.error("Dynamic modification of blocking queue size failed.", ex);
|
|
|
|
successFlag = false;
|
|
|
|
successFlag = false;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return successFlag;
|
|
|
|
return successFlag;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|