Class LinkedTransferQueue<E>
- java.lang.Object
-
- java.util.AbstractCollection<E>
-
- java.util.AbstractQueue<E>
-
- com.google.code.yanf4j.util.LinkedTransferQueue<E>
-
- Type Parameters:
E- the type of elements held in this collection
- All Implemented Interfaces:
java.lang.Iterable<E>,java.util.Collection<E>,java.util.concurrent.BlockingQueue<E>,java.util.Queue<E>
- Direct Known Subclasses:
FlowControlLinkedTransferQueue
public class LinkedTransferQueue<E> extends java.util.AbstractQueue<E> implements java.util.concurrent.BlockingQueue<E>An unbounded TransferQueue based on linked nodes. This queue orders elements FIFO (first-in-first-out) with respect to any given producer. The head of the queue is that element that has been on the queue the longest time for some producer. The tail of the queue is that element that has been on the queue the shortest time for some producer.Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires a traversal of the elements.
This class and its iterator implement all of the optional methods of the
CollectionandIteratorinterfaces.Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into a
LinkedTransferQueuehappen-before actions subsequent to the access or removal of that element from theLinkedTransferQueuein another thread.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description (package private) classLinkedTransferQueue.ItrIterators.static classLinkedTransferQueue.PaddedAtomicReference<T>Padded version of AtomicReference used for head, tail and cleanMe, to alleviate contention across threads CASing one vs the other.private static classLinkedTransferQueue.QNodeNode class for LinkedTransferQueue.
-
Field Summary
Fields Modifier and Type Field Description private LinkedTransferQueue.PaddedAtomicReference<LinkedTransferQueue.QNode>cleanMeReference to a cancelled node that might not yet have been unlinked from queue because it was the last inserted node when it cancelled.private LinkedTransferQueue.PaddedAtomicReference<LinkedTransferQueue.QNode>headhead of the queueprivate static intmaxTimedSpinsThe number of times to spin before blocking in timed waits.private static intmaxUntimedSpinsThe number of times to spin before blocking in untimed waits.private static intNCPUSThe number of CPUs, for spin controlprivate static intNOWAITprivate static longspinForTimeoutThresholdThe number of nanoseconds for which it is faster to spin rather than to use timed park.private LinkedTransferQueue.PaddedAtomicReference<LinkedTransferQueue.QNode>tailtail of the queueprivate static intTIMEOUTprivate static intWAIT
-
Constructor Summary
Constructors Constructor Description LinkedTransferQueue()Creates an initially empty LinkedTransferQueue.LinkedTransferQueue(java.util.Collection<? extends E> c)Creates a LinkedTransferQueue initially containing the elements of the given collection, added in traversal order of the collection's iterator.
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description private booleanadvanceHead(LinkedTransferQueue.QNode h, LinkedTransferQueue.QNode nh)Tries to cas nh as new head; if successful, unlink old head's next node to avoid garbage retention.private java.lang.ObjectawaitFulfill(LinkedTransferQueue.QNode pred, LinkedTransferQueue.QNode s, java.lang.Object e, int mode, long nanos)Spins/blocks until node s is fulfilled or caller gives up, depending on wait mode.(package private) Ecast(java.lang.Object e)(package private) voidclean(LinkedTransferQueue.QNode pred, LinkedTransferQueue.QNode s)Gets rid of cancelled node s with original predecessor pred.intdrainTo(java.util.Collection<? super E> c)intdrainTo(java.util.Collection<? super E> c, int maxElements)private java.lang.Objectfulfill(java.lang.Object e)Version of xfer for poll() and tryTransfer, which simplifies control paths both here and in xferprivate LinkedTransferQueue.QNodegetValidatedTail()Returns validated tail for use in cleaning methodsintgetWaitingConsumerCount()booleanhasWaitingConsumer()booleanisEmpty()java.util.Iterator<E>iterator()booleanoffer(E e)booleanoffer(E e, long timeout, java.util.concurrent.TimeUnit unit)Epeek()Epoll()Epoll(long timeout, java.util.concurrent.TimeUnit unit)voidput(E e)private LinkedTransferQueue.QNodereclean()Tries to unsplice the cancelled node held in cleanMe that was previously uncleanable because it was at tail.intremainingCapacity()intsize()Returns the number of elements in this queue.Etake()voidtransfer(E e)(package private) LinkedTransferQueue.QNodetraversalHead()Return head after performing any outstanding helping stepsbooleantryTransfer(E e)booleantryTransfer(E e, long timeout, java.util.concurrent.TimeUnit unit)private java.lang.Objectxfer(java.lang.Object e, int mode, long nanos)Puts or takes an item.-
Methods inherited from class java.util.AbstractCollection
contains, containsAll, remove, removeAll, retainAll, toArray, toArray, toString
-
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
-
-
-
-
Field Detail
-
NOWAIT
private static final int NOWAIT
- See Also:
- Constant Field Values
-
TIMEOUT
private static final int TIMEOUT
- See Also:
- Constant Field Values
-
WAIT
private static final int WAIT
- See Also:
- Constant Field Values
-
NCPUS
private static final int NCPUS
The number of CPUs, for spin control
-
maxTimedSpins
private static final int maxTimedSpins
The number of times to spin before blocking in timed waits. The value is empirically derived -- it works well across a variety of processors and OSes. Empirically, the best value seems not to vary with number of CPUs (beyond 2) so is just a constant.
-
maxUntimedSpins
private static final int maxUntimedSpins
The number of times to spin before blocking in untimed waits. This is greater than timed value because untimed waits spin faster since they don't need to check times on each spin.
-
spinForTimeoutThreshold
private static final long spinForTimeoutThreshold
The number of nanoseconds for which it is faster to spin rather than to use timed park. A rough estimate suffices.- See Also:
- Constant Field Values
-
head
private final LinkedTransferQueue.PaddedAtomicReference<LinkedTransferQueue.QNode> head
head of the queue
-
tail
private final LinkedTransferQueue.PaddedAtomicReference<LinkedTransferQueue.QNode> tail
tail of the queue
-
cleanMe
private final LinkedTransferQueue.PaddedAtomicReference<LinkedTransferQueue.QNode> cleanMe
Reference to a cancelled node that might not yet have been unlinked from queue because it was the last inserted node when it cancelled.
-
-
Constructor Detail
-
LinkedTransferQueue
public LinkedTransferQueue()
Creates an initially empty LinkedTransferQueue.
-
LinkedTransferQueue
public LinkedTransferQueue(java.util.Collection<? extends E> c)
Creates a LinkedTransferQueue initially containing the elements of the given collection, added in traversal order of the collection's iterator.- Parameters:
c- the collection of elements to initially contain- Throws:
java.lang.NullPointerException- if the specified collection or any of its elements are null
-
-
Method Detail
-
advanceHead
private boolean advanceHead(LinkedTransferQueue.QNode h, LinkedTransferQueue.QNode nh)
Tries to cas nh as new head; if successful, unlink old head's next node to avoid garbage retention.
-
xfer
private java.lang.Object xfer(java.lang.Object e, int mode, long nanos)Puts or takes an item. Used for most queue operations (except poll() and tryTransfer()). See the similar code in SynchronousQueue for detailed explanation.- Parameters:
e- the item or if null, signifies that this is a takemode- the wait mode: NOWAIT, TIMEOUT, WAITnanos- timeout in nanosecs, used only if mode is TIMEOUT- Returns:
- an item, or null on failure
-
fulfill
private java.lang.Object fulfill(java.lang.Object e)
Version of xfer for poll() and tryTransfer, which simplifies control paths both here and in xfer
-
awaitFulfill
private java.lang.Object awaitFulfill(LinkedTransferQueue.QNode pred, LinkedTransferQueue.QNode s, java.lang.Object e, int mode, long nanos)
Spins/blocks until node s is fulfilled or caller gives up, depending on wait mode.- Parameters:
pred- the predecessor of waiting nodes- the waiting nodee- the comparison value for checking matchmode- modenanos- timeout value- Returns:
- matched item, or s if cancelled
-
getValidatedTail
private LinkedTransferQueue.QNode getValidatedTail()
Returns validated tail for use in cleaning methods
-
clean
void clean(LinkedTransferQueue.QNode pred, LinkedTransferQueue.QNode s)
Gets rid of cancelled node s with original predecessor pred.- Parameters:
pred- predecessor of cancelled nodes- the cancelled node
-
reclean
private LinkedTransferQueue.QNode reclean()
Tries to unsplice the cancelled node held in cleanMe that was previously uncleanable because it was at tail.- Returns:
- current cleanMe node (or null)
-
cast
E cast(java.lang.Object e)
-
put
public void put(E e) throws java.lang.InterruptedException
- Specified by:
putin interfacejava.util.concurrent.BlockingQueue<E>- Throws:
java.lang.InterruptedException
-
offer
public boolean offer(E e, long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
- Specified by:
offerin interfacejava.util.concurrent.BlockingQueue<E>- Throws:
java.lang.InterruptedException
-
offer
public boolean offer(E e)
-
transfer
public void transfer(E e) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
tryTransfer
public boolean tryTransfer(E e, long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
- Throws:
java.lang.InterruptedException
-
tryTransfer
public boolean tryTransfer(E e)
-
take
public E take() throws java.lang.InterruptedException
- Specified by:
takein interfacejava.util.concurrent.BlockingQueue<E>- Throws:
java.lang.InterruptedException
-
poll
public E poll(long timeout, java.util.concurrent.TimeUnit unit) throws java.lang.InterruptedException
- Specified by:
pollin interfacejava.util.concurrent.BlockingQueue<E>- Throws:
java.lang.InterruptedException
-
drainTo
public int drainTo(java.util.Collection<? super E> c)
- Specified by:
drainToin interfacejava.util.concurrent.BlockingQueue<E>
-
drainTo
public int drainTo(java.util.Collection<? super E> c, int maxElements)
- Specified by:
drainToin interfacejava.util.concurrent.BlockingQueue<E>
-
traversalHead
LinkedTransferQueue.QNode traversalHead()
Return head after performing any outstanding helping steps
-
iterator
public java.util.Iterator<E> iterator()
-
isEmpty
public boolean isEmpty()
-
hasWaitingConsumer
public boolean hasWaitingConsumer()
-
size
public int size()
Returns the number of elements in this queue. If this queue contains more than Integer.MAX_VALUE elements, returns Integer.MAX_VALUE.Beware that, unlike in most collections, this method is NOT a constant-time operation. Because of the asynchronous nature of these queues, determining the current number of elements requires an O(n) traversal.
-
getWaitingConsumerCount
public int getWaitingConsumerCount()
-
remainingCapacity
public int remainingCapacity()
- Specified by:
remainingCapacityin interfacejava.util.concurrent.BlockingQueue<E>
-
-