Class BehaviorSubject<T>
- java.lang.Object
  - io.reactivex.rxjava3.core.Observable<T>
    - io.reactivex.rxjava3.subjects.Subject<T>
      - io.reactivex.rxjava3.subjects.BehaviorSubject<T>
-
- Type Parameters:
T - the type of item expected to be observed by the Subject
- All Implemented Interfaces:
ObservableSource<T>, Observer<T>
public final class BehaviorSubject<T> extends Subject<T>
Subject that emits the most recent item it has observed and all subsequent observed items to each subscribed Observer.
This subject does not have a public constructor by design; a new empty instance of this BehaviorSubject can be created via the create() method and a new non-empty instance can be created via createDefault(Object) (named as such to avoid an overload resolution conflict with Observable.create, which creates an Observable, not a BehaviorSubject).
Since a Subject is conceptually derived from the Processor type in the Reactive Streams specification, nulls are not allowed (Rule 2.13) as default initial values in createDefault(Object) or as parameters to onNext(Object) and onError(Throwable). Such calls will result in a NullPointerException being thrown and the subject's state is not changed.
Since a BehaviorSubject is an Observable, it does not support backpressure.
When this BehaviorSubject is terminated via onError(Throwable) or onComplete(), the last observed item (if any) is cleared and late Observers only receive the respective terminal event.
The BehaviorSubject does not support clearing its cached value (to appear empty again); however, the effect can be achieved by using a special item and making sure Observers subscribe through a filter whose predicate filters out this special item:
    BehaviorSubject<Integer> subject = BehaviorSubject.create();
    final Integer EMPTY = Integer.MIN_VALUE;
    Observable<Integer> observable = subject.filter(v -> v != EMPTY);
    TestObserver<Integer> to1 = observable.test();
    subject.onNext(1);
    // this will "clear" the cache
    subject.onNext(EMPTY);
    TestObserver<Integer> to2 = observable.test();
    subject.onNext(2);
    subject.onComplete();
    // to1 received both non-empty items
    to1.assertResult(1, 2);
    // to2 received only 2 even though the current item was EMPTY
    // when it got subscribed
    to2.assertResult(2);
    // Observers coming after the subject was terminated receive
    // no items and only the onComplete event in this case.
    observable.test().assertResult();
Even though BehaviorSubject implements the Observer interface, calling onSubscribe is not required (Rule 2.12) if the subject is used as a standalone source. However, calling onSubscribe after the BehaviorSubject reached its terminal state will result in the given Disposable being disposed immediately.
Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The Subject.toSerialized() method available to all Subjects provides such serialization and also protects against reentrance (i.e., when a downstream Observer consuming this subject also wants to call onNext(Object) on this subject recursively).
This BehaviorSubject supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasObservers() as well as means to read the latest observed value in a non-blocking and thread-safe manner via hasValue() or getValue().
- Scheduler:
BehaviorSubject does not operate by default on a particular Scheduler and the Observers get notified on the thread the respective onXXX methods were invoked.
- Error handling:
When onError(Throwable) is called, the BehaviorSubject enters into a terminal state and emits the same Throwable instance to the last set of Observers. During this emission, if one or more Observers dispose their respective Disposables, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable) (multiple times if multiple Observers cancel at once). If there were no Observers subscribed to this BehaviorSubject when onError() was called, the global error handler is not invoked.
Example usage:
    // observer will receive all 4 events (including "default").
    BehaviorSubject<Object> subject = BehaviorSubject.createDefault("default");
    subject.subscribe(observer);
    subject.onNext("one");
    subject.onNext("two");
    subject.onNext("three");

    // observer will receive the "one", "two" and "three" events, but not "zero"
    BehaviorSubject<Object> subject = BehaviorSubject.create();
    subject.onNext("zero");
    subject.onNext("one");
    subject.subscribe(observer);
    subject.onNext("two");
    subject.onNext("three");

    // observer will receive only onComplete
    BehaviorSubject<Object> subject = BehaviorSubject.create();
    subject.onNext("zero");
    subject.onNext("one");
    subject.onComplete();
    subject.subscribe(observer);

    // observer will receive only onError
    BehaviorSubject<Object> subject = BehaviorSubject.create();
    subject.onNext("zero");
    subject.onNext("one");
    subject.onError(new RuntimeException("error"));
    subject.subscribe(observer);
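The serialization requirement from the class description can be illustrated with a short sketch. It assumes RxJava 3 is on the classpath; the class name SerializedEmissionSketch and its run() helper are hypothetical names used only for this illustration. Two threads emit through the view returned by toSerialized(), so the onNext calls reaching the Observer should not overlap.

```java
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import io.reactivex.rxjava3.subjects.Subject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of RxJava.
final class SerializedEmissionSketch {

    /** Emits from two threads through a serialized view and returns how many items the Observer saw. */
    static int run() {
        // toSerialized() wraps the subject so that concurrent onXXX calls are serialized.
        Subject<Integer> serialized = BehaviorSubject.<Integer>create().toSerialized();

        // Plain ArrayList: it is only touched from onNext, which the wrapper serializes.
        List<Integer> received = new ArrayList<>();
        serialized.subscribe(received::add);

        Runnable producer = () -> {
            for (int i = 0; i < 1000; i++) {
                serialized.onNext(i);
            }
        };
        Thread first = new Thread(producer);
        Thread second = new Thread(producer);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received.size();
    }
}
```

Calling onNext on the raw BehaviorSubject from both threads instead would violate the serialization requirement stated above.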
-
-
Nested Class Summary
Nested Classes (Modifier and Type, Class, Description):
(package private) static class BehaviorSubject.BehaviorDisposable<T>
-
Field Summary
Fields (Modifier and Type, Field, Description):
(package private) static BehaviorSubject.BehaviorDisposable[] EMPTY
(package private) long index
(package private) java.util.concurrent.locks.ReadWriteLock lock
(package private) java.util.concurrent.atomic.AtomicReference<BehaviorSubject.BehaviorDisposable<T>[]> observers
(package private) java.util.concurrent.locks.Lock readLock
(package private) java.util.concurrent.atomic.AtomicReference<java.lang.Throwable> terminalEvent
(package private) static BehaviorSubject.BehaviorDisposable[] TERMINATED
(package private) java.util.concurrent.atomic.AtomicReference<java.lang.Object> value
(package private) java.util.concurrent.locks.Lock writeLock
-
Constructor Summary
Constructors (Constructor, Description):
BehaviorSubject(T defaultValue) - Constructs an empty BehaviorSubject.
-
Method Summary
Methods (Modifier and Type, Method, Description):
(package private) boolean add(BehaviorSubject.BehaviorDisposable<T> rs)
static <T> @NonNull BehaviorSubject<T> create() - Creates a BehaviorSubject without a default item.
static <@NonNull T> @NonNull BehaviorSubject<T> createDefault(@NonNull T defaultValue) - Creates a BehaviorSubject that emits the last item it observed and all subsequent items to each Observer that subscribes to it.
@Nullable java.lang.Throwable getThrowable() - Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.
T getValue() - Returns a single value the Subject currently has or null if no such value exists.
boolean hasComplete() - Returns true if the subject has reached a terminal state through a complete event.
boolean hasObservers() - Returns true if the subject has any Observers.
boolean hasThrowable() - Returns true if the subject has reached a terminal state through an error event.
boolean hasValue() - Returns true if the subject has any value.
void onComplete() - Notifies the Observer that the Observable has finished sending push-based notifications.
void onError(java.lang.Throwable t) - Notifies the Observer that the Observable has experienced an error condition.
void onNext(T t) - Provides the Observer with a new item to observe.
void onSubscribe(Disposable d) - Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within Observer.onNext(Object)) and asynchronous manner.
(package private) void remove(BehaviorSubject.BehaviorDisposable<T> rs)
(package private) void setCurrent(java.lang.Object o)
protected void subscribeActual(Observer<? super T> observer) - Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers.
(package private) int subscriberCount()
(package private) BehaviorSubject.BehaviorDisposable<T>[] terminate(java.lang.Object terminalValue)
-
Methods inherited from class io.reactivex.rxjava3.subjects.Subject
toSerialized
-
Methods inherited from class io.reactivex.rxjava3.core.Observable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, 
delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, 
replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, 
withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
-
-
-
Field Detail
-
value
final java.util.concurrent.atomic.AtomicReference<java.lang.Object> value
-
observers
final java.util.concurrent.atomic.AtomicReference<BehaviorSubject.BehaviorDisposable<T>[]> observers
-
EMPTY
static final BehaviorSubject.BehaviorDisposable[] EMPTY
-
TERMINATED
static final BehaviorSubject.BehaviorDisposable[] TERMINATED
-
lock
final java.util.concurrent.locks.ReadWriteLock lock
-
readLock
final java.util.concurrent.locks.Lock readLock
-
writeLock
final java.util.concurrent.locks.Lock writeLock
-
terminalEvent
final java.util.concurrent.atomic.AtomicReference<java.lang.Throwable> terminalEvent
-
index
long index
-
-
Constructor Detail
-
BehaviorSubject
BehaviorSubject(T defaultValue)
Constructs an empty BehaviorSubject.
- Parameters:
defaultValue - the initial value, not null (verified)
- Since:
2.0
-
-
Method Detail
-
create
@CheckReturnValue @NonNull public static <T> @NonNull BehaviorSubject<T> create()
Creates a BehaviorSubject without a default item.
- Type Parameters:
T - the type of item the Subject will emit
- Returns:
the constructed BehaviorSubject
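As a minimal sketch of what "without a default item" means in practice (assuming RxJava 3 on the classpath; CreateSketch and run() are hypothetical names): an Observer that subscribes to an empty subject gets nothing replayed, while one that subscribes after some items were observed first receives the most recent one.

```java
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of RxJava.
final class CreateSketch {

    /** Returns the items seen by an early and a late Observer, formatted as "early|late". */
    static String run() {
        BehaviorSubject<String> subject = BehaviorSubject.create();

        // Subscribes while the subject is still empty: nothing is replayed.
        List<String> early = new ArrayList<>();
        subject.subscribe(early::add);

        subject.onNext("a");
        subject.onNext("b");

        // Subscribes after two items: only the most recent one ("b") is replayed.
        List<String> late = new ArrayList<>();
        subject.subscribe(late::add);

        return early + "|" + late;
    }
}
```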
-
createDefault
@CheckReturnValue @NonNull public static <@NonNull T> @NonNull BehaviorSubject<T> createDefault(@NonNull T defaultValue)
Creates a BehaviorSubject that emits the last item it observed and all subsequent items to each Observer that subscribes to it.
- Type Parameters:
T - the type of item the Subject will emit
- Parameters:
defaultValue - the item that will be emitted first to any Observer as long as the BehaviorSubject has not yet observed any items from its source Observable
- Returns:
the constructed BehaviorSubject
- Throws:
java.lang.NullPointerException - if defaultValue is null
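The following sketch shows both documented behaviors of createDefault: the default item reaches an Observer that subscribes before anything else was observed, and a null default is rejected. It assumes RxJava 3 on the classpath; CreateDefaultSketch and run() are hypothetical names.

```java
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of RxJava.
final class CreateDefaultSketch {

    /** Returns the items seen by the Observer followed by whether a null default was rejected. */
    static String run() {
        BehaviorSubject<String> subject = BehaviorSubject.createDefault("default");

        List<String> seen = new ArrayList<>();
        subject.subscribe(seen::add); // receives "default" right away
        subject.onNext("one");

        boolean nullRejected = false;
        try {
            BehaviorSubject.<String>createDefault(null);
        } catch (NullPointerException expected) {
            nullRejected = true; // documented under "Throws" above
        }
        return seen + " " + nullRejected;
    }
}
```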
-
subscribeActual
protected void subscribeActual(Observer<? super T> observer)
Description copied from class: Observable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Observers.
There is no need to call any of the plugin hooks on the current Observable instance or the Observer; all hooks and basic safeguards have been applied by Observable.subscribe(Observer) before this method gets called.
- Specified by:
subscribeActual in class Observable<T>
- Parameters:
observer - the incoming Observer, never null
-
onSubscribe
public void onSubscribe(Disposable d)
Description copied from interface: Observer
Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within Observer.onNext(Object)) and asynchronous manner.
- Parameters:
d - the Disposable instance whose Disposable.dispose() can be called anytime to cancel the connection
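The class description notes that a Disposable handed to onSubscribe after termination is disposed immediately. A small sketch of that behavior, assuming RxJava 3 on the classpath (OnSubscribeSketch and run() are hypothetical names):

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.BehaviorSubject;

// Hypothetical helper class, not part of RxJava.
final class OnSubscribeSketch {

    /** Returns { disposed state of a Disposable passed before termination, same for one passed after }. */
    static boolean[] run() {
        BehaviorSubject<Integer> subject = BehaviorSubject.create();

        Disposable beforeTerminal = Disposable.empty();
        subject.onSubscribe(beforeTerminal); // subject still active: left untouched

        subject.onComplete();

        Disposable afterTerminal = Disposable.empty();
        subject.onSubscribe(afterTerminal);  // subject terminated: disposed immediately

        return new boolean[] { beforeTerminal.isDisposed(), afterTerminal.isDisposed() };
    }
}
```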
-
onNext
public void onNext(T t)
Description copied from interface: Observer
Provides the Observer with a new item to observe.
The Observable may call this method 0 or more times.
The Observable will not call this method again after it calls either Observer.onComplete() or Observer.onError(java.lang.Throwable).
- Parameters:
t - the item emitted by the Observable
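Because the subject does not operate on a particular Scheduler, an Observer is notified on whichever thread calls onNext. The sketch below, which assumes RxJava 3 on the classpath and uses the hypothetical names EmissionThreadSketch, run() and "emitter-thread", records the thread that delivered the item.

```java
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper class, not part of RxJava.
final class EmissionThreadSketch {

    /** Returns true if the Observer was notified on the thread that called onNext. */
    static boolean run() {
        BehaviorSubject<Integer> subject = BehaviorSubject.create();

        AtomicReference<String> observerThread = new AtomicReference<>();
        subject.subscribe(v -> observerThread.set(Thread.currentThread().getName()));

        // A single emitting thread, so the serialization requirement is trivially met.
        Thread emitter = new Thread(() -> subject.onNext(1), "emitter-thread");
        emitter.start();
        try {
            emitter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "emitter-thread".equals(observerThread.get());
    }
}
```

To move delivery to another thread, an operator such as observeOn would have to be applied downstream of the subject.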
-
onError
public void onError(java.lang.Throwable t)
Description copied from interface: Observer
Notifies the Observer that the Observable has experienced an error condition.
If the Observable calls this method, it will not thereafter call Observer.onNext(T) or Observer.onComplete().
- Parameters:
t - the exception encountered by the Observable
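A sketch of the terminal behavior described in the class overview, assuming RxJava 3 on the classpath (OnErrorSketch and run() are hypothetical names): current Observers receive the same Throwable instance, and late Observers receive only that error, without the previously cached item.

```java
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper class, not part of RxJava.
final class OnErrorSketch {

    /** Returns true if both Observers got the same Throwable and the late one got no item. */
    static boolean run() {
        BehaviorSubject<String> subject = BehaviorSubject.createDefault("item");
        RuntimeException failure = new RuntimeException("error");

        // Subscribed before the error: receives "item", then the error.
        AtomicReference<Throwable> current = new AtomicReference<>();
        subject.subscribe(v -> { }, current::set);

        subject.onError(failure);

        // Subscribed after the error: the cached item was cleared, only the error arrives.
        List<String> lateItems = new ArrayList<>();
        AtomicReference<Throwable> late = new AtomicReference<>();
        subject.subscribe(lateItems::add, late::set);

        return current.get() == failure && late.get() == failure && lateItems.isEmpty();
    }
}
```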
-
onComplete
public void onComplete()
Description copied from interface: Observer
Notifies the Observer that the Observable has finished sending push-based notifications.
The Observable will not call this method if it calls Observer.onError(java.lang.Throwable).
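As a brief illustration of completion, the sketch below (assuming RxJava 3 on the classpath; OnCompleteSketch and run() are hypothetical names) subscribes after onComplete() and records every event it receives. Only the completion signal is expected, since the last observed item is cleared on termination.

```java
import io.reactivex.rxjava3.subjects.BehaviorSubject;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class, not part of RxJava.
final class OnCompleteSketch {

    /** Returns the events a late Observer receives from a completed subject. */
    static String run() {
        BehaviorSubject<String> subject = BehaviorSubject.createDefault("last");
        subject.onComplete();

        List<String> events = new ArrayList<>();
        subject.subscribe(
                events::add,                    // items (none expected)
                e -> events.add("error"),       // errors (none expected)
                () -> events.add("complete"));  // completion

        return events.toString();
    }
}
```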
-
hasObservers
@CheckReturnValue public boolean hasObservers()
Description copied from class: Subject
Returns true if the subject has any Observers.
The method is thread-safe.
- Specified by:
hasObservers in class Subject<T>
- Returns:
true if the subject has any Observers
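A short sketch of how hasObservers() tracks subscriptions, assuming RxJava 3 on the classpath (HasObserversSketch and run() are hypothetical names): the result changes as an Observer subscribes and later disposes.

```java
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.BehaviorSubject;

// Hypothetical helper class, not part of RxJava.
final class HasObserversSketch {

    /** Returns hasObservers() before subscribing, while subscribed, and after disposing. */
    static String run() {
        BehaviorSubject<Integer> subject = BehaviorSubject.create();

        boolean before = subject.hasObservers();

        Disposable subscription = subject.subscribe(v -> { });
        boolean during = subject.hasObservers();

        subscription.dispose(); // removes the Observer from the subject
        boolean after = subject.hasObservers();

        return before + " " + during + " " + after;
    }
}
```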
-
subscriberCount
@CheckReturnValue int subscriberCount()
-
getThrowable
@Nullable @CheckReturnValue public @Nullable java.lang.Throwable getThrowable()
Description copied from class: Subject
Returns the error that caused the Subject to terminate or null if the Subject hasn't terminated yet.
The method is thread-safe.
- Specified by:
getThrowable in class Subject<T>
- Returns:
the error that caused the Subject to terminate or null if the Subject hasn't terminated yet
-
getValue
@Nullable @CheckReturnValue public T getValue()
Returns a single value the Subject currently has or null if no such value exists.
The method is thread-safe.
- Returns:
a single value the Subject currently has or null if no such value exists
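The sketch below follows hasValue() and getValue() through the life of a subject: empty, after two items, and after completion. It assumes RxJava 3 on the classpath; ValuePeekSketch and run() are hypothetical names.

```java
import io.reactivex.rxjava3.subjects.BehaviorSubject;

// Hypothetical helper class, not part of RxJava.
final class ValuePeekSketch {

    /** Returns "hasValue:getValue" snapshots taken at three points in the subject's life. */
    static String run() {
        BehaviorSubject<Integer> subject = BehaviorSubject.create();
        StringBuilder log = new StringBuilder();

        // Empty subject: no value yet.
        log.append(subject.hasValue()).append(':').append(subject.getValue()).append(' ');

        subject.onNext(1);
        subject.onNext(2);
        // Only the most recent item is retained.
        log.append(subject.hasValue()).append(':').append(subject.getValue()).append(' ');

        subject.onComplete();
        // Termination clears the last observed item.
        log.append(subject.hasValue()).append(':').append(subject.getValue());

        return log.toString();
    }
}
```

Since both methods are non-blocking, a caller that needs a consistent pair should read getValue() once and check the result for null instead of calling hasValue() first.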
-
hasComplete
@CheckReturnValue public boolean hasComplete()
Description copied from class: Subject
Returns true if the subject has reached a terminal state through a complete event.
The method is thread-safe.
- Specified by:
hasComplete in class Subject<T>
- Returns:
true if the subject has reached a terminal state through a complete event
- See Also:
Subject.hasThrowable()
-
hasThrowable
@CheckReturnValue public boolean hasThrowable()
Description copied from class: Subject
Returns true if the subject has reached a terminal state through an error event.
The method is thread-safe.
- Specified by:
hasThrowable in class Subject<T>
- Returns:
true if the subject has reached a terminal state through an error event
- See Also:
Subject.getThrowable(), Subject.hasComplete()
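The terminal-state methods can be combined to tell an active subject from a completed or failed one. A minimal sketch, assuming RxJava 3 on the classpath; TerminalStateSketch, describe() and run() are hypothetical names:

```java
import io.reactivex.rxjava3.subjects.BehaviorSubject;

// Hypothetical helper class, not part of RxJava.
final class TerminalStateSketch {

    /** Maps the state-peeking methods to a short description of the subject's state. */
    static String describe(BehaviorSubject<?> subject) {
        // Read the error once; it is null unless the subject terminated with an error.
        Throwable error = subject.getThrowable();
        if (error != null) {
            return "failed: " + error.getMessage();
        }
        if (subject.hasComplete()) {
            return "completed";
        }
        return "active";
    }

    static String run() {
        BehaviorSubject<Integer> completed = BehaviorSubject.create();
        String first = describe(completed);          // no terminal event yet
        completed.onComplete();
        String second = describe(completed);

        BehaviorSubject<Integer> failed = BehaviorSubject.create();
        failed.onError(new IllegalStateException("boom")); // no Observers subscribed
        String third = describe(failed);

        return first + " " + second + " " + third;
    }
}
```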
-
hasValue
@CheckReturnValue public boolean hasValue()
Returns true if the subject has any value.
The method is thread-safe.
- Returns:
true if the subject has any value
-
add
boolean add(BehaviorSubject.BehaviorDisposable<T> rs)
-
remove
void remove(BehaviorSubject.BehaviorDisposable<T> rs)
-
terminate
BehaviorSubject.BehaviorDisposable<T>[] terminate(java.lang.Object terminalValue)
-
setCurrent
void setCurrent(java.lang.Object o)
-
-