Class DisruptorBlockingQueue<E>
java.lang.Object
com.conversantmedia.util.concurrent.MultithreadConcurrentQueue<E>
com.conversantmedia.util.concurrent.DisruptorBlockingQueue<E>
- All Implemented Interfaces:
ConcurrentQueue<E>, Serializable, Iterable<E>, Collection<E>, BlockingQueue<E>, Queue<E>
public final class DisruptorBlockingQueue<E>
extends MultithreadConcurrentQueue<E>
implements Serializable, Iterable<E>, Collection<E>, BlockingQueue<E>, Queue<E>
This is a lock-free blocking queue: a fixed-length queue backed by a ring buffer.
Access to the ring buffer is sequenced by advancing a pair of atomic
sequence numbers, one for the head and one for the tail.
When a thread wants to append to the queue, it obtains the
sequence number for the tail. When the thread is ready to commit its change,
a machine compare and set is used to prove that the sequence number still matches
the expected value, in other words, that no other thread has modified the sequence.
If the sequence number does not match, the operation fails. If the
sequence number matches, the thread can continue to operate
on the queue's ring buffer without contention. This check
avoids any synchronization, hence the moniker "lock free." The lack
of synchronization results in significant performance advantages.
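The compare-and-set check described above can be illustrated with a small sketch. It is not taken from the library source: the class `SequenceClaimSketch` and its method names are hypothetical, and it only shows how a stale sequence number causes a claim to fail.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical illustration of claiming a sequence number with compare-and-set. */
final class SequenceClaimSketch {
    private final AtomicLong tail = new AtomicLong(0L);

    /** Returns the sequence number a writer currently expects to claim. */
    long current() {
        return tail.get();
    }

    /**
     * Tries to advance the sequence from expected to expected + 1.
     * Succeeds only if no other thread has advanced it in the meantime.
     */
    boolean tryClaim(long expected) {
        return tail.compareAndSet(expected, expected + 1L);
    }

    public static void main(String[] args) {
        SequenceClaimSketch seq = new SequenceClaimSketch();
        long observed = seq.current();
        System.out.println(seq.tryClaim(observed)); // the first claim wins
        System.out.println(seq.tryClaim(observed)); // the observed value is now stale
    }
}
```

A thread whose claim fails would typically re-read the sequence and try again rather than block.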
For consumers, access to the back of the ring is controlled by a memory
barrier mechanism, namely the "volatile" keyword. Spin locks are employed
to ensure the ring tail cursor is up to date prior to updating it. Once the
ring cursor is updated, the reader/consumer can be assured that there
is data available to read. The consumer thread then employs a
mechanism similar to the producer's to validate access to the ring:
a sequence number for the head of the ring is obtained, and when
the reader is ready to commit its change to the buffer, it
uses the machine compare and set to prove that no other thread
has modified the ring in the interim.
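The producer and consumer protocol described above might be sketched roughly as follows. This is a simplified illustration under stated assumptions, not the library's implementation: the class `CasRingSketch` and the field names `head`, `headCursor`, `tail` and `tailCursor` are hypothetical, the capacity must already be a power of two, and failed claims simply yield and retry.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical bounded ring buffer guarded by compare-and-set on sequence numbers. */
final class CasRingSketch<E> {
    private final Object[] buffer;
    private final int mask;
    private final AtomicLong head = new AtomicLong();       // reads that are committed
    private final AtomicLong headCursor = new AtomicLong(); // reads that are claimed
    private final AtomicLong tail = new AtomicLong();       // writes that are committed
    private final AtomicLong tailCursor = new AtomicLong(); // writes that are claimed

    CasRingSketch(int powerOfTwoCapacity) {
        if (powerOfTwoCapacity <= 0 || Integer.bitCount(powerOfTwoCapacity) != 1) {
            throw new IllegalArgumentException("capacity must be a positive power of two");
        }
        buffer = new Object[powerOfTwoCapacity];
        mask = powerOfTwoCapacity - 1;
    }

    /** Returns false if the ring is full, true once the element is published. */
    boolean offer(E e) {
        for (;;) {
            long t = tail.get();
            if (t - head.get() >= buffer.length) {
                return false; // no free slot
            }
            if (tailCursor.compareAndSet(t, t + 1L)) {
                buffer[(int) (t & mask)] = e;
                tail.set(t + 1L); // volatile write makes the slot visible to readers
                return true;
            }
            Thread.yield(); // another producer holds the claim; retry
        }
    }

    /** Returns null if the ring is empty, otherwise the oldest element. */
    @SuppressWarnings("unchecked")
    E poll() {
        for (;;) {
            long h = head.get();
            if (tail.get() <= h) {
                return null; // nothing committed beyond the head
            }
            if (headCursor.compareAndSet(h, h + 1L)) {
                int slot = (int) (h & mask);
                E e = (E) buffer[slot];
                buffer[slot] = null;
                head.set(h + 1L); // volatile write frees the slot for producers
                return e;
            }
            Thread.yield(); // another consumer holds the claim; retry
        }
    }
}
```

In this sketch the committed counters (`head`, `tail`) are only advanced after the slot has been read or written, so the volatile reads on the other side see a fully updated slot.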
This pattern of access is roughly an order of magnitude faster than ArrayBlockingQueue.
It is roughly 2x faster than LinkedTransferQueue for similar operations/conditions.
Given that LinkedTransferQueue is "state of the art" in terms of Java performance,
it is clear that the Disruptor mechanism offers advantages over other
strategies.
The only memory allocation in this object occurs at object creation and in the clone
and drainTo methods. Otherwise, no garbage collection will ever be triggered by
calls to the disruptor queue.
The drainTo method implements an efficient "batch" mechanism, and may be
used to safely claim all of the available queue entries. Drain will not
perform as well when it is dealing with contention from other reader threads.
Overall, the disruptor pattern is weak in dealing with massive thread contention;
however, efforts have been made to deal with that case here. As always,
one should test one's intended strategy.
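As a rough illustration of the batch style of consumption that drainTo enables, the sketch below drains whatever is currently queued in a single call. It is written against the standard `BlockingQueue` interface, and `ArrayBlockingQueue` is used only as a stand-in so the example runs with the JDK alone; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical helper showing batch consumption through drainTo. */
final class DrainBatchSketch {
    /** Claims everything currently queued in one call and returns it as a batch. */
    static <E> List<E> drainBatch(BlockingQueue<E> queue) {
        List<E> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) {
        // Stand-in queue; any BlockingQueue implementation could be passed instead.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(8);
        queue.add(1);
        queue.add(2);
        queue.add(3);
        System.out.println(drainBatch(queue));
    }
}
```

Note that the list allocated per batch here is a convenience of the sketch; a caller concerned about allocation could reuse one collection across calls.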
Field Summary
Fields
queueNotFullCondition
queueNotEmptyCondition
-
Constructor Summary
Constructors
Constructor, Description
DisruptorBlockingQueue(int capacity)
Construct a blocking queue of the given fixed capacity.
DisruptorBlockingQueue(int capacity, SpinPolicy spinPolicy)
Construct a blocking queue with a given fixed capacity.
DisruptorBlockingQueue(int capacity, Collection<? extends E> c)
Construct a blocking queue of the given fixed capacity.
-
Method Summary
Modifier and Type, Method, Description
boolean add(E)
boolean addAll(Collection<? extends E> c)
void clear()
clear the queue of all elements
boolean containsAll(Collection<?> c)
int drainTo(Collection<? super E> c)
int drainTo(Collection<? super E> c, int maxElements)
E element()
private boolean isFull()
Iterator<E> iterator()
final boolean offer(E e)
Add element e to the ring
boolean offer(E, long, TimeUnit)
final E poll()
remove the first element from the queue and return it
E poll(long, TimeUnit)
void put(E)
int remainingCapacity()
E remove()
int remove(E[] e)
return all elements in the queue to the provided array, up to the size of the provided array.
boolean remove(Object)
Provided for compatibility with the BlockingQueue interface only.
boolean removeAll(Collection<?> c)
boolean retainAll(Collection<?> c)
E take()
Object[] toArray()
<T> T[] toArray(T[] a)
Methods inherited from class MultithreadConcurrentQueue
capacity, contains, isEmpty, peek, size, sumToAvoidOptimization
Methods inherited from class Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface BlockingQueue
contains
Methods inherited from interface Collection
equals, hashCode, isEmpty, parallelStream, removeIf, size, spliterator, stream, toArray
-
Field Details
-
queueNotFullCondition
-
queueNotEmptyCondition
-
-
Constructor Details
-
DisruptorBlockingQueue
public DisruptorBlockingQueue(int capacity)
Construct a blocking queue of the given fixed capacity.
Note: actual capacity will be the next power of two larger than capacity.
- Parameters:
capacity - maximum capacity of this queue
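The capacity note can be made concrete with a small sketch of rounding a request up to a power of two, and of why that shape is convenient for a ring buffer. The helper names are hypothetical, and the sketch assumes that a request which is already a power of two is kept unchanged; the note above does not spell out that case.

```java
/** Hypothetical helpers illustrating power-of-two ring capacities. */
final class CapacitySketch {
    /** Smallest power of two that is >= requested (assumed rounding rule). */
    static int roundUpToPowerOfTwo(int requested) {
        if (requested < 1 || requested > (1 << 30)) {
            throw new IllegalArgumentException("requested capacity out of range: " + requested);
        }
        int c = 1;
        while (c < requested) {
            c <<= 1;
        }
        return c;
    }

    /** Maps an ever-growing sequence number to a slot using a bit mask instead of a modulo. */
    static int slot(long sequence, int powerOfTwoCapacity) {
        return (int) (sequence & (powerOfTwoCapacity - 1));
    }

    public static void main(String[] args) {
        int actual = roundUpToPowerOfTwo(1000);
        System.out.println(actual);              // 1024
        System.out.println(slot(1030L, actual)); // 6
    }
}
```

With a power-of-two capacity, the slot for any sequence number is a single AND operation, which is one common reason ring buffers round their size this way.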
-
DisruptorBlockingQueue
Construct a blocking queue with a given fixed capacity.
Note: actual capacity will be the next power of two larger than capacity.
Waiting locking may be used in servers that are tuned for it. It provides a high-performance locking implementation with approximately a factor of 2 improvement in throughput (40M/s for 1-1 thread transfers). However, waiting locking is more CPU aggressive, and servers configured with far too many threads may show very high load averages. This is probably not as detrimental as it is annoying.
- Parameters:
capacity - the queue capacity; a power of 2 is suggested
spinPolicy - determines the level of CPU aggressiveness in waiting
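The trade-off between CPU-aggressive waiting and gentler waiting might be pictured with the sketch below. It does not use or reproduce the library's SpinPolicy type: the enum `WaitStyle`, its constants and the method `awaitReady` are hypothetical names invented for this illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.BooleanSupplier;

/** Hypothetical illustration of two waiting styles with different CPU cost. */
final class WaitStyleSketch {
    /** Stand-in for a waiting policy; not the library's SpinPolicy enum. */
    enum WaitStyle { SPIN, PARK }

    /** Returns true if ready became true before the timeout elapsed, false otherwise. */
    static boolean awaitReady(BooleanSupplier ready, WaitStyle style, long timeout, TimeUnit unit) {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!ready.getAsBoolean()) {
            if (System.nanoTime() - deadline >= 0L) {
                return false; // timed out
            }
            if (style == WaitStyle.SPIN) {
                Thread.yield();                 // stays runnable: quick to notice, costs CPU
            } else {
                LockSupport.parkNanos(50_000L); // sleeps briefly: cheaper, slower to notice
            }
        }
        return true;
    }
}
```

A spinning waiter keeps a core busy while it waits, which is consistent with the high load averages mentioned above when many threads wait at once.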
-
DisruptorBlockingQueue
Construct a blocking queue of the given fixed capacity.
Note: actual capacity will be the next power of two larger than capacity.
The values from the collection, c, are appended to the queue in iteration order. If the number of elements in the collection exceeds the actual capacity, then the additional elements overwrite the previous ones until all elements have been written once.
- Parameters:
capacity - maximum capacity of this queue
c - A collection to use to populate initial values
-
-
Method Details
-
offer
Description copied from interface: ConcurrentQueue
Add element e to the ring
- Specified by:
offer in interface BlockingQueue<E>
- Specified by:
offer in interface ConcurrentQueue<E>
- Specified by:
offer in interface Queue<E>
- Overrides:
offer in class MultithreadConcurrentQueue<E>
- Parameters:
e - element to offer
- Returns:
boolean - true if the operation succeeded
-
poll
Description copied from interface: ConcurrentQueue
remove the first element from the queue and return it
- Specified by:
poll in interface ConcurrentQueue<E>
- Specified by:
poll in interface Queue<E>
- Overrides:
poll in class MultithreadConcurrentQueue<E>
- Returns:
E
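The non-blocking offer and poll calls report their outcome through the return value rather than by waiting. The sketch below shows that contract as defined by the standard `Queue` interface, with `ArrayBlockingQueue` as a stand-in so it runs on the JDK alone; the helper names are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical helpers exercising the non-blocking offer/poll contract. */
final class OfferPollSketch {
    /** Offers each item in turn and returns how many the queue accepted. */
    static <E> int offerAll(BlockingQueue<E> queue, Iterable<? extends E> items) {
        int accepted = 0;
        for (E item : items) {
            if (queue.offer(item)) { // false means the queue had no room
                accepted++;
            }
        }
        return accepted;
    }

    /** Polls until the queue reports empty (null) and returns how many elements came out. */
    static <E> int pollAll(BlockingQueue<E> queue) {
        int removed = 0;
        while (queue.poll() != null) {
            removed++;
        }
        return removed;
    }

    public static void main(String[] args) {
        // Stand-in queue with room for two elements.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        System.out.println(offerAll(queue, Arrays.asList("a", "b", "c"))); // 2
        System.out.println(pollAll(queue));                                // 2
    }
}
```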
-
remove
Description copied from interface: ConcurrentQueue
return all elements in the queue to the provided array, up to the size of the provided array.
- Specified by:
remove in interface ConcurrentQueue<E>
- Overrides:
remove in class MultithreadConcurrentQueue<E>
- Parameters:
e - The element array
- Returns:
int - the number of elements added to e
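The contract described above (move elements into a caller-supplied array, stop when the array is full or the queue is empty, return the count) could be expressed over any `java.util.Queue` as follows. This is a sketch of the described behavior only, not the library's implementation, and `removeInto` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

/** Hypothetical illustration of a bulk remove into a caller-supplied array. */
final class BulkRemoveSketch {
    /** Moves up to out.length elements from queue into out and returns the count moved. */
    static <E> int removeInto(Queue<E> queue, E[] out) {
        int n = 0;
        while (n < out.length) {
            E e = queue.poll();
            if (e == null) {
                break; // queue is empty
            }
            out[n++] = e;
        }
        return n;
    }

    public static void main(String[] args) {
        Queue<String> queue = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        String[] out = new String[2];
        int moved = removeInto(queue, out);
        System.out.println(moved + " " + Arrays.toString(out)); // 2 [a, b]
    }
}
```

Reusing the same array across calls keeps this style of bulk read free of per-call allocation.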
-
remove
-
element
-
put
- Specified by:
put in interface BlockingQueue<E>
- Throws:
InterruptedException
-
offer
- Specified by:
offer in interface BlockingQueue<E>
- Throws:
InterruptedException
-
take
- Specified by:
take in interface BlockingQueue<E>
- Throws:
InterruptedException
-
poll
- Specified by:
poll in interface BlockingQueue<E>
- Throws:
InterruptedException
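The blocking methods above follow the standard `BlockingQueue` contract, so a producer/consumer hand-off can be sketched against that interface. In the sketch below, `ArrayBlockingQueue` is only a stand-in that lets the example run with the JDK alone, and `HandOffSketch` and `transferSum` are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical hand-off between one producer thread and the calling thread. */
final class HandOffSketch {
    /** A producer thread puts 1..n; the caller takes n values and returns their sum. */
    static long transferSum(final BlockingQueue<Integer> queue, final int n) {
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        long sum = 0L;
        try {
            for (int i = 0; i < n; i++) {
                sum += queue.take(); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException ie) {
            producer.interrupt();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while transferring", ie);
        }
        return sum;
    }

    public static void main(String[] args) {
        // Stand-in queue; any BlockingQueue<Integer> could be passed instead.
        System.out.println(transferSum(new ArrayBlockingQueue<Integer>(4), 100)); // 5050
    }
}
```

The timed variants of offer and poll follow the same pattern but give up after the supplied timeout instead of waiting indefinitely.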
-
clear
public void clear()
Description copied from interface: ConcurrentQueue
clear the queue of all elements
- Specified by:
clear in interface Collection<E>
- Specified by:
clear in interface ConcurrentQueue<E>
- Overrides:
clear in class MultithreadConcurrentQueue<E>
-
remainingCapacity
public int remainingCapacity()
- Specified by:
remainingCapacity in interface BlockingQueue<E>
-
drainTo
- Specified by:
drainTo in interface BlockingQueue<E>
-
drainTo
- Specified by:
drainTo in interface BlockingQueue<E>
-
toArray
- Specified by:
toArray in interface Collection<E>
-
toArray
public <T> T[] toArray(T[] a)
- Specified by:
toArray in interface Collection<E>
-
add
- Specified by:
add in interface BlockingQueue<E>
- Specified by:
add in interface Collection<E>
- Specified by:
add in interface Queue<E>
-
remove
Provided for compatibility with the BlockingQueue interface only.
This method has been fixed to be properly concurrent, but it blocks the entire queue and should not be used!
- Specified by:
remove in interface BlockingQueue<E>
- Specified by:
remove in interface Collection<E>
-
containsAll
- Specified by:
containsAll in interface Collection<E>
-
addAll
- Specified by:
addAll in interface Collection<E>
-
removeAll
- Specified by:
removeAll in interface Collection<E>
-
retainAll
- Specified by:
retainAll in interface Collection<E>
-
iterator
-
isFull
private boolean isFull()
-