Class ReplayProcessor<T>
- java.lang.Object
  - io.reactivex.rxjava3.core.Flowable<T>
    - io.reactivex.rxjava3.processors.FlowableProcessor<T>
      - io.reactivex.rxjava3.processors.ReplayProcessor<T>
- Type Parameters:
T - the value type
- All Implemented Interfaces:
FlowableSubscriber<T>, org.reactivestreams.Processor<T,T>, org.reactivestreams.Publisher<T>, org.reactivestreams.Subscriber<T>
public final class ReplayProcessor<@NonNull T> extends FlowableProcessor<T>
Replays events to Subscribers.

The ReplayProcessor supports the following item retention strategies:
- create() and create(int): retains and replays all events to current and future Subscribers.
- createWithSize(int): retains at most the given number of items and replays only these latest items to new Subscribers.
- createWithTime(long, TimeUnit, Scheduler): retains items no older than the specified time and replays them to new Subscribers (which could mean all items age out).
- createWithTimeAndSize(long, TimeUnit, Scheduler, int): retains no more than the given number of items which are also no older than the specified time and replays them to new Subscribers (which could mean all items age out).
The ReplayProcessor can be created in bounded and unbounded mode. It can be bounded by size (maximum number of elements retained) and/or time (maximum age of elements replayed).

Since a ReplayProcessor is a Reactive Streams Processor, nulls are not allowed (Rule 2.13) as parameters to onNext(Object) and onError(Throwable). Such calls will result in a NullPointerException being thrown and the processor's state is not changed.

When this ReplayProcessor is terminated via onError(Throwable) or onComplete(), late Subscribers will receive the retained/cached items first (if any) followed by the respective terminal event. If the ReplayProcessor has a time-bound, the age of the retained/cached items is still considered when replaying and thus it may result in no items being emitted before the terminal event.

Once a Subscriber has subscribed, it will receive items continuously from that point on. Bounds only affect how many past items a new Subscriber will receive before it catches up with the live event feed.

Even though ReplayProcessor implements the Subscriber interface, calling onSubscribe is not required (Rule 2.12) if the processor is used as a standalone source. However, calling onSubscribe after the ReplayProcessor reached its terminal state will result in the given Subscription being canceled immediately.

Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The FlowableProcessor.toSerialized() method available to all FlowableProcessors provides such serialization and also protects against reentrance (i.e., when a downstream Subscriber consuming this processor also wants to call onNext(Object) on this processor recursively).

This ReplayProcessor supports the standard state-peeking methods hasComplete(), hasThrowable(), getThrowable() and hasSubscribers() as well as means to read the retained/cached items in a non-blocking and thread-safe manner via hasValue(), getValue(), getValues() or getValues(Object[]).

Note that due to concurrency requirements, a size-bounded or a size- and time-bounded ReplayProcessor may hold strong references to more source emissions than specified while it isn't terminated yet. Use cleanupBuffer() to allow such inaccessible items to be cleaned up by GC once no consumer references them anymore.
- Backpressure:
- This ReplayProcessor respects the individual backpressure behavior of its Subscribers but does not coordinate their request amounts towards the upstream (because there might not be any) and consumes the upstream in an unbounded manner (requesting Long.MAX_VALUE). Note that Subscribers receive a continuous sequence of values after they subscribed even if an individual item gets delayed due to backpressure.
- Scheduler:
- ReplayProcessor does not operate by default on a particular Scheduler and the Subscribers get notified on the thread the respective onXXX methods were invoked. Time-bound ReplayProcessors use the Scheduler given to their create methods as the time source for timestamping received items for the age checks.
- Error handling:
- When onError(Throwable) is called, the ReplayProcessor enters into a terminal state and emits the same Throwable instance to the last set of Subscribers. During this emission, if one or more Subscribers cancel their respective Subscriptions, the Throwable is delivered to the global error handler via RxJavaPlugins.onError(Throwable) (multiple times if multiple Subscribers cancel at once). If there were no Subscribers subscribed to this ReplayProcessor when onError() was called, the global error handler is not invoked.
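The per-Subscriber backpressure behavior described above can be illustrated with a small sketch. It assumes RxJava 3 is on the classpath; the class name ReplayBackpressureSketch and the method run() are hypothetical names chosen for this illustration, and TestSubscriber is used only as a convenient consumer that can issue bounded requests.

```java
import io.reactivex.rxjava3.processors.ReplayProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class ReplayBackpressureSketch {

    /** Returns the items seen after requesting 2, and then after requesting 3 more. */
    static List<List<Integer>> run() {
        ReplayProcessor<Integer> processor = ReplayProcessor.create();
        for (int i = 1; i <= 5; i++) {
            processor.onNext(i); // no upstream Subscription is needed for standalone use
        }

        // test(2) subscribes a TestSubscriber with an initial request of 2 items
        TestSubscriber<Integer> ts = processor.test(2);
        List<Integer> afterFirstRequest = new ArrayList<>(ts.values());

        // the remaining cached items are delivered only once they are requested
        ts.requestMore(3);
        List<Integer> afterSecondRequest = new ArrayList<>(ts.values());

        List<List<Integer>> result = new ArrayList<>();
        result.add(afterFirstRequest);
        result.add(afterSecondRequest);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The slow consumer does not lose items here: the processor keeps them cached and hands them over in order as the Subscriber's requests arrive.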
Example usage:
ReplayProcessor<Object> processor = ReplayProcessor.create();
processor.onNext("one");
processor.onNext("two");
processor.onNext("three");
processor.onComplete();

// both of the following will get the onNext/onComplete calls from above
processor.subscribe(subscriber1);
processor.subscribe(subscriber2);
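As a complement to the usage example, the following sketch shows how the state-peeking methods could be used on a terminated processor. It assumes RxJava 3 on the classpath; ReplayStateSketch and describe() are hypothetical names introduced only for this illustration.

```java
import io.reactivex.rxjava3.processors.ReplayProcessor;

// Hypothetical demo class, not part of RxJava.
public class ReplayStateSketch {

    /** Builds a one-line summary of the processor state after completion. */
    static String describe() {
        ReplayProcessor<String> processor = ReplayProcessor.create();
        processor.onNext("one");
        processor.onNext("two");
        processor.onNext("three");
        processor.onComplete();

        return "hasValue=" + processor.hasValue()
                + " latest=" + processor.getValue()
                + " count=" + processor.getValues().length
                + " complete=" + processor.hasComplete()
                + " error=" + processor.hasThrowable()
                + " subscribers=" + processor.hasSubscribers();
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

All of these methods are documented as thread-safe, so they can be polled from a thread other than the one calling the onXXX methods.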
-
-
Nested Class Summary
Nested Classes
Modifier and Type, Class: Description
(package private) static class ReplayProcessor.Node<T>
(package private) static interface ReplayProcessor.ReplayBuffer<T>: Abstraction over a buffer that receives events and replays them to individual Subscribers.
(package private) static class ReplayProcessor.ReplaySubscription<T>
(package private) static class ReplayProcessor.SizeAndTimeBoundReplayBuffer<T>
(package private) static class ReplayProcessor.SizeBoundReplayBuffer<T>
(package private) static class ReplayProcessor.TimedNode<T>
(package private) static class ReplayProcessor.UnboundedReplayBuffer<T>
-
Field Summary
Fields
Modifier and Type, Field: Description
(package private) ReplayProcessor.ReplayBuffer<T> buffer
(package private) boolean done
(package private) static ReplayProcessor.ReplaySubscription[] EMPTY
private static java.lang.Object[] EMPTY_ARRAY: An empty array to avoid allocation in getValues().
(package private) java.util.concurrent.atomic.AtomicReference<ReplayProcessor.ReplaySubscription<T>[]> subscribers
(package private) static ReplayProcessor.ReplaySubscription[] TERMINATED
-
Constructor Summary
Constructors
Constructor: Description
ReplayProcessor(ReplayProcessor.ReplayBuffer<@NonNull T> buffer): Constructs a ReplayProcessor with the given custom ReplayBuffer instance.
-
Method Summary
Methods
Modifier and Type, Method: Description
(package private) boolean add(ReplayProcessor.ReplaySubscription<@NonNull T> rs)
void cleanupBuffer(): Makes sure the item cached by the head node in a bounded ReplayProcessor is released (as it is never part of a replay).
static <T> @NonNull ReplayProcessor<T> create(): Creates an unbounded ReplayProcessor.
static <T> @NonNull ReplayProcessor<T> create(int capacityHint): Creates an unbounded ReplayProcessor with the specified initial buffer capacity.
(package private) static <T> ReplayProcessor<T> createUnbounded(): Creates an unbounded ReplayProcessor with the bounded-implementation for testing purposes.
static <T> @NonNull ReplayProcessor<T> createWithSize(int maxSize): Creates a size-bounded ReplayProcessor.
static <T> @NonNull ReplayProcessor<T> createWithTime(long maxAge, @NonNull java.util.concurrent.TimeUnit unit, @NonNull Scheduler scheduler): Creates a time-bounded ReplayProcessor.
static <T> @NonNull ReplayProcessor<T> createWithTimeAndSize(long maxAge, @NonNull java.util.concurrent.TimeUnit unit, @NonNull Scheduler scheduler, int maxSize): Creates a time- and size-bounded ReplayProcessor.
@Nullable java.lang.Throwable getThrowable(): Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.
T getValue(): Returns the latest value this processor has or null if no such value exists.
java.lang.Object[] getValues(): Returns an Object array containing a snapshot of all values of this processor.
T[] getValues(@NonNull T[] array): Returns a typed array containing a snapshot of all values of this processor.
boolean hasComplete(): Returns true if the FlowableProcessor has reached a terminal state through a complete event.
boolean hasSubscribers(): Returns true if the FlowableProcessor has subscribers.
boolean hasThrowable(): Returns true if the FlowableProcessor has reached a terminal state through an error event.
boolean hasValue(): Returns true if this processor has any value.
void onComplete()
void onError(java.lang.Throwable t)
void onNext(@NonNull T t)
void onSubscribe(org.reactivestreams.Subscription s): Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long).
(package private) void remove(ReplayProcessor.ReplaySubscription<@NonNull T> rs)
(package private) int size()
protected void subscribeActual(org.reactivestreams.Subscriber<? super @NonNull T> s): Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.
(package private) int subscriberCount()
-
Methods inherited from class io.reactivex.rxjava3.processors.FlowableProcessor
toSerialized
-
Methods inherited from class io.reactivex.rxjava3.core.Flowable
all, amb, ambArray, ambWith, any, blockingFirst, blockingFirst, blockingForEach, blockingForEach, blockingIterable, blockingIterable, blockingLast, blockingLast, blockingLatest, blockingMostRecent, blockingNext, blockingSingle, blockingSingle, blockingStream, blockingStream, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, blockingSubscribe, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferSize, cache, cacheWithInitialCapacity, cast, collect, collect, collectInto, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatestArray, combineLatestArray, combineLatestArrayDelayError, combineLatestArrayDelayError, combineLatestDelayError, combineLatestDelayError, compose, concat, concat, concat, concat, concat, concat, concatArray, concatArrayDelayError, concatArrayEager, concatArrayEager, concatArrayEagerDelayError, concatArrayEagerDelayError, concatDelayError, concatDelayError, concatDelayError, concatEager, concatEager, concatEager, concatEager, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatEagerDelayError, concatMap, concatMap, concatMap, concatMapCompletable, concatMapCompletable, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapCompletableDelayError, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapEager, concatMapEager, concatMapEagerDelayError, concatMapEagerDelayError, concatMapIterable, concatMapIterable, concatMapMaybe, concatMapMaybe, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapMaybeDelayError, concatMapSingle, concatMapSingle, concatMapSingleDelayError, concatMapSingleDelayError, concatMapSingleDelayError, concatMapStream, concatMapStream, concatWith, concatWith, concatWith, concatWith, 
contains, count, create, debounce, debounce, debounce, debounce, defaultIfEmpty, defer, delay, delay, delay, delay, delay, delay, delaySubscription, delaySubscription, delaySubscription, dematerialize, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterNext, doAfterTerminate, doFinally, doOnCancel, doOnComplete, doOnEach, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elementAt, elementAt, elementAtOrError, empty, error, error, filter, first, firstElement, firstOrError, firstOrErrorStage, firstStage, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMap, flatMapCompletable, flatMapCompletable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapIterable, flatMapMaybe, flatMapMaybe, flatMapSingle, flatMapSingle, flatMapStream, flatMapStream, forEach, forEachWhile, forEachWhile, forEachWhile, fromAction, fromArray, fromCallable, fromCompletable, fromCompletionStage, fromFuture, fromFuture, fromIterable, fromMaybe, fromObservable, fromOptional, fromPublisher, fromRunnable, fromSingle, fromStream, fromSupplier, generate, generate, generate, generate, generate, groupBy, groupBy, groupBy, groupBy, groupBy, groupBy, groupJoin, hide, ignoreElements, interval, interval, interval, interval, intervalRange, intervalRange, isEmpty, join, just, just, just, just, just, just, just, just, just, just, last, lastElement, lastOrError, lastOrErrorStage, lastStage, lift, map, mapOptional, materialize, merge, merge, merge, merge, merge, merge, merge, merge, mergeArray, mergeArray, mergeArrayDelayError, mergeArrayDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeDelayError, mergeWith, mergeWith, mergeWith, mergeWith, never, observeOn, observeOn, observeOn, ofType, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, 
onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureLatest, onBackpressureLatest, onBackpressureReduce, onBackpressureReduce, onErrorComplete, onErrorComplete, onErrorResumeNext, onErrorResumeWith, onErrorReturn, onErrorReturnItem, onTerminateDetach, parallel, parallel, parallel, publish, publish, publish, publish, range, rangeLong, rebatchRequests, reduce, reduce, reduceWith, repeat, repeat, repeatUntil, repeatWhen, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, replay, retry, retry, retry, retry, retry, retryUntil, retryWhen, safeSubscribe, sample, sample, sample, sample, sample, sample, sample, scan, scan, scanWith, sequenceEqual, sequenceEqual, sequenceEqual, sequenceEqual, serialize, share, single, singleElement, singleOrError, singleOrErrorStage, singleStage, skip, skip, skip, skipLast, skipLast, skipLast, skipLast, skipLast, skipLast, skipUntil, skipWhile, sorted, sorted, startWith, startWith, startWith, startWith, startWithArray, startWithItem, startWithIterable, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscribeWith, switchIfEmpty, switchMap, switchMap, switchMapCompletable, switchMapCompletableDelayError, switchMapDelayError, switchMapDelayError, switchMapMaybe, switchMapMaybeDelayError, switchMapSingle, switchMapSingleDelayError, switchOnNext, switchOnNext, switchOnNextDelayError, switchOnNextDelayError, take, take, take, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeLast, takeUntil, takeUntil, takeWhile, test, test, test, throttleFirst, throttleFirst, throttleFirst, throttleLast, throttleLast, throttleLast, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleLatest, throttleWithTimeout, throttleWithTimeout, 
throttleWithTimeout, timeInterval, timeInterval, timeInterval, timeInterval, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timer, timer, timestamp, timestamp, timestamp, timestamp, to, toFuture, toList, toList, toList, toMap, toMap, toMap, toMultimap, toMultimap, toMultimap, toMultimap, toObservable, toSortedList, toSortedList, toSortedList, toSortedList, unsafeCreate, unsubscribeOn, using, using, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, window, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipArray, zipWith, zipWith, zipWith, zipWith
-
-
-
-
Field Detail
-
EMPTY_ARRAY
private static final java.lang.Object[] EMPTY_ARRAY
An empty array to avoid allocation in getValues().
-
buffer
final ReplayProcessor.ReplayBuffer<T> buffer
-
done
boolean done
-
subscribers
final java.util.concurrent.atomic.AtomicReference<ReplayProcessor.ReplaySubscription<T>[]> subscribers
-
EMPTY
static final ReplayProcessor.ReplaySubscription[] EMPTY
-
TERMINATED
static final ReplayProcessor.ReplaySubscription[] TERMINATED
-
-
Constructor Detail
-
ReplayProcessor
ReplayProcessor(ReplayProcessor.ReplayBuffer<@NonNull T> buffer)
Constructs a ReplayProcessor with the given custom ReplayBuffer instance.
- Parameters:
buffer - the ReplayBuffer instance, not null (not verified)
-
-
Method Detail
-
create
@CheckReturnValue @NonNull public static <T> @NonNull ReplayProcessor<T> create()
Creates an unbounded ReplayProcessor.
The internal buffer is backed by an ArrayList and starts with an initial capacity of 16. Once the number of items reaches this capacity, it will grow as necessary (usually by 50%). However, as the number of items grows, this causes frequent array reallocation and copying, and may hurt performance and latency. This can be avoided with the create(int) overload which takes an initial capacity parameter and can be tuned to reduce the array reallocation frequency as needed.
- Type Parameters:
T - the type of items observed and emitted by the ReplayProcessor
- Returns:
- the created ReplayProcessor
-
create
@CheckReturnValue @NonNull public static <T> @NonNull ReplayProcessor<T> create(int capacityHint)
Creates an unbounded ReplayProcessor with the specified initial buffer capacity.
Use this method to avoid excessive array reallocation while the internal buffer grows to accommodate new items. For example, if you know that the buffer will hold 32k items, you can ask the ReplayProcessor to preallocate its internal array with a capacity to hold that many items. Once the items start to arrive, the internal array won't need to grow, creating less garbage and no overhead due to frequent array-copying.
- Type Parameters:
T - the type of items observed and emitted by this type of processor
- Parameters:
capacityHint - the initial buffer capacity
- Returns:
- the created processor
- Throws:
java.lang.IllegalArgumentException - if capacityHint is non-positive
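A brief sketch of the capacityHint overload follows, assuming RxJava 3 on the classpath. ReplayCapacitySketch and its two methods are hypothetical names for illustration. The point it tries to make is that the hint only sizes the initial buffer and does not limit how many items are retained.

```java
import io.reactivex.rxjava3.processors.ReplayProcessor;

// Hypothetical demo class, not part of RxJava.
public class ReplayCapacitySketch {

    /** Emits the given number of items and returns how many a late Subscriber gets replayed. */
    static int replayedCount(int capacityHint, int items) {
        ReplayProcessor<Integer> processor = ReplayProcessor.create(capacityHint);
        for (int i = 0; i < items; i++) {
            processor.onNext(i);
        }
        processor.onComplete();
        // the processor is still unbounded, so every item is replayed
        return processor.test().values().size();
    }

    /** Returns true if a non-positive hint is rejected as documented. */
    static boolean rejectsNonPositiveHint() {
        try {
            ReplayProcessor.<Integer>create(0);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(replayedCount(4, 10));
        System.out.println(rejectsNonPositiveHint());
    }
}
```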
-
createWithSize
@CheckReturnValue @NonNull public static <T> @NonNull ReplayProcessor<T> createWithSize(int maxSize)
Creates a size-bounded ReplayProcessor.
In this setting, the ReplayProcessor holds at most maxSize items in its internal buffer and discards the oldest item when a new item would exceed that limit.
When Subscribers subscribe to a terminated ReplayProcessor, they are guaranteed to see at most maxSize onNext events followed by a termination event.
If a Subscriber subscribes while the ReplayProcessor is active, it will observe all items in the buffer at that point in time and each item observed afterwards, even if the buffer evicts items due to the size constraint in the meantime. In other words, once a Subscriber subscribes, it will receive items without gaps in the sequence.
- Type Parameters:
T - the type of items observed and emitted by this type of processor
- Parameters:
maxSize - the maximum number of buffered items
- Returns:
- the created processor
- Throws:
java.lang.IllegalArgumentException - if maxSize is non-positive
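The difference between an early and a late Subscriber of a size-bounded processor can be sketched as follows, assuming RxJava 3 on the classpath. ReplaySizeSketch and its methods are hypothetical names used only here.

```java
import io.reactivex.rxjava3.processors.ReplayProcessor;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of RxJava.
public class ReplaySizeSketch {

    /** Returns [items seen by the early Subscriber, items seen by the late Subscriber]. */
    static List<List<Integer>> run() {
        ReplayProcessor<Integer> processor = ReplayProcessor.createWithSize(3);

        // subscribed before any emission: receives every item without gaps
        TestSubscriber<Integer> early = processor.test();

        for (int i = 1; i <= 5; i++) {
            processor.onNext(i);
        }
        processor.onComplete();

        // subscribed after termination: receives at most the last 3 items
        TestSubscriber<Integer> late = processor.test();

        List<List<Integer>> result = new ArrayList<>();
        result.add(new ArrayList<>(early.values()));
        result.add(new ArrayList<>(late.values()));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```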
-
createUnbounded
@CheckReturnValue static <T> ReplayProcessor<T> createUnbounded()
Creates an unbounded ReplayProcessor with the bounded-implementation for testing purposes.
This variant behaves like the regular unbounded ReplayProcessor created via create() but uses the structures of the bounded-implementation. This is by no means intended for the replacement of the original, array-backed and unbounded ReplayProcessor due to the additional overhead of the linked-list based internal buffer. The sole purpose is to allow testing and reasoning about the behavior of the bounded implementations without the interference of the eviction policies.
- Type Parameters:
T - the type of items observed and emitted by this type of processor
- Returns:
- the created processor
-
createWithTime
@CheckReturnValue @NonNull public static <T> @NonNull ReplayProcessor<T> createWithTime(long maxAge, @NonNull java.util.concurrent.TimeUnit unit, @NonNull Scheduler scheduler)
Creates a time-bounded ReplayProcessor.
In this setting, the ReplayProcessor internally tags each observed item with a timestamp value supplied by the Scheduler and keeps only those whose age is less than the supplied time value converted to milliseconds. For example, an item arrives at T=0 and the max age is set to 5; at T>=5 this first item is then evicted by any subsequent item or termination event, leaving the buffer empty.
Once the processor is terminated, Subscribers subscribing to it will receive items that remained in the buffer after the terminal event, regardless of their age.
If a Subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time, and each item observed thereafter, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a Subscriber subscribes, it observes items without gaps in the sequence except for any outdated items at the beginning of the sequence.
Note that terminal notifications (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete notification arrives at T=10. If a Subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete notification.
- Type Parameters:
T - the type of items observed and emitted by this type of processor
- Parameters:
maxAge - the maximum age of the contained items
unit - the time unit of maxAge
scheduler - the Scheduler that provides the current time
- Returns:
- the created processor
- Throws:
java.lang.NullPointerException - if unit or scheduler is null
java.lang.IllegalArgumentException - if maxAge is non-positive
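The age check for an active, time-bounded processor can be sketched with virtual time. The example assumes RxJava 3 on the classpath and uses TestScheduler purely as an illustrative time source so that the timeline is deterministic; ReplayTimeSketch and run() are hypothetical names.

```java
import io.reactivex.rxjava3.processors.ReplayProcessor;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava.
public class ReplayTimeSketch {

    /** Returns the items replayed to a Subscriber that arrives at T=6s with a max age of 5s. */
    static List<Integer> run() {
        TestScheduler scheduler = new TestScheduler(); // virtual clock, starts at T=0
        ReplayProcessor<Integer> processor =
                ReplayProcessor.createWithTime(5, TimeUnit.SECONDS, scheduler);

        processor.onNext(1);                           // timestamped at T=0s
        scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
        processor.onNext(2);                           // timestamped at T=3s
        scheduler.advanceTimeBy(3, TimeUnit.SECONDS);  // now T=6s

        // item 1 is 6s old and is skipped, item 2 is 3s old and is replayed
        return new ArrayList<>(processor.test().values());
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In production code the Scheduler would typically be a real one, such as Schedulers.computation(), which serves only as the clock here and does not move emissions to another thread.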
-
createWithTimeAndSize
@CheckReturnValue @NonNull public static <T> @NonNull ReplayProcessor<T> createWithTimeAndSize(long maxAge, @NonNull java.util.concurrent.TimeUnit unit, @NonNull Scheduler scheduler, int maxSize)
Creates a time- and size-bounded ReplayProcessor.
In this setting, the ReplayProcessor internally tags each received item with a timestamp value supplied by the Scheduler and holds at most maxSize items in its internal buffer. It evicts items from the start of the buffer if their age becomes greater than or equal to the supplied age in milliseconds or the buffer reaches its maxSize limit.
When Subscribers subscribe to a terminated ReplayProcessor, they observe the items that remained in the buffer after the terminal notification, regardless of their age, but at most maxSize items.
If a Subscriber subscribes while the ReplayProcessor is active, it will observe only those items from within the buffer that have an age less than the specified time and each subsequent item, even if the buffer evicts items due to the time constraint in the meantime. In other words, once a Subscriber subscribes, it observes items without gaps in the sequence except for the outdated items at the beginning of the sequence.
Note that terminal notifications (onError and onComplete) trigger eviction as well. For example, with a max age of 5, the first item is observed at T=0, then an onComplete notification arrives at T=10. If a Subscriber subscribes at T=11, it will find an empty ReplayProcessor with just an onComplete notification.
- Type Parameters:
T - the type of items observed and emitted by this type of processor
- Parameters:
maxAge - the maximum age of the contained items
unit - the time unit of maxAge
maxSize - the maximum number of buffered items
scheduler - the Scheduler that provides the current time
- Returns:
- the created processor
- Throws:
java.lang.NullPointerException - if unit or scheduler is null
java.lang.IllegalArgumentException - if maxAge or maxSize is non-positive
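How the two bounds combine can be sketched with a virtual clock. This assumes RxJava 3 on the classpath; TestScheduler is an illustrative choice of time source, and ReplayTimeAndSizeSketch and run() are hypothetical names.

```java
import io.reactivex.rxjava3.processors.ReplayProcessor;
import io.reactivex.rxjava3.schedulers.TestScheduler;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of RxJava.
public class ReplayTimeAndSizeSketch {

    /** Returns [items replayed at T=0s, items replayed at T=6s] for maxAge=5s and maxSize=2. */
    static List<List<Integer>> run() {
        TestScheduler scheduler = new TestScheduler();
        ReplayProcessor<Integer> processor =
                ReplayProcessor.createWithTimeAndSize(5, TimeUnit.SECONDS, scheduler, 2);

        processor.onNext(1);
        processor.onNext(2);
        processor.onNext(3); // the size bound of 2 makes item 1 unavailable for replay

        // all items are fresh, so only the size bound applies
        List<Integer> fresh = new ArrayList<>(processor.test().values());

        scheduler.advanceTimeBy(6, TimeUnit.SECONDS);

        // every retained item is now older than 5s, so nothing is replayed
        List<Integer> stale = new ArrayList<>(processor.test().values());

        List<List<Integer>> result = new ArrayList<>();
        result.add(fresh);
        result.add(stale);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```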
-
subscribeActual
protected void subscribeActual(org.reactivestreams.Subscriber<? super @NonNull T> s)
Description copied from class: Flowable
Operator implementations (both source and intermediate) should implement this method that performs the necessary business logic and handles the incoming Subscribers.
There is no need to call any of the plugin hooks on the current Flowable instance or the Subscriber; all hooks and basic safeguards have been applied by Flowable.subscribe(Subscriber) before this method gets called.
- Specified by:
subscribeActual in class Flowable<T>
- Parameters:
s - the incoming Subscriber, never null
-
onSubscribe
public void onSubscribe(org.reactivestreams.Subscription s)
Description copied from interface: FlowableSubscriber
Implementors of this method should make sure everything that needs to be visible in Subscriber.onNext(Object) is established before calling Subscription.request(long). In practice this means no initialization should happen after the request() call and additional behavior is thread-safe with respect to onNext.
-
onError
public void onError(java.lang.Throwable t)
-
onComplete
public void onComplete()
-
hasSubscribers
@CheckReturnValue public boolean hasSubscribers()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has subscribers.
The method is thread-safe.
- Specified by:
hasSubscribers in class FlowableProcessor<T>
- Returns:
- true if the FlowableProcessor has subscribers
-
subscriberCount
@CheckReturnValue int subscriberCount()
-
getThrowable
@Nullable @CheckReturnValue public @Nullable java.lang.Throwable getThrowable()
Description copied from class: FlowableProcessor
Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet.
The method is thread-safe.
- Specified by:
getThrowable in class FlowableProcessor<T>
- Returns:
- the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor hasn't terminated yet
-
cleanupBuffer
public void cleanupBuffer()
Makes sure the item cached by the head node in a bounded ReplayProcessor is released (as it is never part of a replay).
By default, live bounded buffers will remember one item before the currently receivable one to ensure subscribers can always receive a continuous sequence of items. A terminated ReplayProcessor automatically releases this inaccessible item.
The method must be called sequentially, similar to the standard onXXX methods.
History: 2.1.11 - experimental
- Since:
- 2.2
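A minimal sketch of where cleanupBuffer() might be called follows, assuming RxJava 3 on the classpath. ReplayCleanupSketch and run() are hypothetical names. The release of the extra reference itself is not directly observable from user code, so the sketch only shows that the replayable content stays the same after the call.

```java
import io.reactivex.rxjava3.processors.ReplayProcessor;

// Hypothetical demo class, not part of RxJava.
public class ReplayCleanupSketch {

    /** Returns the latest value and the number of replayable values after cleanupBuffer(). */
    static String run() {
        ReplayProcessor<String> processor = ReplayProcessor.createWithSize(1);
        processor.onNext("a");
        processor.onNext("b"); // "a" is no longer replayable but may still be referenced internally

        // called from the same thread as onNext, i.e. serialized with the onXXX methods
        processor.cleanupBuffer();

        return processor.getValue() + ":" + processor.getValues().length;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Calling the method is only worthwhile when the evicted items are large or otherwise expensive to keep alive; for small values the extra retained item is usually harmless.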
-
getValue
@CheckReturnValue public T getValue()
Returns the latest value this processor has or null if no such value exists.
The method is thread-safe.
- Returns:
- the latest value this processor currently has or null if no such value exists
-
getValues
@CheckReturnValue public java.lang.Object[] getValues()
Returns an Object array containing a snapshot of all values of this processor.
The method is thread-safe.
- Returns:
- the array containing the snapshot of all values of this processor
-
getValues
@CheckReturnValue public T[] getValues(@NonNull T[] array)
Returns a typed array containing a snapshot of all values of this processor.
The method follows the conventions of Collection.toArray by setting the array element after the last value to null (if the capacity permits).
The method is thread-safe.
- Parameters:
array - the target array to copy values into if it fits
- Returns:
- the given array if the values fit into it or a new array containing all values
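The Collection.toArray-like convention can be illustrated with a short sketch, assuming RxJava 3 on the classpath. ReplayGetValuesSketch and its methods are hypothetical names, and the -1 filler values are used only to make the null terminator visible.

```java
import io.reactivex.rxjava3.processors.ReplayProcessor;
import java.util.Arrays;

// Hypothetical demo class, not part of RxJava.
public class ReplayGetValuesSketch {

    static ReplayProcessor<Integer> filled() {
        ReplayProcessor<Integer> processor = ReplayProcessor.create();
        processor.onNext(1);
        processor.onNext(2);
        processor.onNext(3);
        return processor;
    }

    /** Copies into an array that is larger than needed; the same array is returned. */
    static String intoLargerArray() {
        Integer[] target = { -1, -1, -1, -1, -1 };
        Integer[] result = filled().getValues(target);
        // the element right after the last value is set to null, the rest is untouched
        return (result == target) + " " + Arrays.toString(result);
    }

    /** Passes an array that is too small; a new array of the right size is returned. */
    static int intoSmallerArray() {
        return filled().getValues(new Integer[0]).length;
    }

    public static void main(String[] args) {
        System.out.println(intoLargerArray());
        System.out.println(intoSmallerArray());
    }
}
```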
-
hasComplete
@CheckReturnValue public boolean hasComplete()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has reached a terminal state through a complete event.
The method is thread-safe.
- Specified by:
hasComplete in class FlowableProcessor<T>
- Returns:
- true if the FlowableProcessor has reached a terminal state through a complete event
- See Also:
FlowableProcessor.hasThrowable()
-
hasThrowable
@CheckReturnValue public boolean hasThrowable()
Description copied from class: FlowableProcessor
Returns true if the FlowableProcessor has reached a terminal state through an error event.
The method is thread-safe.
- Specified by:
hasThrowable in class FlowableProcessor<T>
- Returns:
- true if the FlowableProcessor has reached a terminal state through an error event
- See Also:
FlowableProcessor.getThrowable(), FlowableProcessor.hasComplete()
-
hasValue
@CheckReturnValue public boolean hasValue()
Returns true if this processor has any value.
The method is thread-safe.
- Returns:
- true if the processor has any value
-
size
@CheckReturnValue int size()
-
add
boolean add(ReplayProcessor.ReplaySubscription<@NonNull T> rs)
-
remove
void remove(ReplayProcessor.ReplaySubscription<@NonNull T> rs)
-
-