|
|
|
@ -210,8 +210,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
* than zero.
|
|
|
|
|
*/
|
|
|
|
|
public ResizableCapacityLinkedBlockingQueue(int capacity) {
|
|
|
|
|
if (capacity <= 0)
|
|
|
|
|
if (capacity <= 0) {
|
|
|
|
|
throw new IllegalArgumentException();
|
|
|
|
|
}
|
|
|
|
|
this.capacity = capacity;
|
|
|
|
|
last = head = new Node<E>(null);
|
|
|
|
|
}
|
|
|
|
@ -290,8 +291,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
public void put(E o) throws InterruptedException {
|
|
|
|
|
if (o == null)
|
|
|
|
|
if (o == null) {
|
|
|
|
|
throw new NullPointerException();
|
|
|
|
|
}
|
|
|
|
|
// Note: convention in all put/take/etc is to preset
|
|
|
|
|
// local var holding count negative to indicate failure unless set.
|
|
|
|
|
int c = -1;
|
|
|
|
@ -304,21 +306,24 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
* some other waiting put) are signalled if it ever changes from capacity. Similarly for all other uses of count in other wait guards.
|
|
|
|
|
*/
|
|
|
|
|
try {
|
|
|
|
|
while (count.get() >= capacity)
|
|
|
|
|
while (count.get() >= capacity) {
|
|
|
|
|
notFull.await();
|
|
|
|
|
}
|
|
|
|
|
} catch (InterruptedException ie) {
|
|
|
|
|
notFull.signal(); // propagate to a non-interrupted thread
|
|
|
|
|
throw ie;
|
|
|
|
|
}
|
|
|
|
|
insert(o);
|
|
|
|
|
c = count.getAndIncrement();
|
|
|
|
|
if (c + 1 < capacity)
|
|
|
|
|
if (c + 1 < capacity) {
|
|
|
|
|
notFull.signal();
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
putLock.unlock();
|
|
|
|
|
}
|
|
|
|
|
if (c == 0)
|
|
|
|
|
if (c == 0) {
|
|
|
|
|
signalNotEmpty();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -338,8 +343,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
@Override
|
|
|
|
|
public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
|
|
|
|
|
|
|
|
|
|
if (o == null)
|
|
|
|
|
if (o == null) {
|
|
|
|
|
throw new NullPointerException();
|
|
|
|
|
}
|
|
|
|
|
long nanos = unit.toNanos(timeout);
|
|
|
|
|
int c = -1;
|
|
|
|
|
final ReentrantLock putLock = this.putLock;
|
|
|
|
@ -350,12 +356,14 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
if (count.get() < capacity) {
|
|
|
|
|
insert(o);
|
|
|
|
|
c = count.getAndIncrement();
|
|
|
|
|
if (c + 1 < capacity)
|
|
|
|
|
if (c + 1 < capacity) {
|
|
|
|
|
notFull.signal();
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if (nanos <= 0)
|
|
|
|
|
if (nanos <= 0) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
nanos = notFull.awaitNanos(nanos);
|
|
|
|
|
} catch (InterruptedException ie) {
|
|
|
|
@ -366,8 +374,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
} finally {
|
|
|
|
|
putLock.unlock();
|
|
|
|
|
}
|
|
|
|
|
if (c == 0)
|
|
|
|
|
if (c == 0) {
|
|
|
|
|
signalNotEmpty();
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -382,11 +391,13 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
*/
|
|
|
|
|
@Override
|
|
|
|
|
public boolean offer(E o) {
|
|
|
|
|
if (o == null)
|
|
|
|
|
if (o == null) {
|
|
|
|
|
throw new NullPointerException();
|
|
|
|
|
}
|
|
|
|
|
final AtomicInteger count = this.count;
|
|
|
|
|
if (count.get() >= capacity)
|
|
|
|
|
if (count.get() >= capacity) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
int c = -1;
|
|
|
|
|
final ReentrantLock putLock = this.putLock;
|
|
|
|
|
putLock.lock();
|
|
|
|
@ -394,14 +405,16 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
if (count.get() < capacity) {
|
|
|
|
|
insert(o);
|
|
|
|
|
c = count.getAndIncrement();
|
|
|
|
|
if (c + 1 < capacity)
|
|
|
|
|
if (c + 1 < capacity) {
|
|
|
|
|
notFull.signal();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
putLock.unlock();
|
|
|
|
|
}
|
|
|
|
|
if (c == 0)
|
|
|
|
|
if (c == 0) {
|
|
|
|
|
signalNotEmpty();
|
|
|
|
|
}
|
|
|
|
|
return c >= 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -414,8 +427,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
takeLock.lockInterruptibly();
|
|
|
|
|
try {
|
|
|
|
|
try {
|
|
|
|
|
while (count.get() == 0)
|
|
|
|
|
while (count.get() == 0) {
|
|
|
|
|
notEmpty.await();
|
|
|
|
|
}
|
|
|
|
|
} catch (InterruptedException ie) {
|
|
|
|
|
notEmpty.signal(); // propagate to a non-interrupted thread
|
|
|
|
|
throw ie;
|
|
|
|
@ -423,13 +437,15 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
|
|
|
|
|
x = extract();
|
|
|
|
|
c = count.getAndDecrement();
|
|
|
|
|
if (c > 1)
|
|
|
|
|
if (c > 1) {
|
|
|
|
|
notEmpty.signal();
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
takeLock.unlock();
|
|
|
|
|
}
|
|
|
|
|
if (c >= capacity)
|
|
|
|
|
if (c >= capacity) {
|
|
|
|
|
signalNotFull();
|
|
|
|
|
}
|
|
|
|
|
return x;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -446,12 +462,14 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
if (count.get() > 0) {
|
|
|
|
|
x = extract();
|
|
|
|
|
c = count.getAndDecrement();
|
|
|
|
|
if (c > 1)
|
|
|
|
|
if (c > 1) {
|
|
|
|
|
notEmpty.signal();
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
if (nanos <= 0)
|
|
|
|
|
if (nanos <= 0) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
nanos = notEmpty.awaitNanos(nanos);
|
|
|
|
|
} catch (InterruptedException ie) {
|
|
|
|
@ -462,16 +480,18 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
} finally {
|
|
|
|
|
takeLock.unlock();
|
|
|
|
|
}
|
|
|
|
|
if (c >= capacity)
|
|
|
|
|
if (c >= capacity) {
|
|
|
|
|
signalNotFull();
|
|
|
|
|
}
|
|
|
|
|
return x;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public E poll() {
|
|
|
|
|
final AtomicInteger count = this.count;
|
|
|
|
|
if (count.get() == 0)
|
|
|
|
|
if (count.get() == 0) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
E x = null;
|
|
|
|
|
int c = -1;
|
|
|
|
|
final ReentrantLock takeLock = this.takeLock;
|
|
|
|
@ -480,29 +500,33 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
if (count.get() > 0) {
|
|
|
|
|
x = extract();
|
|
|
|
|
c = count.getAndDecrement();
|
|
|
|
|
if (c > 1)
|
|
|
|
|
if (c > 1) {
|
|
|
|
|
notEmpty.signal();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
takeLock.unlock();
|
|
|
|
|
}
|
|
|
|
|
if (c >= capacity)
|
|
|
|
|
if (c >= capacity) {
|
|
|
|
|
signalNotFull();
|
|
|
|
|
}
|
|
|
|
|
return x;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public E peek() {
|
|
|
|
|
if (count.get() == 0)
|
|
|
|
|
if (count.get() == 0) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
final ReentrantLock takeLock = this.takeLock;
|
|
|
|
|
takeLock.lock();
|
|
|
|
|
try {
|
|
|
|
|
Node<E> first = head.next;
|
|
|
|
|
if (first == null)
|
|
|
|
|
if (first == null) {
|
|
|
|
|
return null;
|
|
|
|
|
else
|
|
|
|
|
} else {
|
|
|
|
|
return first.item;
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
takeLock.unlock();
|
|
|
|
|
}
|
|
|
|
@ -510,8 +534,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public boolean remove(Object o) {
|
|
|
|
|
if (o == null)
|
|
|
|
|
if (o == null) {
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
boolean removed = false;
|
|
|
|
|
fullyLock();
|
|
|
|
|
try {
|
|
|
|
@ -528,8 +553,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
if (removed) {
|
|
|
|
|
p.item = null;
|
|
|
|
|
trail.next = p.next;
|
|
|
|
|
if (count.getAndDecrement() >= capacity)
|
|
|
|
|
if (count.getAndDecrement() >= capacity) {
|
|
|
|
|
notFull.signalAll();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
fullyUnlock();
|
|
|
|
@ -544,8 +570,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
int size = count.get();
|
|
|
|
|
Object[] a = new Object[size];
|
|
|
|
|
int k = 0;
|
|
|
|
|
for (Node<E> p = head.next; p != null; p = p.next)
|
|
|
|
|
for (Node<E> p = head.next; p != null; p = p.next) {
|
|
|
|
|
a[k++] = p.item;
|
|
|
|
|
}
|
|
|
|
|
return a;
|
|
|
|
|
} finally {
|
|
|
|
|
fullyUnlock();
|
|
|
|
@ -558,12 +585,13 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
fullyLock();
|
|
|
|
|
try {
|
|
|
|
|
int size = count.get();
|
|
|
|
|
if (a.length < size)
|
|
|
|
|
if (a.length < size) {
|
|
|
|
|
a = (T[]) java.lang.reflect.Array.newInstance(a.getClass().getComponentType(), size);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
int k = 0;
|
|
|
|
|
for (Node<?> p = head.next; p != null; p = p.next)
|
|
|
|
|
for (Node<?> p = head.next; p != null; p = p.next) {
|
|
|
|
|
a[k++] = (T) p.item;
|
|
|
|
|
}
|
|
|
|
|
return a;
|
|
|
|
|
} finally {
|
|
|
|
|
fullyUnlock();
|
|
|
|
@ -585,8 +613,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
fullyLock();
|
|
|
|
|
try {
|
|
|
|
|
head.next = null;
|
|
|
|
|
if (count.getAndSet(0) >= capacity)
|
|
|
|
|
if (count.getAndSet(0) >= capacity) {
|
|
|
|
|
notFull.signalAll();
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
fullyUnlock();
|
|
|
|
|
}
|
|
|
|
@ -594,17 +623,20 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public int drainTo(Collection<? super E> c) {
|
|
|
|
|
if (c == null)
|
|
|
|
|
if (c == null) {
|
|
|
|
|
throw new NullPointerException();
|
|
|
|
|
if (c == this)
|
|
|
|
|
}
|
|
|
|
|
if (c == this) {
|
|
|
|
|
throw new IllegalArgumentException();
|
|
|
|
|
}
|
|
|
|
|
Node<E> first;
|
|
|
|
|
fullyLock();
|
|
|
|
|
try {
|
|
|
|
|
first = head.next;
|
|
|
|
|
head.next = null;
|
|
|
|
|
if (count.getAndSet(0) >= capacity)
|
|
|
|
|
if (count.getAndSet(0) >= capacity) {
|
|
|
|
|
notFull.signalAll();
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
fullyUnlock();
|
|
|
|
|
}
|
|
|
|
@ -620,12 +652,15 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public int drainTo(Collection<? super E> c, int maxElements) {
|
|
|
|
|
if (c == null)
|
|
|
|
|
if (c == null) {
|
|
|
|
|
throw new NullPointerException();
|
|
|
|
|
if (c == this)
|
|
|
|
|
}
|
|
|
|
|
if (c == this) {
|
|
|
|
|
throw new IllegalArgumentException();
|
|
|
|
|
if (maxElements <= 0)
|
|
|
|
|
}
|
|
|
|
|
if (maxElements <= 0) {
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
fullyLock();
|
|
|
|
|
try {
|
|
|
|
|
int n = 0;
|
|
|
|
@ -638,8 +673,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
}
|
|
|
|
|
if (n != 0) {
|
|
|
|
|
head.next = p;
|
|
|
|
|
if (count.getAndAdd(-n) >= capacity)
|
|
|
|
|
if (count.getAndAdd(-n) >= capacity) {
|
|
|
|
|
notFull.signalAll();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return n;
|
|
|
|
|
} finally {
|
|
|
|
@ -678,8 +714,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
takeLock.lock();
|
|
|
|
|
try {
|
|
|
|
|
current = head.next;
|
|
|
|
|
if (current != null)
|
|
|
|
|
if (current != null) {
|
|
|
|
|
currentElement = current.item;
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
takeLock.unlock();
|
|
|
|
|
putLock.unlock();
|
|
|
|
@ -698,13 +735,15 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
putLock.lock();
|
|
|
|
|
takeLock.lock();
|
|
|
|
|
try {
|
|
|
|
|
if (current == null)
|
|
|
|
|
if (current == null) {
|
|
|
|
|
throw new NoSuchElementException();
|
|
|
|
|
}
|
|
|
|
|
E x = currentElement;
|
|
|
|
|
lastRet = current;
|
|
|
|
|
current = current.next;
|
|
|
|
|
if (current != null)
|
|
|
|
|
if (current != null) {
|
|
|
|
|
currentElement = current.item;
|
|
|
|
|
}
|
|
|
|
|
return x;
|
|
|
|
|
} finally {
|
|
|
|
|
takeLock.unlock();
|
|
|
|
@ -714,8 +753,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void remove() {
|
|
|
|
|
if (lastRet == null)
|
|
|
|
|
if (lastRet == null) {
|
|
|
|
|
throw new IllegalStateException();
|
|
|
|
|
}
|
|
|
|
|
final ReentrantLock putLock = ResizableCapacityLinkedBlockingQueue.this.putLock;
|
|
|
|
|
final ReentrantLock takeLock = ResizableCapacityLinkedBlockingQueue.this.takeLock;
|
|
|
|
|
putLock.lock();
|
|
|
|
@ -733,8 +773,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
p.item = null;
|
|
|
|
|
trail.next = p.next;
|
|
|
|
|
int c = count.getAndDecrement();
|
|
|
|
|
if (c >= capacity)
|
|
|
|
|
if (c >= capacity) {
|
|
|
|
|
notFull.signalAll();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
takeLock.unlock();
|
|
|
|
@ -759,9 +800,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
s.defaultWriteObject();
|
|
|
|
|
|
|
|
|
|
// Write out all elements in the proper order.
|
|
|
|
|
for (Node<E> p = head.next; p != null; p = p.next)
|
|
|
|
|
for (Node<E> p = head.next; p != null; p = p.next) {
|
|
|
|
|
s.writeObject(p.item);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
// Use trailing null as sentinel
|
|
|
|
|
s.writeObject(null);
|
|
|
|
|
} finally {
|
|
|
|
@ -786,8 +827,9 @@ public class ResizableCapacityLinkedBlockingQueue<E> extends AbstractQueue<E>
|
|
|
|
|
for (;;) {
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
|
|
E item = (E) s.readObject();
|
|
|
|
|
if (item == null)
|
|
|
|
|
if (item == null) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
add(item);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|