Class AsyncSubject<T>
- java.lang.Object
-
- io.reactivex.rxjava3.core.Observable<T>
-
- io.reactivex.rxjava3.subjects.Subject<T>
-
- io.reactivex.rxjava3.subjects.AsyncSubject<T>
-
- Type Parameters:
T- the value type
- All Implemented Interfaces:
ObservableSource<T>,Observer<T>
public final class AsyncSubject<T> extends Subject<T>
A Subject that emits the very last value followed by a completion event or the received error to Observers.
This subject does not have a public constructor by design; a new empty instance of this
AsyncSubjectcan be created via thecreate()method.Since a
Subjectis conceptionally derived from theProcessortype in the Reactive Streams specification,nulls are not allowed (Rule 2.13) as parameters toonNext(Object)andonError(Throwable). Such calls will result in aNullPointerExceptionbeing thrown and the subject's state is not changed.Since an
AsyncSubjectis anObservable, it does not support backpressure.When this
AsyncSubjectis terminated viaonError(Throwable), the last observed item (if any) is cleared and lateObservers only receive theonErrorevent.The
AsyncSubjectcaches the latest item internally and it emits this item only whenonCompleteis called. Therefore, it is not recommended to use thisSubjectwith infinite or never-completing sources.Even though
AsyncSubjectimplements theObserverinterface, callingonSubscribeis not required (Rule 2.12) if the subject is used as a standalone source. However, callingonSubscribeafter theAsyncSubjectreached its terminal state will result in the givenDisposablebeing disposed immediately.Calling
onNext(Object),onError(Throwable)andonComplete()is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). TheSubject.toSerialized()method available to allSubjects provides such serialization and also protects against reentrance (i.e., when a downstreamObserverconsuming this subject also wants to callonNext(Object)on this subject recursively). The implementation of onXXX methods are technically thread-safe but non-serialized calls to them may lead to undefined state in the currently subscribed Observers.This
AsyncSubjectsupports the standard state-peeking methodshasComplete(),hasThrowable(),getThrowable()andhasObservers()as well as means to read the very last observed value - after thisAsyncSubjecthas been completed - in a non-blocking and thread-safe manner viahasValue()orgetValue().- Scheduler:
AsyncSubjectdoes not operate by default on a particularSchedulerand theObservers get notified on the thread where the terminatingonErrororonCompletemethods were invoked.- Error handling:
- When the
onError(Throwable)is called, theAsyncSubjectenters into a terminal state and emits the sameThrowableinstance to the last set ofObservers. During this emission, if one or moreObservers dispose their respectiveDisposables, theThrowableis delivered to the global error handler viaRxJavaPlugins.onError(Throwable)(multiple times if multipleObservers cancel at once). If there were noObservers subscribed to thisAsyncSubjectwhen theonError()was called, the global error handler is not invoked.
Example usage:
AsyncSubject<Object> subject = AsyncSubject.create(); TestObserver<Object> to1 = subject.test(); to1.assertEmpty(); subject.onNext(1); // AsyncSubject only emits when onComplete was called. to1.assertEmpty(); subject.onNext(2); subject.onComplete(); // onComplete triggers the emission of the last cached item and the onComplete event. to1.assertResult(2); TestObserver<Object> to2 = subject.test(); // late Observers receive the last cached item too to2.assertResult(2);
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description (package private) static classAsyncSubject.AsyncDisposable<T>
-
Field Summary
Fields Modifier and Type Field Description (package private) static AsyncSubject.AsyncDisposable[]EMPTY(package private) java.lang.ThrowableerrorWrite before updating subscribers, read after reading subscribers as TERMINATED.(package private) java.util.concurrent.atomic.AtomicReference<AsyncSubject.AsyncDisposable<T>[]>subscribers(package private) static AsyncSubject.AsyncDisposable[]TERMINATED(package private) TvalueWrite before updating subscribers, read after reading subscribers as TERMINATED.
-
Constructor Summary
Constructors Constructor Description AsyncSubject()Constructs an AsyncSubject.
-
Method Summary
All Methods Static Methods Instance Methods Concrete Methods Modifier and Type Method Description (package private) booleanadd(AsyncSubject.AsyncDisposable<T> ps)Tries to add the given subscriber to the subscribers array atomically or returns false if the subject has terminated.static <T> @NonNull AsyncSubject<T>create()Creates a new AsyncProcessor.java.lang.ThrowablegetThrowable()Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.TgetValue()Returns a single value the Subject currently has or null if no such value exists.booleanhasComplete()Returns true if the subject has reached a terminal state through a complete event.booleanhasObservers()Returns true if the subject has any Observers.booleanhasThrowable()Returns true if the subject has reached a terminal state through an error event.booleanhasValue()Returns true if the subject has any value.voidonComplete()Notifies theObserverthat theObservablehas finished sending push-based notifications.voidonError(java.lang.Throwable t)Notifies theObserverthat theObservablehas experienced an error condition.voidonNext(T t)Provides theObserverwith a new item to observe.voidonSubscribe(Disposable d)Provides theObserverwith the means of cancelling (disposing) the connection (channel) with theObservablein both synchronous (from withinObserver.onNext(Object)) and asynchronous manner.(package private) voidremove(AsyncSubject.AsyncDisposable<T> ps)Atomically removes the given subscriber if it is subscribed to the subject.protected voidsubscribeActual(Observer<? super T> observer)Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingObservers.-
Methods inherited from class io.reactivex.rxjava3.subjects.Subject
toSerialized
-
Methods inherited from class io.reactivex.rxjava3.core.Observable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
-
-
-
Field Detail
-
EMPTY
static final AsyncSubject.AsyncDisposable[] EMPTY
-
TERMINATED
static final AsyncSubject.AsyncDisposable[] TERMINATED
-
subscribers
final java.util.concurrent.atomic.AtomicReference<AsyncSubject.AsyncDisposable<T>[]> subscribers
-
error
java.lang.Throwable error
Write before updating subscribers, read after reading subscribers as TERMINATED.
-
value
T value
Write before updating subscribers, read after reading subscribers as TERMINATED.
-
-
Method Detail
-
create
@CheckReturnValue @NonNull public static <T> @NonNull AsyncSubject<T> create()
Creates a new AsyncProcessor.- Type Parameters:
T- the value type to be received and emitted- Returns:
- the new AsyncProcessor instance
-
onSubscribe
public void onSubscribe(Disposable d)
Description copied from interface:ObserverProvides theObserverwith the means of cancelling (disposing) the connection (channel) with theObservablein both synchronous (from withinObserver.onNext(Object)) and asynchronous manner.- Parameters:
d- theDisposableinstance whoseDisposable.dispose()can be called anytime to cancel the connection
-
onNext
public void onNext(T t)
Description copied from interface:ObserverProvides theObserverwith a new item to observe.The
Observablemay call this method 0 or more times.The
Observablewill not call this method again after it calls eitherObserver.onComplete()orObserver.onError(java.lang.Throwable).- Parameters:
t- the item emitted by the Observable
-
onError
public void onError(java.lang.Throwable t)
Description copied from interface:ObserverNotifies theObserverthat theObservablehas experienced an error condition.If the
Observablecalls this method, it will not thereafter callObserver.onNext(T)orObserver.onComplete().- Parameters:
t- the exception encountered by the Observable
-
onComplete
public void onComplete()
Description copied from interface:ObserverNotifies theObserverthat theObservablehas finished sending push-based notifications.The
Observablewill not call this method if it callsObserver.onError(java.lang.Throwable).
-
hasObservers
@CheckReturnValue public boolean hasObservers()
Description copied from class:SubjectReturns true if the subject has any Observers.The method is thread-safe.
- Specified by:
hasObserversin classSubject<T>- Returns:
- true if the subject has any Observers
-
hasThrowable
@CheckReturnValue public boolean hasThrowable()
Description copied from class:SubjectReturns true if the subject has reached a terminal state through an error event.The method is thread-safe.
- Specified by:
hasThrowablein classSubject<T>- Returns:
- true if the subject has reached a terminal state through an error event
- See Also:
Subject.getThrowable(),Subject.hasComplete()
-
hasComplete
@CheckReturnValue public boolean hasComplete()
Description copied from class:SubjectReturns true if the subject has reached a terminal state through a complete event.The method is thread-safe.
- Specified by:
hasCompletein classSubject<T>- Returns:
- true if the subject has reached a terminal state through a complete event
- See Also:
Subject.hasThrowable()
-
getThrowable
@CheckReturnValue public java.lang.Throwable getThrowable()
Description copied from class:SubjectReturns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.The method is thread-safe.
- Specified by:
getThrowablein classSubject<T>- Returns:
- the error that caused the Subject to terminate or null if the Subject hasn't terminated yet
-
subscribeActual
protected void subscribeActual(Observer<? super T> observer)
Description copied from class:ObservableOperator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingObservers.There is no need to call any of the plugin hooks on the current
Observableinstance or theObserver; all hooks and basic safeguards have been applied byObservable.subscribe(Observer)before this method gets called.- Specified by:
subscribeActualin classObservable<T>- Parameters:
observer- the incomingObserver, nevernull
-
add
boolean add(AsyncSubject.AsyncDisposable<T> ps)
Tries to add the given subscriber to the subscribers array atomically or returns false if the subject has terminated.- Parameters:
ps- the subscriber to add- Returns:
- true if successful, false if the subject has terminated
-
remove
void remove(AsyncSubject.AsyncDisposable<T> ps)
Atomically removes the given subscriber if it is subscribed to the subject.- Parameters:
ps- the subject to remove
-
hasValue
@CheckReturnValue public boolean hasValue()
Returns true if the subject has any value.The method is thread-safe.
- Returns:
- true if the subject has any value
-
getValue
@Nullable @CheckReturnValue public T getValue()
Returns a single value the Subject currently has or null if no such value exists.The method is thread-safe.
- Returns:
- a single value the Subject currently has or null if no such value exists
-
-