Class UnicastProcessor<T>
- java.lang.Object
-
- io.reactivex.rxjava3.core.Flowable<T>
-
- io.reactivex.rxjava3.processors.FlowableProcessor<T>
-
- io.reactivex.rxjava3.processors.UnicastProcessor<T>
-
- Type Parameters:
T - the value type received and emitted by this Processor subclass
- All Implemented Interfaces:
FlowableSubscriber<T>, org.reactivestreams.Processor<T,T>, org.reactivestreams.Publisher<T>, org.reactivestreams.Subscriber<T>
public final class UnicastProcessor<@NonNull T> extends FlowableProcessor<T>
A FlowableProcessor variant that queues up events until a single Subscriber subscribes to it, replays those events to it until the Subscriber catches up and then switches to relaying events live to this single Subscriber until this UnicastProcessor terminates or the Subscriber cancels its subscription.
This processor does not have a public constructor by design; a new empty instance of this UnicastProcessor can be created via the following create methods that allow specifying the retention policy for items:
- create() - creates an empty, unbounded UnicastProcessor that caches all items and the terminal event it receives.
- create(int) - creates an empty, unbounded UnicastProcessor with a hint about how many total items one expects to retain.
- create(boolean) - creates an empty, unbounded UnicastProcessor that optionally delays an error it receives and replays it after the regular items have been emitted.
- create(int, Runnable) - creates an empty, unbounded UnicastProcessor with a hint about how many total items one expects to retain and a callback that will be called exactly once when the UnicastProcessor gets terminated or the single Subscriber cancels.
- create(int, Runnable, boolean) - creates an empty, unbounded UnicastProcessor with a hint about how many total items one expects to retain and a callback that will be called exactly once when the UnicastProcessor gets terminated or the single Subscriber cancels, and that optionally delays an error it receives and replays it after the regular items have been emitted.
If more than one Subscriber attempts to subscribe to this Processor, they will receive an IllegalStateException if this UnicastProcessor hasn't terminated yet, or the Subscribers receive the terminal event (error or completion) if this Processor has terminated.
The UnicastProcessor buffers notifications and replays them to the single Subscriber as requested, for which it holds upstream items in an unbounded internal buffer until they can be emitted.
Since a UnicastProcessor is a Reactive Streams Processor, nulls are not allowed (Rule 2.13) as parameters to onNext(Object) and onError(Throwable). Such calls will result in a NullPointerException being thrown and the processor's state is not changed.
Since a UnicastProcessor is a Flowable as well as a FlowableProcessor, it honors the downstream backpressure but consumes an upstream source in an unbounded manner (requesting Long.MAX_VALUE).
When this UnicastProcessor is terminated via onError(Throwable), the current or late single Subscriber may receive the Throwable before any available items could be emitted. To make sure an onError event is delivered to the Subscriber after the normal items, create a UnicastProcessor with the create(boolean) or create(int, Runnable, boolean) factory methods.
Even though UnicastProcessor implements the Subscriber interface, calling onSubscribe is not required (Rule 2.12) if the processor is used as a standalone source. However, calling onSubscribe after the UnicastProcessor reached its terminal state will result in the given Subscription being canceled immediately.
Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The FlowableProcessor.toSerialized() method available to all FlowableProcessors provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber consuming this processor also wants to call onNext(Object) on this processor recursively).
This UnicastProcessor supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasSubscribers().
- Backpressure:
UnicastProcessor honors the downstream backpressure but consumes an upstream source (if any) in an unbounded manner (requesting Long.MAX_VALUE).
- Scheduler:
UnicastProcessor does not operate by default on a particular Scheduler and the single Subscriber gets notified on the thread the respective onXXX methods were invoked.
- Error handling:
When onError(Throwable) is called, the UnicastProcessor enters into a terminal state and emits the same Throwable instance to the current single Subscriber. During this emission, if the single Subscriber cancels its Subscription, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable). If there was no Subscriber subscribed to this UnicastProcessor when onError() was called, the global error handler is not invoked.
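The error-handling rules can be observed with a temporary global error handler. The following is a minimal sketch, not the library's own test: the class and method names are hypothetical, and it uses a simplified variant of the scenario in which the Subscriber cancels before onError is called rather than during the emission. It assumes that a cancelled processor routes a late onError to RxJavaPlugins.onError, where non-fatal exceptions are typically wrapped in an UndeliverableException.

```java
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ErrorHandlingSketch {

    /** Runs one scenario and returns whatever reached the global error handler. */
    public static List<Throwable> undeliverableErrors(boolean subscribeAndCancel) {
        List<Throwable> seen = new CopyOnWriteArrayList<>();
        // Install a temporary global handler that only records errors.
        RxJavaPlugins.setErrorHandler(seen::add);
        try {
            UnicastProcessor<Integer> processor = UnicastProcessor.create();
            if (subscribeAndCancel) {
                // The single Subscriber subscribes and cancels right away.
                TestSubscriber<Integer> ts = processor.test();
                ts.cancel();
            }
            // With a cancelled Subscriber the error has nowhere to go;
            // with no Subscriber at all it is simply retained by the processor.
            processor.onError(new IOException("late failure"));
        } finally {
            // Restore the default handler so other code is unaffected.
            RxJavaPlugins.setErrorHandler(null);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("after cancel: " + undeliverableErrors(true).size());
        System.out.println("no subscriber: " + undeliverableErrors(false).size());
    }
}
```

Because the handler is process-wide state, the sketch resets it in a finally block; in real tests this kind of setup usually lives in a fixture.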
Example usage:
    UnicastProcessor<Integer> processor = UnicastProcessor.create();

    TestSubscriber<Integer> ts1 = processor.test();

    // fresh UnicastProcessors are empty
    ts1.assertEmpty();

    TestSubscriber<Integer> ts2 = processor.test();

    // A UnicastProcessor only allows one Subscriber during its lifetime
    ts2.assertFailure(IllegalStateException.class);

    processor.onNext(1);
    ts1.assertValue(1);

    processor.onNext(2);
    ts1.assertValues(1, 2);

    processor.onComplete();
    ts1.assertResult(1, 2);

    // ----------------------------------------------------

    UnicastProcessor<Integer> processor2 = UnicastProcessor.create();

    // a UnicastProcessor caches events until its single Subscriber subscribes
    processor2.onNext(1);
    processor2.onNext(2);
    processor2.onComplete();

    TestSubscriber<Integer> ts3 = processor2.test();

    // the cached events are emitted in order
    ts3.assertResult(1, 2);
- Since:
- 2.0
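The backpressure behavior described above (buffered items are replayed only as the single Subscriber requests them) can be illustrated with a bounded initial request. This is a small sketch under the assumption that Flowable.test(long) and TestSubscriber.requestMore(long) are used to control demand; the class and method names are made up for the illustration.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

public class BackpressureSketch {

    /** Returns the item counts seen after an initial request of 2 and after 3 more. */
    public static int[] deliveredCounts() {
        UnicastProcessor<Integer> processor = UnicastProcessor.create();

        // Five items and the completion are buffered before anyone subscribes.
        for (int i = 1; i <= 5; i++) {
            processor.onNext(i);
        }
        processor.onComplete();

        // Subscribe with an initial demand of 2 items only.
        TestSubscriber<Integer> ts = processor.test(2);
        int afterInitialRequest = ts.values().size();

        // Ask for the remaining three items.
        ts.requestMore(3);
        int afterSecondRequest = ts.values().size();

        // Everything, including the terminal event, should now have arrived in order.
        ts.assertResult(1, 2, 3, 4, 5);
        return new int[] { afterInitialRequest, afterSecondRequest };
    }

    public static void main(String[] args) {
        int[] counts = deliveredCounts();
        System.out.println(counts[0] + " then " + counts[1]);
    }
}
```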
-
-
Nested Class Summary
Nested Classes
Modifier and Type | Class | Description
(package private) class | UnicastProcessor.UnicastQueueSubscription |
-
Field Summary
Fields
Modifier and Type | Field | Description
(package private) boolean | cancelled |
(package private) boolean | delayError |
(package private) boolean | done |
(package private) java.util.concurrent.atomic.AtomicReference<org.reactivestreams.Subscriber<? super T>> | downstream |
(package private) boolean | enableOperatorFusion |
(package private) java.lang.Throwable | error |
(package private) java.util.concurrent.atomic.AtomicBoolean | once |
(package private) java.util.concurrent.atomic.AtomicReference<java.lang.Runnable> | onTerminate |
(package private) SpscLinkedArrayQueue<T> | queue |
(package private) java.util.concurrent.atomic.AtomicLong | requested |
(package private) BasicIntQueueSubscription<T> | wip |
-
Constructor Summary
Constructors
Constructor | Description
UnicastProcessor(int capacityHint, java.lang.Runnable onTerminate, boolean delayError) | Creates a UnicastProcessor with the given capacity hint and callback for when the Processor is terminated normally or its single Subscriber cancels.
-
Method Summary
Modifier and Type | Method | Description
(package private) boolean | checkTerminated(boolean failFast, boolean d, boolean empty, org.reactivestreams.Subscriber<? super @NonNull T> a, SpscLinkedArrayQueue<@NonNull T> q) |
static <T> @NonNull UnicastProcessor<T> | create() | Creates a UnicastProcessor with an internal buffer capacity hint 16.
static <T> @NonNull UnicastProcessor<T> | create(boolean delayError) | Creates a UnicastProcessor with default internal buffer capacity hint and delay error flag.
static <T> @NonNull UnicastProcessor<T> | create(int capacityHint) | Creates a UnicastProcessor with the given internal buffer capacity hint.
static <T> @NonNull UnicastProcessor<T> | create(int capacityHint, @NonNull java.lang.Runnable onTerminate) | Creates a UnicastProcessor with the given internal buffer capacity hint and a callback for the case when the single Subscriber cancels its subscription or the processor is terminated.
static <T> @NonNull UnicastProcessor<T> | create(int capacityHint, @NonNull java.lang.Runnable onTerminate, boolean delayError) | Creates a UnicastProcessor with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Subscriber cancels its subscription or the processor is terminated.
(package private) void | doTerminate() |
(package private) void | drain() |
(package private) void | drainFused(org.reactivestreams.Subscriber<? super @NonNull T> a) |
(package private) void | drainRegular(org.reactivestreams.Subscriber<? super @NonNull T> a) |
@Nullable java.lang.Throwable | getThrowable() | Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.
boolean | hasComplete() | Returns true if the FlowableProcessor has reached a terminal state through a complete event.
boolean | hasSubscribers() | Returns true if the FlowableProcessor has subscribers.
boolean | hasThrowable() | Returns true if the FlowableProcessor has reached a terminal state through an error event.
void | onComplete() |
void | onError(java.lang.Throwable t) |
void | onNext(@NonNull T t) |
void | onSubscribe(org.reactivestreams.Subscription s) | Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long).
protected void | subscribeActual(org.reactivestreams.Subscriber<? super @NonNull T> s) | Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.
-
Methods inherited from class io.reactivex.rxjava3.processors.FlowableProcessor
toSerialized
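The class description requires onNext, onError and onComplete calls to be serialized and names toSerialized() as one way to achieve that. The sketch below shows one possible use with several producer threads; the class name, method name and thread layout are illustrative assumptions, not a recommended production setup.

```java
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

public class SerializedSketch {

    /** Emits from several threads through a serialized view and returns the received count. */
    public static int emitFromThreads(int threadCount, int itemsPerThread) throws InterruptedException {
        // Wrap the processor so concurrent onNext calls do not overlap.
        FlowableProcessor<Integer> processor = UnicastProcessor.<Integer>create().toSerialized();
        TestSubscriber<Integer> ts = processor.test();

        Thread[] producers = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            producers[t] = new Thread(() -> {
                for (int i = 0; i < itemsPerThread; i++) {
                    processor.onNext(i);
                }
            });
            producers[t].start();
        }
        // Wait until every producer has finished emitting.
        for (Thread producer : producers) {
            producer.join();
        }

        processor.onComplete();
        ts.assertComplete();
        return ts.values().size();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(emitFromThreads(4, 1000));
    }
}
```

Calling onNext on the raw UnicastProcessor from these threads instead would violate the serialization requirement, so the wrapper is the part that matters here.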
-
Methods inherited from class io.reactivex.rxjava3.core.Flowable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatMapStream, concatWith, concatWith, concatWith, concatWith, 
contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnCancel, doOnComplete, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromObservable, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, 
onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureLatest, onBackpressureLatest, onBackpressureReduce, onBackpressureReduce, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, parallel, parallel, parallel, publish, publish, publish, publish, range, rangeLong, rebatchRequests, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, 
throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toObservable, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
-
-
-
Field Detail
-
queue
final SpscLinkedArrayQueue<T> queue
-
onTerminate
final java.util.concurrent.atomic.AtomicReference<java.lang.Runnable> onTerminate
-
delayError
final boolean delayError
-
done
volatile boolean done
-
error
java.lang.Throwable error
-
downstream
final java.util.concurrent.atomic.AtomicReference<org.reactivestreams.Subscriber<? super T>> downstream
-
cancelled
volatile boolean cancelled
-
once
final java.util.concurrent.atomic.AtomicBoolean once
-
wip
final BasicIntQueueSubscription<T> wip
-
requested
final java.util.concurrent.atomic.AtomicLong requested
-
enableOperatorFusion
boolean enableOperatorFusion
-
-
Constructor Detail
-
UnicastProcessor
UnicastProcessor(int capacityHint, java.lang.Runnable onTerminate, boolean delayError)
Creates a UnicastProcessor with the given capacity hint and callback for when the Processor is terminated normally or its single Subscriber cancels.
History: 2.0.8 - experimental
- Parameters:
capacityHint - the capacity hint for the internal, unbounded queue
onTerminate - the callback to run when the Processor is terminated or cancelled, null not allowed
delayError - deliver pending onNext events before onError
- Since:
- 2.2
-
-
Method Detail
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create()
Creates a UnicastProcessor with an internal buffer capacity hint 16.
- Type Parameters:
T - the value type
- Returns:
- a UnicastProcessor instance
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create(int capacityHint)
Creates a UnicastProcessor with the given internal buffer capacity hint.
- Type Parameters:
T - the value type
- Parameters:
capacityHint - the hint to size the internal unbounded buffer
- Returns:
- a UnicastProcessor instance
- Throws:
java.lang.IllegalArgumentException - if capacityHint is non-positive
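Since capacityHint only sizes the unbounded buffer, emitting more items than the hint should not lose any of them, while a non-positive hint is rejected. A brief sketch of both points follows; the class and method names are hypothetical.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;

public class CapacityHintSketch {

    /** Buffers itemCount items in a processor created with the given hint and counts the replay. */
    public static int replayedCount(int capacityHint, int itemCount) {
        UnicastProcessor<Integer> processor = UnicastProcessor.create(capacityHint);
        for (int i = 0; i < itemCount; i++) {
            processor.onNext(i);
        }
        processor.onComplete();
        // The late Subscriber requests everything and receives the whole buffer.
        return processor.test().values().size();
    }

    /** Returns true if the factory rejects the given hint with an IllegalArgumentException. */
    public static boolean rejects(int capacityHint) {
        try {
            UnicastProcessor.<Integer>create(capacityHint);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(replayedCount(4, 100));
        System.out.println(rejects(0));
    }
}
```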
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create(boolean delayError)
Creates a UnicastProcessor with default internal buffer capacity hint and delay error flag.
History: 2.0.8 - experimental
- Type Parameters:
T - the value type
- Parameters:
delayError - deliver pending onNext events before onError
- Returns:
- a UnicastProcessor instance
- Since:
- 2.2
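The effect of the delayError flag is easiest to see with a late Subscriber. The sketch below buffers two items and an error before subscribing and records the signals in arrival order; the names are illustrative. It assumes a single-threaded setup, in which the fail-fast variant would be expected to deliver the error ahead of the buffered items.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;

import java.util.ArrayList;
import java.util.List;

public class DelayErrorSketch {

    /** Buffers two items and an error, subscribes late and returns the received signals. */
    public static List<String> replay(boolean delayError) {
        UnicastProcessor<Integer> processor = UnicastProcessor.create(delayError);
        processor.onNext(1);
        processor.onNext(2);
        processor.onError(new IllegalStateException("boom"));

        List<String> signals = new ArrayList<>();
        // The lambda-based subscribe requests an unbounded amount.
        processor.subscribe(
                value -> signals.add("next:" + value),
                error -> signals.add("error:" + error.getMessage()));
        return signals;
    }

    public static void main(String[] args) {
        System.out.println("delayError=true  -> " + replay(true));
        System.out.println("delayError=false -> " + replay(false));
    }
}
```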
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create(int capacityHint, @NonNull java.lang.Runnable onTerminate)
Creates a UnicastProcessor with the given internal buffer capacity hint and a callback for the case when the single Subscriber cancels its subscription or the processor is terminated.
The callback, if not null, is called exactly once and non-overlapped with any active replay.
- Type Parameters:
T - the value type
- Parameters:
capacityHint - the hint to size the internal unbounded buffer
onTerminate - the non null callback
- Returns:
- a UnicastProcessor instance
- Throws:
java.lang.NullPointerException - if onTerminate is null
java.lang.IllegalArgumentException - if capacityHint is non-positive
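The "exactly once" guarantee for the onTerminate callback can be checked by triggering both termination and cancellation on the same processor. A rough sketch, with hypothetical class and method names, that counts callback invocations in both orders:

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

import java.util.concurrent.atomic.AtomicInteger;

public class OnTerminateSketch {

    /** Completes the processor first, then lets the Subscriber cancel. */
    public static int callsWhenCompletedThenCancelled() {
        AtomicInteger calls = new AtomicInteger();
        UnicastProcessor<Integer> processor = UnicastProcessor.create(16, calls::incrementAndGet);
        processor.onNext(1);
        processor.onComplete();
        // Subscribe without requesting anything, then cancel.
        TestSubscriber<Integer> ts = processor.test(0);
        ts.cancel();
        return calls.get();
    }

    /** Lets the Subscriber cancel first, then completes the processor. */
    public static int callsWhenCancelledThenCompleted() {
        AtomicInteger calls = new AtomicInteger();
        UnicastProcessor<Integer> processor = UnicastProcessor.create(16, calls::incrementAndGet);
        TestSubscriber<Integer> ts = processor.test();
        ts.cancel();
        processor.onComplete();
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(callsWhenCompletedThenCancelled());
        System.out.println(callsWhenCancelledThenCompleted());
    }
}
```

A typical use of such a callback is releasing a resource that feeds the processor, which is why a single invocation matters.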
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastProcessor<T> create(int capacityHint, @NonNull java.lang.Runnable onTerminate, boolean delayError)
Creates a UnicastProcessor with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Subscriber cancels its subscription or the processor is terminated.
The callback, if not null, is called exactly once and non-overlapped with any active replay.
History: 2.0.8 - experimental
- Type Parameters:
T - the value type
- Parameters:
capacityHint - the hint to size the internal unbounded buffer
onTerminate - the non null callback
delayError - deliver pending onNext events before onError
- Returns:
- a UnicastProcessor instance
- Throws:
java.lang.NullPointerException - if onTerminate is null
java.lang.IllegalArgumentException - if capacityHint is non-positive
- Since:
- 2.2
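For the three-argument factory, a short sketch of the null check and of the callback firing when the processor terminates with an error. The class name, method names and log strings are invented for the illustration, and the ordering shown assumes everything runs on one thread.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;

public class FullFactorySketch {

    /** Returns true if a null onTerminate callback is rejected with a NullPointerException. */
    public static boolean rejectsNullCallback() {
        try {
            UnicastProcessor.<Integer>create(16, null, false);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    /** Terminates a fail-fast processor with an error and logs what happens, in order. */
    public static String failFastWithCallback() {
        StringBuilder log = new StringBuilder();
        UnicastProcessor<Integer> processor =
                UnicastProcessor.create(16, () -> log.append("terminated;"), false);
        processor.onNext(1);
        // The callback is expected to run as part of this terminal call.
        processor.onError(new RuntimeException("boom"));
        // A late Subscriber then observes the outcome.
        processor.subscribe(
                value -> log.append("next;"),
                error -> log.append("error;"));
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(rejectsNullCallback());
        System.out.println(failFastWithCallback());
    }
}
```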
-
doTerminate
void doTerminate()
-
drain
void drain()
-
checkTerminated
boolean checkTerminated(boolean failFast, boolean d, boolean empty, org.reactivestreams.Subscriber<? super @NonNull T> a, SpscLinkedArrayQueue<@NonNull T> q)
-
onSubscribe
public void onSubscribe(org.reactivestreams.Subscription s)
Description copied from interface: FlowableSubscriber
Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long). In practice this means no initialization should happen after the request() call and additional behavior is thread safe in respect to onNext.
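The class description states that the processor consumes its upstream in an unbounded manner and that a Subscription handed over after termination is cancelled immediately. The sketch below probes both cases with a hand-written recording Subscription; RecordingSubscription and the method names are hypothetical helpers, not part of RxJava.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import org.reactivestreams.Subscription;

public class OnSubscribeSketch {

    /** Hypothetical helper that records what the processor does with the Subscription. */
    static final class RecordingSubscription implements Subscription {
        volatile long lastRequest;
        volatile boolean cancelled;

        @Override
        public void request(long n) {
            lastRequest = n;
        }

        @Override
        public void cancel() {
            cancelled = true;
        }
    }

    /** Returns the amount a live processor requests from its upstream. */
    public static long requestedByLiveProcessor() {
        UnicastProcessor<Integer> processor = UnicastProcessor.create();
        RecordingSubscription upstream = new RecordingSubscription();
        processor.onSubscribe(upstream);
        return upstream.lastRequest;
    }

    /** Returns whether an already completed processor cancels a late Subscription. */
    public static boolean cancelledByTerminatedProcessor() {
        UnicastProcessor<Integer> processor = UnicastProcessor.create();
        processor.onComplete();
        RecordingSubscription upstream = new RecordingSubscription();
        processor.onSubscribe(upstream);
        return upstream.cancelled;
    }

    public static void main(String[] args) {
        System.out.println(requestedByLiveProcessor() == Long.MAX_VALUE);
        System.out.println(cancelledByTerminatedProcessor());
    }
}
```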
-
onError
public void onError(java.lang.Throwable t)
-
onComplete
public void onComplete()
-
subscribeActual
protected void subscribeActual(org.reactivestreams.Subscriber<? super @NonNull T> s)
Description copied from class: Flowable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.
There is no need to call any of the plugin hooks on the current Flowable instance or the Subscriber; all hooks and basic safeguards have been applied by Flowable.subscribe(Subscriber) before this method gets called.
- Specified by:
subscribeActual in class Flowable<T>
- Parameters:
s - the incoming Subscriber, never null
-
hasSubscribers
@CheckReturnValue public boolean hasSubscribers()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has subscribers.
The method is thread-safe.
- Specified by:
hasSubscribers in class FlowableProcessor<T>
- Returns:
- true if the FlowableProcessor has subscribers
-
getThrowable
@Nullable @CheckReturnValue public @Nullable java.lang.Throwable getThrowable()
Description copied from class: FlowableProcessor
Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.
The method is thread-safe.
- Specified by:
getThrowable in class FlowableProcessor<T>
- Returns:
- the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet
-
hasComplete
@CheckReturnValue public boolean hasComplete()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has reached a terminal state through a complete event.
The method is thread-safe.
- Specified by:
hasComplete in class FlowableProcessor<T>
- Returns:
- true if the FlowableProcessor has reached a terminal state through a complete event
- See Also:
FlowableProcessor.hasThrowable()
-
hasThrowable
@CheckReturnValue public boolean hasThrowable()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has reached a terminal state through an error event.
The method is thread-safe.
- Specified by:
hasThrowable in class FlowableProcessor<T>
- Returns:
- true if the FlowableProcessor has reached a terminal state through an error event
- See Also:
FlowableProcessor.getThrowable(), FlowableProcessor.hasComplete()
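The four state-peeking methods can be read together to follow a processor through its lifecycle. Below is a minimal sketch that samples them when the processor is fresh, after a Subscriber arrives and after an error; the class name and the string format are only illustrative.

```java
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;

import java.util.ArrayList;
import java.util.List;

public class StateSketch {

    /** Samples the state-peeking methods at three points of a processor's life. */
    public static List<String> lifecycle() {
        List<String> states = new ArrayList<>();
        UnicastProcessor<Integer> processor = UnicastProcessor.create();

        // Fresh: not terminated, nobody subscribed.
        states.add("complete=" + processor.hasComplete()
                + ", error=" + processor.hasThrowable()
                + ", subscribers=" + processor.hasSubscribers());

        // After the single Subscriber has subscribed.
        TestSubscriber<Integer> ts = processor.test();
        states.add("complete=" + processor.hasComplete()
                + ", error=" + processor.hasThrowable()
                + ", subscribers=" + processor.hasSubscribers());

        // After terminating with an error.
        processor.onError(new IllegalStateException("boom"));
        states.add("complete=" + processor.hasComplete()
                + ", error=" + processor.hasThrowable()
                + ", throwable=" + processor.getThrowable().getMessage());

        ts.assertError(IllegalStateException.class);
        return states;
    }

    public static void main(String[] args) {
        lifecycle().forEach(System.out::println);
    }
}
```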
-
-