Class MulticastProcessor<T>
- java.lang.Object
-
- io.reactivex.rxjava3.core.Flowable<T>
-
- io.reactivex.rxjava3.processors.FlowableProcessor<T>
-
- io.reactivex.rxjava3.processors.MulticastProcessor<T>
-
- Type Parameters:
T - the input and output value type
- All Implemented Interfaces:
FlowableSubscriber<T>, org.reactivestreams.Processor<T,T>, org.reactivestreams.Publisher<T>, org.reactivestreams.Subscriber<T>
@BackpressureSupport(FULL) @SchedulerSupport("none") public final class MulticastProcessor<@NonNull T> extends FlowableProcessor<T>
A FlowableProcessor implementation that coordinates downstream requests through a front-buffer and stable-prefetching, optionally canceling the upstream if all subscribers have cancelled.
This processor does not have a public constructor by design; a new empty instance of this MulticastProcessor can be created via the following create methods that allow configuring it:
- create(): create an empty MulticastProcessor with Flowable.bufferSize() prefetch amount and no reference counting behavior.
- create(int): create an empty MulticastProcessor with the given prefetch amount and no reference counting behavior.
- create(boolean): create an empty MulticastProcessor with Flowable.bufferSize() prefetch amount and an optional reference counting behavior.
- create(int, boolean): create an empty MulticastProcessor with the given prefetch amount and an optional reference counting behavior.
When the reference counting behavior is enabled, the MulticastProcessor cancels its upstream when all Subscribers have cancelled. Late Subscribers will then be immediately completed.
Because MulticastProcessor implements the Subscriber interface, calling onSubscribe is mandatory (Rule 2.12). If MulticastProcessor should run standalone, i.e., without subscribing the MulticastProcessor to another Publisher, use the start() or startUnbounded() methods to initialize the internal buffer. Failing to do so will lead to a NullPointerException at runtime.
Use offer(Object) to try to emit an item without failing when the internal buffer is full; in that case it returns false instead.
A MulticastProcessor is a Processor type in the Reactive Streams specification; nulls are not allowed (Rule 2.13) as parameters to onSubscribe(Subscription), offer(Object), onNext(Object) and onError(Throwable). Such calls will result in a NullPointerException being thrown and the processor's state is not changed.
Since a MulticastProcessor is a Flowable, it supports backpressure. The backpressure from the currently subscribed Subscribers is coordinated by emitting upstream items only if all of those Subscribers have requested at least one item. This behavior is also called lockstep-mode because even if some Subscribers can take any number of items, other Subscribers requesting less or infrequently will slow down the overall throughput of the flow.
Calling onNext(Object), offer(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The FlowableProcessor.toSerialized() method available to all FlowableProcessors provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber consuming this processor also wants to call onNext(Object) on this processor recursively).
This MulticastProcessor supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasSubscribers(). This processor doesn't allow peeking into its buffer.
When this MulticastProcessor is terminated via onError(Throwable) or onComplete(), all previously signaled but not yet consumed items will still be available to Subscribers, and the respective terminal event is only emitted when all previous items have been successfully delivered to Subscribers. If there are no Subscribers, the remaining items will be buffered indefinitely.
The MulticastProcessor does not support clearing its cached events (to appear empty again).
- Backpressure:
The backpressure from the currently subscribed Subscribers is coordinated by emitting upstream items only if all of those Subscribers have requested at least one item. This behavior is also called lockstep-mode because even if some Subscribers can take any number of items, other Subscribers requesting less or infrequently will slow down the overall throughput of the flow.
- Scheduler:
MulticastProcessor does not operate by default on a particular Scheduler and the Subscribers get notified on an arbitrary thread in a serialized fashion.
Example:
    MulticastProcessor<Integer> mp = Flowable.range(1, 10)
        .subscribeWith(MulticastProcessor.create());
    mp.test().assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    // --------------------
    MulticastProcessor<Integer> mp2 = MulticastProcessor.create(4);
    mp2.start();
    assertTrue(mp2.offer(1));
    assertTrue(mp2.offer(2));
    assertTrue(mp2.offer(3));
    assertTrue(mp2.offer(4));
    assertFalse(mp2.offer(5));
    mp2.onComplete();
    mp2.test().assertResult(1, 2, 3, 4);
History: 2.1.14 - experimental
- Since:
- 2.2
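The lockstep behavior described under Backpressure can be pictured with a small, single-threaded model. This is only a sketch that uses plain JDK collections: the class LockstepModel and its methods are hypothetical names introduced for illustration, and the code is not how MulticastProcessor is implemented internally. It shows that buffered items are handed out only while every subscriber still has outstanding demand, so the slowest subscriber sets the pace.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Simplified, single-threaded model of lockstep delivery (hypothetical, not RxJava code). */
public class LockstepModel {

    /**
     * Delivers buffered items to every subscriber while all of them still have
     * outstanding demand. requested[i] is decremented for each item delivered to
     * subscriber i. Returns how many items each subscriber received.
     */
    public static int drain(Deque<Integer> buffer, long[] requested, List<List<Integer>> received) {
        int delivered = 0;
        while (!buffer.isEmpty() && allHaveDemand(requested)) {
            Integer item = buffer.poll();
            for (int i = 0; i < requested.length; i++) {
                received.get(i).add(item);
                requested[i]--;
            }
            delivered++;
        }
        return delivered;
    }

    /** True only if there is at least one subscriber and each has requested at least one item. */
    public static boolean allHaveDemand(long[] requested) {
        if (requested.length == 0) {
            return false; // no subscribers: items stay buffered
        }
        for (long r : requested) {
            if (r <= 0) {
                return false; // one subscriber without demand stalls everyone
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Deque<Integer> buffer = new ArrayDeque<>(List.of(1, 2, 3, 4, 5));
        long[] requested = {5, 2}; // a fast subscriber and a slow subscriber
        List<List<Integer>> received = List.of(new ArrayList<>(), new ArrayList<>());

        int delivered = drain(buffer, requested, received);

        System.out.println(delivered);       // 2
        System.out.println(received.get(0)); // [1, 2]
        System.out.println(buffer);          // [3, 4, 5]
    }
}
```

In this model the fast subscriber asked for five items but receives only two, because the slow subscriber requested two; the remaining items wait in the buffer until both have demand again.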
-
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
(package private) static class | MulticastProcessor.MulticastSubscription<T> |
-
Field Summary
Fields
Modifier and Type | Field | Description
(package private) int | bufferSize |
(package private) int | consumed |
(package private) boolean | done |
(package private) static MulticastProcessor.MulticastSubscription[] | EMPTY |
(package private) java.lang.Throwable | error |
(package private) int | fusionMode |
(package private) int | limit |
(package private) SimpleQueue<T> | queue |
(package private) boolean | refcount |
(package private) java.util.concurrent.atomic.AtomicReference<MulticastProcessor.MulticastSubscription<T>[]> | subscribers |
(package private) static MulticastProcessor.MulticastSubscription[] | TERMINATED |
(package private) java.util.concurrent.atomic.AtomicReference<org.reactivestreams.Subscription> | upstream |
(package private) java.util.concurrent.atomic.AtomicInteger | wip |
-
Constructor Summary
Constructors
Constructor | Description
MulticastProcessor(int bufferSize, boolean refCount) | Constructs a fresh instance with the given prefetch amount and the optional refCount-behavior.
-
Method Summary
Modifier and Type | Method | Description
(package private) boolean | add(MulticastProcessor.MulticastSubscription<@NonNull T> inner) |
static <T> @NonNull MulticastProcessor<T> | create() | Constructs a fresh instance with the default Flowable.bufferSize() prefetch amount and no refCount-behavior.
static <T> @NonNull MulticastProcessor<T> | create(boolean refCount) | Constructs a fresh instance with the default Flowable.bufferSize() prefetch amount and the optional refCount-behavior.
static <T> @NonNull MulticastProcessor<T> | create(int bufferSize) | Constructs a fresh instance with the given prefetch amount and no refCount behavior.
static <T> @NonNull MulticastProcessor<T> | create(int bufferSize, boolean refCount) | Constructs a fresh instance with the given prefetch amount and the optional refCount-behavior.
(package private) void | drain() |
java.lang.Throwable | getThrowable() | Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.
boolean | hasComplete() | Returns true if the FlowableProcessor has reached a terminal state through a complete event.
boolean | hasSubscribers() | Returns true if the FlowableProcessor has subscribers.
boolean | hasThrowable() | Returns true if the FlowableProcessor has reached a terminal state through an error event.
boolean | offer(@NonNull T t) | Tries to offer an item into the internal queue and returns false if the queue is full.
void | onComplete() |
void | onError(@NonNull java.lang.Throwable t) |
void | onNext(@NonNull T t) |
void | onSubscribe(@NonNull org.reactivestreams.Subscription s) | Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long).
(package private) void | remove(MulticastProcessor.MulticastSubscription<@NonNull T> inner) |
void | start() | Initializes this Processor by setting an upstream Subscription that ignores request amounts, uses a fixed buffer and allows using the onXXX and offer methods afterwards.
void | startUnbounded() | Initializes this Processor by setting an upstream Subscription that ignores request amounts, uses an unbounded buffer and allows using the onXXX and offer methods afterwards.
protected void | subscribeActual(@NonNull org.reactivestreams.Subscriber<? super @NonNull T> s) | Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.
-
Methods inherited from class io.reactivex.rxjava3.processors.FlowableProcessor
toSerialized
-
Methods inherited from class io.reactivex.rxjava3.core.Flowable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatMapStream, concatWith, concatWith, concatWith, concatWith, 
contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnCancel, doOnComplete, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromObservable, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, 
onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureLatest, onBackpressureLatest, onBackpressureReduce, onBackpressureReduce, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, parallel, parallel, parallel, publish, publish, publish, publish, range, rangeLong, rebatchRequests, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, 
throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toObservable, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
-
-
-
Field Detail
-
wip
final java.util.concurrent.atomic.AtomicInteger wip
-
upstream
final java.util.concurrent.atomic.AtomicReference<org.reactivestreams.Subscription> upstream
-
subscribers
final java.util.concurrent.atomic.AtomicReference<MulticastProcessor.MulticastSubscription<T>[]> subscribers
-
bufferSize
final int bufferSize
-
limit
final int limit
-
refcount
final boolean refcount
-
queue
volatile SimpleQueue<T> queue
-
done
volatile boolean done
-
error
volatile java.lang.Throwable error
-
consumed
int consumed
-
fusionMode
int fusionMode
-
EMPTY
static final MulticastProcessor.MulticastSubscription[] EMPTY
-
TERMINATED
static final MulticastProcessor.MulticastSubscription[] TERMINATED
-
-
Constructor Detail
-
MulticastProcessor
MulticastProcessor(int bufferSize, boolean refCount)
Constructs a fresh instance with the given prefetch amount and the optional refCount-behavior.
- Parameters:
bufferSize - the prefetch amount
refCount - if true and if all Subscribers have cancelled, the upstream is cancelled
-
-
Method Detail
-
create
@CheckReturnValue @NonNull public static <T> @NonNull MulticastProcessor<T> create()
Constructs a fresh instance with the default Flowable.bufferSize() prefetch amount and no refCount-behavior.
- Type Parameters:
T - the input and output value type
- Returns:
the new MulticastProcessor instance
-
create
@CheckReturnValue @NonNull public static <T> @NonNull MulticastProcessor<T> create(boolean refCount)
Constructs a fresh instance with the default Flowable.bufferSize() prefetch amount and the optional refCount-behavior.
- Type Parameters:
T - the input and output value type
- Parameters:
refCount - if true and if all Subscribers have cancelled, the upstream is cancelled
- Returns:
the new MulticastProcessor instance
-
create
@CheckReturnValue @NonNull public static <T> @NonNull MulticastProcessor<T> create(int bufferSize)
Constructs a fresh instance with the given prefetch amount and no refCount behavior.
- Type Parameters:
T - the input and output value type
- Parameters:
bufferSize - the prefetch amount
- Returns:
the new MulticastProcessor instance
- Throws:
java.lang.IllegalArgumentException - if bufferSize is non-positive
-
create
@CheckReturnValue @NonNull public static <T> @NonNull MulticastProcessor<T> create(int bufferSize, boolean refCount)
Constructs a fresh instance with the given prefetch amount and the optional refCount-behavior.
- Type Parameters:
T - the input and output value type
- Parameters:
bufferSize - the prefetch amount
refCount - if true and if all Subscribers have cancelled, the upstream is cancelled
- Returns:
the new MulticastProcessor instance
- Throws:
java.lang.IllegalArgumentException - if bufferSize is non-positive
-
start
public void start()
Initializes this Processor by setting an upstream Subscription that ignores request amounts, uses a fixed buffer and allows using the onXXX and offer methods afterwards.
-
startUnbounded
public void startUnbounded()
Initializes this Processor by setting an upstream Subscription that ignores request amounts, uses an unbounded buffer and allows using the onXXX and offer methods afterwards.
-
onSubscribe
public void onSubscribe(@NonNull @NonNull org.reactivestreams.Subscription s)
Description copied from interface: FlowableSubscriber
Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long). In practice this means no initialization should happen after the request() call and any additional behavior is thread-safe with respect to onNext.
-
offer
@CheckReturnValue public boolean offer(@NonNull @NonNull T t)
Tries to offer an item into the internal queue and returns false if the queue is full.
- Parameters:
t - the item to offer, not null
- Returns:
true if successful, false if the queue is full
- Throws:
java.lang.NullPointerException - if t is null
java.lang.IllegalStateException - if the processor is in fusion mode
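As an analogy only, the return-value contract of offer resembles that of a bounded JDK queue: java.util.concurrent.ArrayBlockingQueue.offer also returns false instead of blocking or throwing when the queue is full, and rejects null with a NullPointerException. The sketch below uses that JDK class, not MulticastProcessor itself; OfferContractSketch and offerAll are hypothetical names, and the fusion-mode IllegalStateException is not modelled.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;

/** JDK-only analogy for a non-blocking, bounded offer (hypothetical, not RxJava code). */
public class OfferContractSketch {

    /** Offers each value in turn and records whether the bounded queue accepted it. */
    public static boolean[] offerAll(int capacity, int... values) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
        boolean[] accepted = new boolean[values.length];
        for (int i = 0; i < values.length; i++) {
            // offer never blocks: it returns false once the queue is full
            accepted[i] = queue.offer(values[i]);
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Capacity 4 with nobody consuming, mirroring the class-level example
        System.out.println(Arrays.toString(offerAll(4, 1, 2, 3, 4, 5)));
        // prints [true, true, true, true, false]
    }
}
```

Because the method is annotated with @CheckReturnValue, callers are expected to inspect the boolean and decide what to do with a rejected item, for example dropping it or retrying later.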
-
onComplete
public void onComplete()
-
hasSubscribers
@CheckReturnValue public boolean hasSubscribers()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has subscribers.
The method is thread-safe.
- Specified by:
hasSubscribers in class FlowableProcessor<T>
- Returns:
true if the FlowableProcessor has subscribers
-
hasThrowable
@CheckReturnValue public boolean hasThrowable()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has reached a terminal state through an error event.
The method is thread-safe.
- Specified by:
hasThrowable in class FlowableProcessor<T>
- Returns:
true if the FlowableProcessor has reached a terminal state through an error event
- See Also:
FlowableProcessor.getThrowable(), FlowableProcessor.hasComplete()
-
hasComplete
@CheckReturnValue public boolean hasComplete()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has reached a terminal state through a complete event.
The method is thread-safe.
- Specified by:
hasComplete in class FlowableProcessor<T>
- Returns:
true if the FlowableProcessor has reached a terminal state through a complete event
- See Also:
FlowableProcessor.hasThrowable()
-
getThrowable
@CheckReturnValue public java.lang.Throwable getThrowable()
Description copied from class: FlowableProcessor
Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.
The method is thread-safe.
- Specified by:
getThrowable in class FlowableProcessor<T>
- Returns:
the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet
-
subscribeActual
protected void subscribeActual(@NonNull @NonNull org.reactivestreams.Subscriber<? super @NonNull T> s)
Description copied from class: Flowable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.
There is no need to call any of the plugin hooks on the current Flowable instance or the Subscriber; all hooks and basic safeguards have been applied by Flowable.subscribe(Subscriber) before this method gets called.
- Specified by:
subscribeActual in class Flowable<T>
- Parameters:
s - the incoming Subscriber, never null
-
add
boolean add(MulticastProcessor.MulticastSubscription<@NonNull T> inner)
-
remove
void remove(MulticastProcessor.MulticastSubscription<@NonNull T> inner)
-
drain
void drain()
-
-