Class ObservableCache<T>
java.lang.Object
io.reactivex.rxjava3.core.Observable<T>
io.reactivex.rxjava3.internal.operators.observable.AbstractObservableWithUpstream<T,T>
io.reactivex.rxjava3.internal.operators.observable.ObservableCache<T>
- Type Parameters:
T- the source element type
- All Implemented Interfaces:
ObservableSource<T>, Observer<T>, HasUpstreamObservableSource<T>
public final class ObservableCache<T>
extends AbstractObservableWithUpstream<T,T>
implements Observer<T>
An observable which auto-connects to another observable, caches the elements
from that observable but allows terminating the connection and completing the cache.
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescription(package private) static final classHosts the downstream consumer and its current requested and replay states.(package private) static final classRepresents a segment of the cached item list as part of a linked-node-list structure. -
Field Summary
FieldsModifier and TypeFieldDescription(package private) final intThe number of items per cached nodes.(package private) booleanTrue if the source has terminated.(package private) static final ObservableCache.CacheDisposable[]A shared instance of an empty array of observers to avoid creating a new empty array when all observers dispose.(package private) ThrowableIfobserversisTERMINATED, this holds the terminal error if not null.(package private) final ObservableCache.Node<T> The starting point of the cached items.(package private) final AtomicReference<ObservableCache.CacheDisposable<T>[]> The current known array of observer state to notify.(package private) final AtomicBooleanThe subscription to the source should happen at most once.(package private) longThe total number of elements in the list available for reads.(package private) ObservableCache.Node<T> The current tail of the linked structure holding the items.(package private) intHow many items have been put into the tail node so far.(package private) static final ObservableCache.CacheDisposable[]A shared instance indicating the source has no more events and there is no need to remember observers anymore.Fields inherited from class AbstractObservableWithUpstream
source -
Constructor Summary
ConstructorsConstructorDescriptionObservableCache(Observable<T> source, int capacityHint) Constructs an empty, non-connected cache. -
Method Summary
Modifier and TypeMethodDescription(package private) voidadd(ObservableCache.CacheDisposable<T> consumer) Atomically adds the consumer to theobserverscopy-on-write array if the source has not yet terminated.(package private) longReturns the number of events currently cached.(package private) booleanReturns true if there are observers subscribed to this observable.(package private) booleanCheck if this cached observable is connected to its source.voidNotifies theObserverthat theObservablehas finished sending push-based notifications.voidNotifies theObserverthat theObservablehas experienced an error condition.voidProvides theObserverwith a new item to observe.voidProvides theObserverwith the means of cancelling (disposing) the connection (channel) with theObservablein both synchronous (from withinObserver.onNext(Object)) and asynchronous manner.(package private) voidremove(ObservableCache.CacheDisposable<T> consumer) Atomically removes the consumer from theobserverscopy-on-write array.(package private) voidreplay(ObservableCache.CacheDisposable<T> consumer) Replays the contents of this cache to the given consumer based on its current state and number of items requested by it.protected voidsubscribeActual(Observer<? super T> t) Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingObservers.Methods inherited from class AbstractObservableWithUpstream
sourceMethods inherited from class Observable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatWith, concatWith, concatWith, concatWith, contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnComplete, doOnDispose, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, publish, publish, range, rangeLong, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFlowable, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, wrap, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
Field Details
-
once
The subscription to the source should happen at most once. -
capacityHint
final int capacityHintThe number of items per cached nodes. -
observers
The current known array of observer state to notify. -
EMPTY
A shared instance of an empty array of observers to avoid creating a new empty array when all observers dispose. -
TERMINATED
A shared instance indicating the source has no more events and there is no need to remember observers anymore. -
size
volatile long sizeThe total number of elements in the list available for reads. -
head
The starting point of the cached items. -
tail
ObservableCache.Node<T> tailThe current tail of the linked structure holding the items. -
tailOffset
int tailOffsetHow many items have been put into the tail node so far. -
error
Throwable errorIfobserversisTERMINATED, this holds the terminal error if not null. -
done
volatile boolean doneTrue if the source has terminated.
-
-
Constructor Details
-
ObservableCache
Constructs an empty, non-connected cache.- Parameters:
source- the source to subscribe to for the first incoming observercapacityHint- the number of items expected (reduce allocation frequency)
-
-
Method Details
-
subscribeActual
Description copied from class:ObservableOperator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incomingObservers.There is no need to call any of the plugin hooks on the current
Observableinstance or theObserver; all hooks and basic safeguards have been applied byObservable.subscribe(Observer)before this method gets called.- Specified by:
subscribeActualin classObservable<T>- Parameters:
t- the incomingObserver, nevernull
-
isConnected
boolean isConnected()Check if this cached observable is connected to its source.- Returns:
- true if already connected
-
hasObservers
boolean hasObservers()Returns true if there are observers subscribed to this observable.- Returns:
- true if the cache has observers
-
cachedEventCount
long cachedEventCount()Returns the number of events currently cached.- Returns:
- the number of currently cached event count
-
add
Atomically adds the consumer to theobserverscopy-on-write array if the source has not yet terminated.- Parameters:
consumer- the consumer to add
-
remove
Atomically removes the consumer from theobserverscopy-on-write array.- Parameters:
consumer- the consumer to remove
-
replay
Replays the contents of this cache to the given consumer based on its current state and number of items requested by it.- Parameters:
consumer- the consumer to continue replaying items to
-
onSubscribe
Description copied from interface:ObserverProvides theObserverwith the means of cancelling (disposing) the connection (channel) with theObservablein both synchronous (from withinObserver.onNext(Object)) and asynchronous manner.- Specified by:
onSubscribein interfaceObserver<T>- Parameters:
d- theDisposableinstance whoseDisposable.dispose()can be called anytime to cancel the connection
-
onNext
Description copied from interface:ObserverProvides theObserverwith a new item to observe.The
Observablemay call this method 0 or more times.The
Observablewill not call this method again after it calls eitherObserver.onComplete()orObserver.onError(Throwable). -
onError
Description copied from interface:ObserverNotifies theObserverthat theObservablehas experienced an error condition.If the
Observablecalls this method, it will not thereafter callObserver.onNext(T)orObserver.onComplete(). -
onComplete
public void onComplete()Description copied from interface:ObserverNotifies theObserverthat theObservablehas finished sending push-based notifications.The
Observablewill not call this method if it callsObserver.onError(Throwable).- Specified by:
onCompletein interfaceObserver<T>
-