Class UnicastSubject<T>
- java.lang.Object
-
- io.reactivex.rxjava3.core.Observable<T>
-
- io.reactivex.rxjava3.subjects.Subject<T>
-
- io.reactivex.rxjava3.subjects.UnicastSubject<T>
-
- Type Parameters:
T- the value type received and emitted by this Subject subclass
- All Implemented Interfaces:
ObservableSource<T>,Observer<T>
public final class UnicastSubject<T> extends Subject<T>
A Subject that queues up events until a singleObserversubscribes to it, replays those events to it until theObservercatches up and then switches to relaying events live to this singleObserveruntil thisUnicastSubjectterminates or theObserverdisposes.
Note that
UnicastSubjectholds an unbounded internal buffer.This subject does not have a public constructor by design; a new empty instance of this
UnicastSubjectcan be created via the followingcreatemethods that allow specifying the retention policy for items:create()- creates an empty, unboundedUnicastSubjectthat caches all items and the terminal event it receives.create(int)- creates an empty, unboundedUnicastSubjectwith a hint about how many total items one expects to retain.create(boolean)- creates an empty, unboundedUnicastSubjectthat optionally delays an error it receives and replays it after the regular items have been emitted.create(int, Runnable)- creates an empty, unboundedUnicastSubjectwith a hint about how many total items one expects to retain and a callback that will be called exactly once when theUnicastSubjectgets terminated or the singleObserverdisposes.create(int, Runnable, boolean)- creates an empty, unboundedUnicastSubjectwith a hint about how many total items one expects to retain and a callback that will be called exactly once when theUnicastSubjectgets terminated or the singleObserverdisposes and optionally delays an error it receives and replays it after the regular items have been emitted.
If more than one
Observerattempts to subscribe to thisUnicastSubject, they will receive anIllegalStateExceptionindicating the single-use-only nature of thisUnicastSubject, even if theUnicastSubjectalready terminated with an error.Since a
Subjectis conceptionally derived from theProcessortype in the Reactive Streams specification,nulls are not allowed (Rule 2.13) as parameters toonNext(Object)andonError(Throwable). Such calls will result in aNullPointerExceptionbeing thrown and the subject's state is not changed.Since a
UnicastSubjectis anObservable, it does not support backpressure.When this
UnicastSubjectis terminated viaonError(Throwable)the current or late singleObservermay receive theThrowablebefore any available items could be emitted. To make sure an onError event is delivered to theObserverafter the normal items, create aUnicastSubjectwith thecreate(boolean)orcreate(int, Runnable, boolean)factory methods.Even though
UnicastSubjectimplements theObserverinterface, callingonSubscribeis not required (Rule 2.12) if the subject is used as a standalone source. However, callingonSubscribeafter theUnicastSubjectreached its terminal state will result in the givenDisposablebeing disposed immediately.Calling
onNext(Object),onError(Throwable)andonComplete()is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). TheSubject.toSerialized()method available to allSubjects provides such serialization and also protects against reentrance (i.e., when a downstreamObserverconsuming this subject also wants to callonNext(Object)on this subject recursively).This
UnicastSubjectsupports the standard state-peeking methodshasComplete(),hasThrowable(),getThrowable()andhasObservers().- Scheduler:
UnicastSubjectdoes not operate by default on a particularSchedulerand the singleObservergets notified on the thread the respectiveonXXXmethods were invoked.- Error handling:
- When the
onError(Throwable)is called, theUnicastSubjectenters into a terminal state and emits the sameThrowableinstance to the current singleObserver. During this emission, if the singleObservers disposes its respectiveDisposable, theThrowableis delivered to the global error handler viaRxJavaPlugins.onError(Throwable). If there were noObservers subscribed to thisUnicastSubjectwhen theonError()was called, the global error handler is not invoked.
Example usage:
UnicastSubject<Integer> subject = UnicastSubject.create(); TestObserver<Integer> to1 = subject.test(); // fresh UnicastSubjects are empty to1.assertEmpty(); TestObserver<Integer> to2 = subject.test(); // A UnicastSubject only allows one Observer during its lifetime to2.assertFailure(IllegalStateException.class); subject.onNext(1); to1.assertValue(1); subject.onNext(2); to1.assertValues(1, 2); subject.onComplete(); to1.assertResult(1, 2); // ---------------------------------------------------- UnicastSubject<Integer> subject2 = UnicastSubject.create(); // a UnicastSubject caches events until its single Observer subscribes subject2.onNext(1); subject2.onNext(2); subject2.onComplete(); TestObserver<Integer> to3 = subject2.test(); // the cached events are emitted in order to3.assertResult(1, 2);- Since:
- 2.0
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description (package private) classUnicastSubject.UnicastQueueDisposable
-
Field Summary
Fields Modifier and Type Field Description (package private) booleandelayErrordeliver onNext events before error event.(package private) booleandisposedIndicates the single observer has cancelled.(package private) booleandoneIndicates the source has terminated.(package private) java.util.concurrent.atomic.AtomicReference<Observer<? super T>>downstreamThe single Observer.(package private) booleanenableOperatorFusion(package private) java.lang.ThrowableerrorThe terminal error if not null.(package private) java.util.concurrent.atomic.AtomicBooleanonceSet to 1 atomically for the first and only Subscriber.(package private) java.util.concurrent.atomic.AtomicReference<java.lang.Runnable>onTerminateThe optional callback when the Subject gets cancelled or terminates.(package private) SpscLinkedArrayQueue<T>queueThe queue that buffers the source events.(package private) BasicIntQueueDisposable<T>wipThe wip counter and QueueDisposable surface.
-
Constructor Summary
Constructors Constructor Description UnicastSubject(int capacityHint, java.lang.Runnable onTerminate, boolean delayError)Creates an UnicastSubject with the given capacity hint, delay error flag and callback for when the Subject is terminated normally or its single Subscriber cancels.
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description static <T> @NonNull UnicastSubject<T>create()Creates an UnicastSubject with an internal buffer capacity hint 16.static <T> @NonNull UnicastSubject<T>create(boolean delayError)Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag.static <T> @NonNull UnicastSubject<T>create(int capacityHint)Creates an UnicastSubject with the given internal buffer capacity hint.static <T> @NonNull UnicastSubject<T>create(int capacityHint, @NonNull java.lang.Runnable onTerminate)Creates an UnicastSubject with the given internal buffer capacity hint and a callback for the case when the single Subscriber cancels its subscription or the subject is terminated.static <T> @NonNull UnicastSubject<T>create(int capacityHint, @NonNull java.lang.Runnable onTerminate, boolean delayError)Creates an UnicastSubject with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Observer disposes itsDisposableor the subject is terminated.(package private) voiddoTerminate()(package private) voiddrain()(package private) voiddrainFused(Observer<? super T> a)(package private) voiddrainNormal(Observer<? super T> a)(package private) voiderrorOrComplete(Observer<? super T> a)(package private) booleanfailedFast(SimpleQueue<T> q, Observer<? super T> a)@Nullable java.lang.ThrowablegetThrowable()Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.booleanhasComplete()Returns true if the subject has reached a terminal state through a complete event.booleanhasObservers()Returns true if the subject has any Observers.booleanhasThrowable()Returns true if the subject has reached a terminal state through an error event.voidonComplete()Notifies theObserverthat theObservablehas finished sending push-based notifications.voidonError(java.lang.Throwable t)Notifies theObserverthat theObservablehas experienced an error condition.voidonNext(T t)Provides theObserverwith a new item to observe.voidonSubscribe(Disposable d)Provides theObserverwith the means of cancelling (disposing) the connection (channel) with theObservablein both synchronous (from withinObserver.onNext(Object)) and asynchronous manner.protected voidsubscribeActual(Observer<? super T> observer)Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingObservers.-
Methods inherited from class io.reactivex.rxjava3.subjects.Subject
toSerialized
-
Methods inherited from class io.reactivex.rxjava3.core.Observable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
-
-
-
Field Detail
-
queue
final SpscLinkedArrayQueue<T> queue
The queue that buffers the source events.
-
downstream
final java.util.concurrent.atomic.AtomicReference<Observer<? super T>> downstream
The single Observer.
-
onTerminate
final java.util.concurrent.atomic.AtomicReference<java.lang.Runnable> onTerminate
The optional callback when the Subject gets cancelled or terminates.
-
delayError
final boolean delayError
deliver onNext events before error event.
-
disposed
volatile boolean disposed
Indicates the single observer has cancelled.
-
done
volatile boolean done
Indicates the source has terminated.
-
error
java.lang.Throwable error
The terminal error if not null. Must be set before writing to done and read after done == true.
-
once
final java.util.concurrent.atomic.AtomicBoolean once
Set to 1 atomically for the first and only Subscriber.
-
wip
final BasicIntQueueDisposable<T> wip
The wip counter and QueueDisposable surface.
-
enableOperatorFusion
boolean enableOperatorFusion
-
-
Constructor Detail
-
UnicastSubject
UnicastSubject(int capacityHint, java.lang.Runnable onTerminate, boolean delayError)Creates an UnicastSubject with the given capacity hint, delay error flag and callback for when the Subject is terminated normally or its single Subscriber cancels.History: 2.0.8 - experimental
- Parameters:
capacityHint- the capacity hint for the internal, unbounded queueonTerminate- the callback to run when the Subject is terminated or cancelled, null not alloweddelayError- deliver pending onNext events before onError- Since:
- 2.2
-
-
Method Detail
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create()
Creates an UnicastSubject with an internal buffer capacity hint 16.- Type Parameters:
T- the value type- Returns:
- an UnicastSubject instance
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(int capacityHint)
Creates an UnicastSubject with the given internal buffer capacity hint.- Type Parameters:
T- the value type- Parameters:
capacityHint- the hint to size the internal unbounded buffer- Returns:
- an UnicastSubject instance
- Throws:
java.lang.IllegalArgumentException- ifcapacityHintis non-positive
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(int capacityHint, @NonNull @NonNull java.lang.Runnable onTerminate)
Creates an UnicastSubject with the given internal buffer capacity hint and a callback for the case when the single Subscriber cancels its subscription or the subject is terminated.The callback, if not null, is called exactly once and non-overlapped with any active replay.
- Type Parameters:
T- the value type- Parameters:
capacityHint- the hint to size the internal unbounded bufferonTerminate- the callback to run when the Subject is terminated or cancelled, null not allowed- Returns:
- an UnicastSubject instance
- Throws:
java.lang.NullPointerException- ifonTerminateisnulljava.lang.IllegalArgumentException- ifcapacityHintis non-positive
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(int capacityHint, @NonNull @NonNull java.lang.Runnable onTerminate, boolean delayError)
Creates an UnicastSubject with the given internal buffer capacity hint, delay error flag and a callback for the case when the single Observer disposes itsDisposableor the subject is terminated.The callback, if not null, is called exactly once and non-overlapped with any active replay.
History: 2.0.8 - experimental
- Type Parameters:
T- the value type- Parameters:
capacityHint- the hint to size the internal unbounded bufferonTerminate- the callback to run when the Subject is terminated or cancelled, null not alloweddelayError- deliver pending onNext events before onError- Returns:
- an UnicastSubject instance
- Throws:
java.lang.NullPointerException- ifonTerminateisnulljava.lang.IllegalArgumentException- ifcapacityHintis non-positive- Since:
- 2.2
-
create
@CheckReturnValue @NonNull public static <T> @NonNull UnicastSubject<T> create(boolean delayError)
Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag.The callback, if not null, is called exactly once and non-overlapped with any active replay.
History: 2.0.8 - experimental
- Type Parameters:
T- the value type- Parameters:
delayError- deliver pending onNext events before onError- Returns:
- an UnicastSubject instance
- Since:
- 2.2
-
subscribeActual
protected void subscribeActual(Observer<? super T> observer)
Description copied from class:ObservableOperator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingObservers.There is no need to call any of the plugin hooks on the current
Observableinstance or theObserver; all hooks and basic safeguards have been applied byObservable.subscribe(Observer)before this method gets called.- Specified by:
subscribeActualin classObservable<T>- Parameters:
observer- the incomingObserver, nevernull
-
doTerminate
void doTerminate()
-
onSubscribe
public void onSubscribe(Disposable d)
Description copied from interface:ObserverProvides theObserverwith the means of cancelling (disposing) the connection (channel) with theObservablein both synchronous (from withinObserver.onNext(Object)) and asynchronous manner.- Parameters:
d- theDisposableinstance whoseDisposable.dispose()can be called anytime to cancel the connection
-
onNext
public void onNext(T t)
Description copied from interface:ObserverProvides theObserverwith a new item to observe.The
Observablemay call this method 0 or more times.The
Observablewill not call this method again after it calls eitherObserver.onComplete()orObserver.onError(java.lang.Throwable).- Parameters:
t- the item emitted by the Observable
-
onError
public void onError(java.lang.Throwable t)
Description copied from interface:ObserverNotifies theObserverthat theObservablehas experienced an error condition.If the
Observablecalls this method, it will not thereafter callObserver.onNext(T)orObserver.onComplete().- Parameters:
t- the exception encountered by the Observable
-
onComplete
public void onComplete()
Description copied from interface:ObserverNotifies theObserverthat theObservablehas finished sending push-based notifications.The
Observablewill not call this method if it callsObserver.onError(java.lang.Throwable).
-
failedFast
boolean failedFast(SimpleQueue<T> q, Observer<? super T> a)
-
drain
void drain()
-
hasObservers
@CheckReturnValue public boolean hasObservers()
Description copied from class:SubjectReturns true if the subject has any Observers.The method is thread-safe.
- Specified by:
hasObserversin classSubject<T>- Returns:
- true if the subject has any Observers
-
getThrowable
@Nullable @CheckReturnValue public @Nullable java.lang.Throwable getThrowable()
Description copied from class:SubjectReturns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.The method is thread-safe.
- Specified by:
getThrowablein classSubject<T>- Returns:
- the error that caused the Subject to terminate or null if the Subject hasn't terminated yet
-
hasThrowable
@CheckReturnValue public boolean hasThrowable()
Description copied from class:SubjectReturns true if the subject has reached a terminal state through an error event.The method is thread-safe.
- Specified by:
hasThrowablein classSubject<T>- Returns:
- true if the subject has reached a terminal state through an error event
- See Also:
Subject.getThrowable(),Subject.hasComplete()
-
hasComplete
@CheckReturnValue public boolean hasComplete()
Description copied from class:SubjectReturns true if the subject has reached a terminal state through a complete event.The method is thread-safe.
- Specified by:
hasCompletein classSubject<T>- Returns:
- true if the subject has reached a terminal state through a complete event
- See Also:
Subject.hasThrowable()
-
-