From 17426f053d7593151791af379b1f3687208349e5 Mon Sep 17 00:00:00 2001 From: "chen.ma" Date: Sun, 11 Sep 2022 16:14:54 +0800 Subject: [PATCH] Fix bug in put method of ResizableCapacityLinkedBlockingQueue (#685) --- .../ResizableCapacityLinkedBlockingQueue.java | 784 +++++++++++++++++- 1 file changed, 762 insertions(+), 22 deletions(-) diff --git a/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/ResizableCapacityLinkedBlockingQueue.java b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/ResizableCapacityLinkedBlockingQueue.java index 9e9a8d10..7e4fc3bc 100644 --- a/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/ResizableCapacityLinkedBlockingQueue.java +++ b/hippo4j-common/src/main/java/cn/hippo4j/common/executor/support/ResizableCapacityLinkedBlockingQueue.java @@ -17,38 +17,778 @@ package cn.hippo4j.common.executor.support; -import cn.hutool.core.util.ReflectUtil; -import lombok.extern.slf4j.Slf4j; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; +import java.util.*; /** - * Resizable capacity linked-blocking-queue. Options Rabbitmq VariableLinkedBlockingQueue - */ -@Slf4j -public class ResizableCapacityLinkedBlockingQueue extends LinkedBlockingQueue { + * A clone of {@linkplain java.util.concurrent.LinkedBlockingQueue} + * with the addition of a {@link #setCapacity(int)} method, allowing us to + * change the capacity of the queue while it is in use.

+ *

+ * The documentation for LinkedBlockingQueue follows...

+ *

+ * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on + * linked nodes. + * This queue orders elements FIFO (first-in-first-out). + * The head of the queue is that element that has been on the + * queue the longest time. + * The tail of the queue is that element that has been on the + * queue the shortest time. New elements + * are inserted at the tail of the queue, and the queue retrieval + * operations obtain elements at the head of the queue. + * Linked queues typically have higher throughput than array-based queues but + * less predictable performance in most concurrent applications. + * + *

The optional capacity bound constructor argument serves as a + * way to prevent excessive queue expansion. The capacity, if unspecified, + * is equal to {@link Integer#MAX_VALUE}. Linked nodes are + * dynamically created upon each insertion unless this would bring the + * queue above capacity. + * + *

This class implements all of the optional methods + * of the {@link Collection} and {@link Iterator} interfaces. + * + *

This class is a member of the + * + * Java Collections Framework. + * + * @param the type of elements held in this collection + * @author Doug Lea + * @since 1.5 + **/ +public class ResizableCapacityLinkedBlockingQueue extends AbstractQueue + implements + BlockingQueue, + java.io.Serializable { + + private static final long serialVersionUID = -6903933977591709194L; + + /* + * A variant of the "two lock queue" algorithm. The putLock gates entry to put (and offer), and has an associated condition for waiting puts. Similarly for the takeLock. The "count" field that + * they both rely on is maintained as an atomic to avoid needing to get both locks in most cases. Also, to minimize need for puts to get takeLock and vice-versa, cascading notifies are used. When + * a put notices that it has enabled at least one take, it signals taker. That taker in turn signals others if more items have been entered since the signal. And symmetrically for takes signalling + * puts. Operations such as remove(Object) and iterators acquire both locks. + */ + + /** + * Linked list node class + */ + static class Node { + + /** + * The item, volatile to ensure barrier separating write and read + */ + volatile E item; + Node next; + + Node(E x) { + item = x; + } + } + + /** + * The capacity bound, or Integer.MAX_VALUE if none + */ + private int capacity; + + /** + * Current number of elements + */ + private final AtomicInteger count = new AtomicInteger(0); + + /** + * Head of linked list + */ + private transient Node head; + + /** + * Tail of linked list + */ + private transient Node last; + + /** + * Lock held by take, poll, etc + */ + private final ReentrantLock takeLock = new ReentrantLock(); + /** + * Wait queue for waiting takes + */ + private final Condition notEmpty = takeLock.newCondition(); + + /** + * Lock held by put, offer, etc + */ + private final ReentrantLock putLock = new ReentrantLock(); + + /** + * Wait queue for waiting puts + */ + private final Condition notFull = putLock.newCondition(); + + /** + * Signal a waiting take. Called only from put/offer (which do not + * otherwise ordinarily lock takeLock.) + */ + private void signalNotEmpty() { + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + notEmpty.signal(); + } finally { + takeLock.unlock(); + } + } + + /** + * Signal a waiting put. Called only from take/poll. + */ + private void signalNotFull() { + final ReentrantLock putLock = this.putLock; + putLock.lock(); + try { + notFull.signal(); + } finally { + putLock.unlock(); + } + } + + /** + * Create a node and link it at end of queue + * + * @param x the item + */ + private void insert(E x) { + last = last.next = new Node(x); + } + + /** + * Remove a node from head of queue, + * + * @return the node + */ + private E extract() { + Node first = head.next; + head = first; + E x = first.item; + first.item = null; + return x; + } + + /** + * Lock to prevent both puts and takes. + */ + private void fullyLock() { + putLock.lock(); + takeLock.lock(); + } + + /** + * Unlock to allow both puts and takes. + */ + private void fullyUnlock() { + takeLock.unlock(); + putLock.unlock(); + } + + /** + * Creates a LinkedBlockingQueue with a capacity of + * {@link Integer#MAX_VALUE}. + */ + public ResizableCapacityLinkedBlockingQueue() { + this(Integer.MAX_VALUE); + } + + /** + * Creates a LinkedBlockingQueue with the given (fixed) capacity. + * + * @param capacity the capacity of this queue. + * @throws IllegalArgumentException if capacity is not greater + * than zero. + */ public ResizableCapacityLinkedBlockingQueue(int capacity) { - super(capacity); + if (capacity <= 0) + throw new IllegalArgumentException(); + this.capacity = capacity; + last = head = new Node(null); + } + + /** + * Creates a LinkedBlockingQueue with a capacity of + * {@link Integer#MAX_VALUE}, initially containing the elements of the + * given collection, + * added in traversal order of the collection's iterator. + * + * @param c the collection of elements to initially contain + * @throws NullPointerException if c or any element within it + * is null + */ + public ResizableCapacityLinkedBlockingQueue(Collection c) { + this(Integer.MAX_VALUE); + for (Iterator it = c.iterator(); it.hasNext();) + add(it.next()); + } + + // this doc comment is overridden to remove the reference to collections + // greater in size than Integer.MAX_VALUE + + /** + * Returns the number of elements in this queue. + * + * @return the number of elements in this queue. + */ + @Override + public int size() { + return count.get(); + } + + /** + * Set a new capacity for the queue. Increasing the capacity can + * cause any waiting {@link #put(Object)} invocations to succeed if the new + * capacity is larger than the queue. + * + * @param capacity the new capacity for the queue + */ + public void setCapacity(int capacity) { + final int oldCapacity = this.capacity; + this.capacity = capacity; + final int size = count.get(); + if (capacity > size && size >= oldCapacity) { + signalNotFull(); + } + } + + // this doc comment is a modified copy of the inherited doc comment, + // without the reference to unlimited queues. + + /** + * Returns the number of elements that this queue can ideally (in + * the absence of memory or resource constraints) accept without + * blocking. This is always equal to the initial capacity of this queue + * less the current size of this queue. + *

Note that you cannot always tell if + * an attempt to add an element will succeed by + * inspecting remainingCapacity because it may be the + * case that a waiting consumer is ready to take an + * element out of an otherwise full queue. + */ + @Override + public int remainingCapacity() { + return capacity - count.get(); + } + + /** + * Adds the specified element to the tail of this queue, waiting if + * necessary for space to become available. + * + * @param o the element to add + * @throws InterruptedException if interrupted while waiting. + * @throws NullPointerException if the specified element is null. + */ + @Override + public void put(E o) throws InterruptedException { + if (o == null) + throw new NullPointerException(); + // Note: convention in all put/take/etc is to preset + // local var holding count negative to indicate failure unless set. + int c = -1; + final ReentrantLock putLock = this.putLock; + final AtomicInteger count = this.count; + putLock.lockInterruptibly(); + try { + /* + * Note that count is used in wait guard even though it is not protected by lock. This works because count can only decrease at this point (all other puts are shut out by lock), and we (or + * some other waiting put) are signalled if it ever changes from capacity. Similarly for all other uses of count in other wait guards. + */ + try { + while (count.get() >= capacity) + notFull.await(); + } catch (InterruptedException ie) { + notFull.signal(); // propagate to a non-interrupted thread + throw ie; + } + insert(o); + c = count.getAndIncrement(); + if (c + 1 < capacity) + notFull.signal(); + } finally { + putLock.unlock(); + } + if (c == 0) + signalNotEmpty(); + } + + /** + * Inserts the specified element at the tail of this queue, waiting if + * necessary up to the specified wait time for space to become available. + * + * @param o the element to add + * @param timeout how long to wait before giving up, in units of + * unit + * @param unit a TimeUnit determining how to interpret the + * timeout parameter + * @return true if successful, or false if + * the specified waiting time elapses before space is available. + * @throws InterruptedException if interrupted while waiting. + * @throws NullPointerException if the specified element is null. + */ + @Override + public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { + + if (o == null) + throw new NullPointerException(); + long nanos = unit.toNanos(timeout); + int c = -1; + final ReentrantLock putLock = this.putLock; + final AtomicInteger count = this.count; + putLock.lockInterruptibly(); + try { + for (;;) { + if (count.get() < capacity) { + insert(o); + c = count.getAndIncrement(); + if (c + 1 < capacity) + notFull.signal(); + break; + } + if (nanos <= 0) + return false; + try { + nanos = notFull.awaitNanos(nanos); + } catch (InterruptedException ie) { + notFull.signal(); // propagate to a non-interrupted thread + throw ie; + } + } + } finally { + putLock.unlock(); + } + if (c == 0) + signalNotEmpty(); + return true; + } + + /** + * Inserts the specified element at the tail of this queue if possible, + * returning immediately if this queue is full. + * + * @param o the element to add. + * @return true if it was possible to add the element to + * this queue, else false + * @throws NullPointerException if the specified element is null + */ + @Override + public boolean offer(E o) { + if (o == null) + throw new NullPointerException(); + final AtomicInteger count = this.count; + if (count.get() >= capacity) + return false; + int c = -1; + final ReentrantLock putLock = this.putLock; + putLock.lock(); + try { + if (count.get() < capacity) { + insert(o); + c = count.getAndIncrement(); + if (c + 1 < capacity) + notFull.signal(); + } + } finally { + putLock.unlock(); + } + if (c == 0) + signalNotEmpty(); + return c >= 0; + } + + @Override + public E take() throws InterruptedException { + E x; + int c = -1; + final AtomicInteger count = this.count; + final ReentrantLock takeLock = this.takeLock; + takeLock.lockInterruptibly(); + try { + try { + while (count.get() == 0) + notEmpty.await(); + } catch (InterruptedException ie) { + notEmpty.signal(); // propagate to a non-interrupted thread + throw ie; + } + + x = extract(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); + } finally { + takeLock.unlock(); + } + if (c >= capacity) + signalNotFull(); + return x; + } + + @Override + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + E x = null; + int c = -1; + long nanos = unit.toNanos(timeout); + final AtomicInteger count = this.count; + final ReentrantLock takeLock = this.takeLock; + takeLock.lockInterruptibly(); + try { + for (;;) { + if (count.get() > 0) { + x = extract(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); + break; + } + if (nanos <= 0) + return null; + try { + nanos = notEmpty.awaitNanos(nanos); + } catch (InterruptedException ie) { + notEmpty.signal(); // propagate to a non-interrupted thread + throw ie; + } + } + } finally { + takeLock.unlock(); + } + if (c >= capacity) + signalNotFull(); + return x; + } + + @Override + public E poll() { + final AtomicInteger count = this.count; + if (count.get() == 0) + return null; + E x = null; + int c = -1; + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + if (count.get() > 0) { + x = extract(); + c = count.getAndDecrement(); + if (c > 1) + notEmpty.signal(); + } + } finally { + takeLock.unlock(); + } + if (c >= capacity) + signalNotFull(); + return x; + } + + @Override + public E peek() { + if (count.get() == 0) + return null; + final ReentrantLock takeLock = this.takeLock; + takeLock.lock(); + try { + Node first = head.next; + if (first == null) + return null; + else + return first.item; + } finally { + takeLock.unlock(); + } + } + + @Override + public boolean remove(Object o) { + if (o == null) + return false; + boolean removed = false; + fullyLock(); + try { + Node trail = head; + Node p = head.next; + while (p != null) { + if (o.equals(p.item)) { + removed = true; + break; + } + trail = p; + p = p.next; + } + if (removed) { + p.item = null; + trail.next = p.next; + if (count.getAndDecrement() >= capacity) + notFull.signalAll(); + } + } finally { + fullyUnlock(); + } + return removed; } - public synchronized boolean setCapacity(Integer capacity) { - boolean successFlag = true; + @Override + public Object[] toArray() { + fullyLock(); try { - int oldCapacity = (int) ReflectUtil.getFieldValue(this, "capacity"); - AtomicInteger count = (AtomicInteger) ReflectUtil.getFieldValue(this, "count"); int size = count.get(); + Object[] a = new Object[size]; + int k = 0; + for (Node p = head.next; p != null; p = p.next) + a[k++] = p.item; + return a; + } finally { + fullyUnlock(); + } + } + + @Override + @SuppressWarnings("unchecked") + public T[] toArray(T[] a) { + fullyLock(); + try { + int size = count.get(); + if (a.length < size) + a = (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), size); + + int k = 0; + for (Node p = head.next; p != null; p = p.next) + a[k++] = (T) p.item; + return a; + } finally { + fullyUnlock(); + } + } - ReflectUtil.setFieldValue(this, "capacity", capacity); - if (capacity > size && size >= oldCapacity) { - ReflectUtil.invoke(this, "signalNotFull"); + @Override + public String toString() { + fullyLock(); + try { + return super.toString(); + } finally { + fullyUnlock(); + } + } + + @Override + public void clear() { + fullyLock(); + try { + head.next = null; + if (count.getAndSet(0) >= capacity) + notFull.signalAll(); + } finally { + fullyUnlock(); + } + } + + @Override + public int drainTo(Collection c) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + Node first; + fullyLock(); + try { + first = head.next; + head.next = null; + if (count.getAndSet(0) >= capacity) + notFull.signalAll(); + } finally { + fullyUnlock(); + } + // Transfer the elements outside of locks + int n = 0; + for (Node p = first; p != null; p = p.next) { + c.add(p.item); + p.item = null; + ++n; + } + return n; + } + + @Override + public int drainTo(Collection c, int maxElements) { + if (c == null) + throw new NullPointerException(); + if (c == this) + throw new IllegalArgumentException(); + if (maxElements <= 0) + return 0; + fullyLock(); + try { + int n = 0; + Node p = head.next; + while (p != null && n < maxElements) { + c.add(p.item); + p.item = null; + p = p.next; + ++n; } - } catch (Exception ex) { - log.error("Dynamic modification of blocking queue size failed.", ex); - successFlag = false; + if (n != 0) { + head.next = p; + if (count.getAndAdd(-n) >= capacity) + notFull.signalAll(); + } + return n; + } finally { + fullyUnlock(); } - return successFlag; } -} + /** + * Returns an iterator over the elements in this queue in proper sequence. + * The returned Iterator is a "weakly consistent" iterator that + * will never throw {@link java.util.ConcurrentModificationException}, + * and guarantees to traverse elements as they existed upon + * construction of the iterator, and may (but is not guaranteed to) + * reflect any modifications subsequent to construction. + * + * @return an iterator over the elements in this queue in proper sequence. + */ + @Override + public Iterator iterator() { + return new Itr(); + } + + private class Itr implements Iterator { + + /* + * Basic weak-consistent iterator. At all times hold the next item to hand out so that if hasNext() reports true, we will still have it to return even if lost race with a take etc. + */ + private Node current; + private Node lastRet; + private E currentElement; + + Itr() { + final ReentrantLock putLock = ResizableCapacityLinkedBlockingQueue.this.putLock; + final ReentrantLock takeLock = ResizableCapacityLinkedBlockingQueue.this.takeLock; + putLock.lock(); + takeLock.lock(); + try { + current = head.next; + if (current != null) + currentElement = current.item; + } finally { + takeLock.unlock(); + putLock.unlock(); + } + } + + @Override + public boolean hasNext() { + return current != null; + } + + @Override + public E next() { + final ReentrantLock putLock = ResizableCapacityLinkedBlockingQueue.this.putLock; + final ReentrantLock takeLock = ResizableCapacityLinkedBlockingQueue.this.takeLock; + putLock.lock(); + takeLock.lock(); + try { + if (current == null) + throw new NoSuchElementException(); + E x = currentElement; + lastRet = current; + current = current.next; + if (current != null) + currentElement = current.item; + return x; + } finally { + takeLock.unlock(); + putLock.unlock(); + } + } + + @Override + public void remove() { + if (lastRet == null) + throw new IllegalStateException(); + final ReentrantLock putLock = ResizableCapacityLinkedBlockingQueue.this.putLock; + final ReentrantLock takeLock = ResizableCapacityLinkedBlockingQueue.this.takeLock; + putLock.lock(); + takeLock.lock(); + try { + Node node = lastRet; + lastRet = null; + Node trail = head; + Node p = head.next; + while (p != null && p != node) { + trail = p; + p = p.next; + } + if (p == node) { + p.item = null; + trail.next = p.next; + int c = count.getAndDecrement(); + if (c >= capacity) + notFull.signalAll(); + } + } finally { + takeLock.unlock(); + putLock.unlock(); + } + } + } + + /** + * Save the state to a stream (that is, serialize it). + * + * @param s the stream + * @serialData The capacity is emitted (int), followed by all of + * its elements (each an Object) in the proper order, + * followed by a null + */ + private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { + + fullyLock(); + try { + // Write out any hidden stuff, plus capacity + s.defaultWriteObject(); + + // Write out all elements in the proper order. + for (Node p = head.next; p != null; p = p.next) + s.writeObject(p.item); + + // Use trailing null as sentinel + s.writeObject(null); + } finally { + fullyUnlock(); + } + } + + /** + * Reconstitute this queue instance from a stream (that is, + * deserialize it). + * + * @param s the stream + */ + private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { + // Read in capacity, and any hidden stuff + s.defaultReadObject(); + + count.set(0); + last = head = new Node(null); + + // Read in all elements and place in queue + for (;;) { + @SuppressWarnings("unchecked") + E item = (E) s.readObject(); + if (item == null) + break; + add(item); + } + } +} \ No newline at end of file