Class BroadcastChannel<T>

java.lang.Object
groovy.concurrent.BroadcastChannel<T>
Type Parameters:
T - the value type

public final class BroadcastChannel<T> extends Object
A one-to-many broadcast channel where each value sent is delivered to all subscribers.

Unlike AsyncChannel (point-to-point, where each value is consumed by exactly one receiver), a BroadcastChannel delivers every value to every subscriber channel that was obtained from subscribe() before the value was sent.


 def broadcast = BroadcastChannel.create()
 def sub1 = broadcast.subscribe()
 def sub2 = broadcast.subscribe()

 async {
     broadcast.send('hello')
     broadcast.send('world')
     broadcast.close()
 }

 // Both subscribers receive both values
 for await (msg in sub1) { println "Sub1: $msg" }
 for await (msg in sub2) { println "Sub2: $msg" }
 

Inspired by GPars' DataflowBroadcast.

Since:
6.0.0
Method Details

    • create

      public static <T> BroadcastChannel<T> create()
      Creates a new broadcast channel.
      Type Parameters:
      T - the value type
      Returns:
      a new BroadcastChannel
    • subscribe

      public AsyncChannel<T> subscribe()
      Creates a new subscriber channel. The returned AsyncChannel receives every value sent to this broadcast after this call; values sent earlier are not replayed. Each subscriber is independent: values are buffered per subscriber.
      Returns:
      a new subscriber channel
    • subscribe

      public AsyncChannel<T> subscribe(int bufferSize)
      Creates a new subscriber channel with the specified buffer capacity.
      Parameters:
      bufferSize - the buffer capacity for this subscriber
      Returns:
      a new subscriber channel
    • send

      public Awaitable<Void> send(T value)
      Sends a value to all current subscribers.
      Parameters:
      value - the value to broadcast
      Returns:
      an Awaitable that completes when all subscribers have accepted the value
      Throws:
      ChannelClosedException - if the broadcast channel is closed
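To make "completes when all subscribers have accepted the value" concrete, here is a minimal single-threaded toy model of sender-gated fan-out. It is a sketch under stated assumptions, not the groovy.concurrent implementation: ToyBroadcast, Sub, and trySend are hypothetical names, and trySend returns false at the point where the real send(...) would suspend.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of sender-gated fan-out; NOT the groovy.concurrent implementation. */
public class ToyBroadcast<T> {

    /** One bounded FIFO buffer per subscriber (hypothetical stand-in for a subscriber channel). */
    public static final class Sub<T> {
        private final Queue<T> buffer = new ArrayDeque<>();
        private final int capacity;

        Sub(int capacity) { this.capacity = capacity; }

        boolean isFull() { return buffer.size() >= capacity; }

        /** Removes and returns the oldest buffered value, or null if the buffer is empty. */
        public T poll() { return buffer.poll(); }
    }

    private final List<Sub<T>> subs = new ArrayList<>();

    public Sub<T> subscribe(int bufferSize) {
        Sub<T> s = new Sub<>(bufferSize);
        subs.add(s);
        return s;
    }

    /**
     * Returns true if every subscriber had room and the value was buffered for all of them.
     * Returns false, buffering nothing, if any buffer is full. This all-or-nothing rule is a
     * simplification: the real send(...) would suspend here rather than return.
     */
    public boolean trySend(T value) {
        for (Sub<T> s : subs) {
            if (s.isFull()) return false;
        }
        for (Sub<T> s : subs) {
            s.buffer.add(value);
        }
        return true;
    }

    public static List<Boolean> demo() {
        ToyBroadcast<String> b = new ToyBroadcast<>();
        Sub<String> fast = b.subscribe(2);
        Sub<String> slow = b.subscribe(2);
        List<Boolean> results = new ArrayList<>();

        results.add(b.trySend("a")); fast.poll(); // accepted; fast drains at once
        results.add(b.trySend("b")); fast.poll(); // accepted; slow now holds a, b
        results.add(b.trySend("c"));              // rejected: slow's buffer is full
        slow.poll();                              // slow finally consumes "a"
        results.add(b.trySend("c"));              // accepted again
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [true, true, false, true]
    }
}
```

The third call fails even though the fast subscriber's buffer is empty, which is the behavior a caller of send(...) should expect when one subscriber lags.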
    • close

      public void close()
      Closes this broadcast channel and all subscriber channels.
    • isClosed

      public boolean isClosed()
      Returns true if this broadcast channel has been closed.
    • getSubscriberCount

      public int getSubscriberCount()
      Returns the number of current subscribers.
    • asPublisher

      public Flow.Publisher<T> asPublisher()
      Returns a Flow.Publisher view of this broadcast channel. Each call to Flow.Publisher.subscribe(Flow.Subscriber) on the returned publisher creates a new AsyncChannel subscriber under the hood, draining values to the downstream subscriber according to its requested demand.

      Semantics:

      • Per-subscribe binding: each Flow subscription gets its own subscriber channel and sees only values sent after it subscribes (consistent with subscribe()); earlier values are not replayed.
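      • Backpressure: values are delivered only up to the demand signaled through request(n); while a subscriber has no outstanding demand its buffer fills, and once it is full the broadcast send is held back (sender-side backpressure).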
      • Cancellation: closes this subscriber's channel and removes it from the broadcast's subscriber set.
      • Completion: signals onComplete once the broadcast channel is closed and the per-subscriber buffer has drained.

      Backpressure policy (important). This bridge uses lossless, sender-gated backpressure: send(...) awaits delivery to every live subscriber, and each per-subscriber channel has a bounded buffer (default 16). A subscriber that never calls request(n), or that requests slowly, will fill its buffer; once the buffer is full, further sends into that subscriber's channel suspend, which in turn stalls BroadcastChannel.send(...) for all subscribers. In other words, the slowest subscriber controls producer throughput.

      This is intentional and matches the point-to-point semantics of subscribe(): values are neither dropped nor reordered. If you need decoupled per-subscriber policies (drop-newest, drop-oldest, latest-only, or unbounded buffering), wrap the publisher with a Reactive Streams operator of your choice, or use a subscriber that drains promptly with request(Long.MAX_VALUE).
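The demand handling described above can be sketched with a plain java.util.concurrent.Flow.Subscriber. BatchingSubscriber is a hypothetical name, and the demo uses the JDK's SubmissionPublisher purely as a stand-in for the publisher returned by asPublisher() so the example runs without Groovy on the classpath. A batch of 1 models a cautious consumer; Long.MAX_VALUE models the "drain promptly" option mentioned above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical subscriber that requests items in fixed-size batches. */
public class BatchingSubscriber<T> implements Flow.Subscriber<T> {
    private final long batch;
    private final List<T> received = new CopyOnWriteArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private Flow.Subscription subscription;
    private long remaining;
    private volatile Throwable failure;

    public BatchingSubscriber(long batch) {
        if (batch <= 0) throw new IllegalArgumentException("batch must be positive");
        this.batch = batch;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        remaining = batch;
        s.request(batch); // nothing is delivered until demand is signaled
    }

    @Override
    public void onNext(T item) {
        received.add(item);
        // Ask for the next batch only after the current one is fully consumed.
        if (batch != Long.MAX_VALUE && --remaining == 0) {
            remaining = batch;
            subscription.request(batch);
        }
    }

    @Override
    public void onError(Throwable t) { failure = t; done.countDown(); }

    @Override
    public void onComplete() { done.countDown(); }

    /** Waits up to five seconds for onComplete, then returns everything received. */
    public List<T> awaitAll() {
        try {
            if (!done.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        if (failure != null) throw new IllegalStateException(failure);
        return List.copyOf(received);
    }

    public static List<String> demo(long batch) {
        BatchingSubscriber<String> sub = new BatchingSubscriber<>(batch);
        // Stand-in publisher (assumption); with the real API this would be broadcast.asPublisher().
        try (SubmissionPublisher<String> standIn = new SubmissionPublisher<>()) {
            standIn.subscribe(sub);
            standIn.submit("hello");
            standIn.submit("world");
        } // close() signals onComplete after buffered items are delivered
        return sub.awaitAll();
    }

    public static void main(String[] args) {
        System.out.println(demo(1));              // prints [hello, world]
        System.out.println(demo(Long.MAX_VALUE)); // prints [hello, world]
    }
}
```

With a small batch, a subscriber that is slow inside onNext delays its next request(n) and, under the policy above, would eventually hold back the broadcast sender; requesting Long.MAX_VALUE moves the buffering responsibility to the subscriber itself.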

      Returns:
      a Flow.Publisher backed by per-subscriber channels
      Since:
      6.0.0