Class Agent<T>

java.lang.Object
groovy.concurrent.Agent<T>
Type Parameters:
T - the value type

public final class Agent<T> extends Object
A thread-safe mutable-value container inspired by Clojure's agents and GPars' Agent.

An Agent wraps a value that can be read by any thread but modified only through serialised update functions. Updates are queued and applied one at a time on a dedicated executor, guaranteeing that the value is never corrupted by concurrent writes.

Reading the current value via get() is non-blocking and returns a snapshot of the value at the time of the call. Sending an update via send(Function) is also non-blocking: the function is queued and applied asynchronously. Use sendAndGet(Function) to obtain an Awaitable that completes with the new value after the update is applied.


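 // Groovy:
 def counter = Agent.create(0)
 counter.send { it + 1 }
 counter.send { it + 1 }
 // getAsync() completes only after both queued updates have been applied
 assert await(counter.getAsync()) == 2

 // Java:
 Agent<Integer> counter = Agent.create(0);
 counter.send(n -> n + 1);
 // result completes with 2 once both updates have been applied in order
 Awaitable<Integer> result = counter.sendAndGet(n -> n + 1);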
 
Since:
6.0.0
  • Field Details

    • DEFAULT_CHANGES_BUFFER

      public static final int DEFAULT_CHANGES_BUFFER
      Default per-subscriber buffer size for changes() (256 items).
  • Method Details

    • create

      public static <T> Agent<T> create(T initialValue)
      Creates an agent with the given initial value, using a single-thread executor for serialised updates.
      Type Parameters:
      T - the value type
      Parameters:
      initialValue - the starting value
      Returns:
      a new agent
    • create

      public static <T> Agent<T> create(T initialValue, Pool pool)
      Creates an agent backed by the given pool for update execution. Updates are still serialised (only one at a time), but they run on the pool's threads.
      Type Parameters:
      T - the value type
      Parameters:
      initialValue - the starting value
      pool - the pool to use for updates
      Returns:
      a new agent
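
      How updates can remain serialised while running on a shared pool's threads can be illustrated with the classic serial-executor pattern. The sketch below uses JDK types only; SerialExecutorSketch and incrementSerially are hypothetical names, a plain ExecutorService stands in for Pool, and nothing here is claimed to be Agent's actual implementation.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: runs tasks one at a time on a shared executor.
final class SerialExecutorSketch implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor delegate;
    private Runnable active; // guarded by "this"

    SerialExecutorSketch(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void execute(Runnable task) {
        tasks.add(() -> {
            try {
                task.run();
            } finally {
                scheduleNext(); // hand the next queued task to the pool
            }
        });
        if (active == null) {
            scheduleNext();
        }
    }

    private synchronized void scheduleNext() {
        active = tasks.poll();
        if (active != null) {
            delegate.execute(active);
        }
    }

    // Applies `count` increments through the serial executor on a 4-thread pool.
    static int incrementSerially(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            SerialExecutorSketch serial = new SerialExecutorSketch(pool);
            int[] value = {0}; // unsynchronised state; safe only because updates never overlap
            CountDownLatch done = new CountDownLatch(count);
            for (int i = 0; i < count; i++) {
                serial.execute(() -> {
                    value[0]++;
                    done.countDown();
                });
            }
            if (!done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
            return value[0];
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(incrementSerially(1000)); // prints 1000
    }
}
```

      Although four pool threads are available, no increment is lost, because each task is handed to the pool only after the previous one has finished.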
    • get

      public T get()
      Returns the current value. This is a non-blocking snapshot read.
      Returns:
      the current value
    • getAsync

      public Awaitable<T> getAsync()
      Returns the current value as an Awaitable. The awaitable completes after all previously queued updates have been applied, ensuring a consistent read.
      Returns:
      an awaitable holding the value after pending updates
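
      The difference between a snapshot read and a read that waits for queued updates can be modelled with a single-thread executor. This is a sketch under assumptions: QueuedReadSketch is a hypothetical name, CompletableFuture stands in for Awaitable, and the code is not taken from Agent itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model, not Agent's implementation: a queued read is just
// another task on the same single-thread executor as the updates.
final class QueuedReadSketch {
    // Returns {snapshot taken while an update is pending, value seen by the queued read}.
    static int[] demo() {
        ExecutorService updates = Executors.newSingleThreadExecutor();
        AtomicReference<Integer> value = new AtomicReference<>(0);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Queue an update that cannot finish until the gate opens.
            updates.execute(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                value.set(value.get() + 1);
            });
            // Queued read: runs only after the update ahead of it.
            CompletableFuture<Integer> queuedRead =
                    CompletableFuture.supplyAsync(value::get, updates);
            int snapshot = value.get(); // non-blocking; the update is still pending
            gate.countDown();
            return new int[] {snapshot, queuedRead.join()};
        } finally {
            updates.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("snapshot=" + r[0] + ", queued read=" + r[1]);
        // prints snapshot=0, queued read=1
    }
}
```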
    • send

      public void send(Function<T,T> updateFn)
      Queues an update function to be applied to the current value. The function receives the current value and returns the new value.

      Updates are applied asynchronously and serialised: only one update runs at a time.

      Parameters:
      updateFn - a function from current value to new value
    • sendAndGet

      public Awaitable<T> sendAndGet(Function<T,T> updateFn)
      Queues an update function and returns an Awaitable that completes with the new value after the update is applied.
      Parameters:
      updateFn - a function from current value to new value
      Returns:
      an awaitable holding the new value
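
      The behaviour described above, where each returned awaitable completes with the value produced by its own update, can be sketched with JDK types. SendAndGetSketch is a hypothetical stand-in and CompletableFuture replaces Awaitable; this is an illustration of the contract, not the library's code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical stand-in for the documented behaviour, JDK types only.
final class SendAndGetSketch<T> {
    private final ExecutorService updates = Executors.newSingleThreadExecutor();
    private volatile T value;

    SendAndGetSketch(T initial) {
        this.value = initial;
    }

    // Queues the update; the future completes with the value that update produced.
    CompletableFuture<T> sendAndGet(Function<T, T> updateFn) {
        return CompletableFuture.supplyAsync(() -> {
            T next = updateFn.apply(value);
            value = next;
            return next;
        }, updates);
    }

    void shutdown() {
        updates.shutdown();
    }

    // Returns the results of two updates queued back to back, starting from 0.
    static int[] demo() {
        SendAndGetSketch<Integer> counter = new SendAndGetSketch<>(0);
        try {
            CompletableFuture<Integer> first = counter.sendAndGet(n -> n + 1);
            CompletableFuture<Integer> second = counter.sendAndGet(n -> n * 10);
            return new int[] {first.join(), second.join()};
        } finally {
            counter.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + ", " + r[1]); // prints 1, 10
    }
}
```

      Because the updates are applied in submission order, the second function always sees the result of the first.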
    • shutdown

      public void shutdown()
      Shuts down the agent's update executor. No further updates are accepted, but updates that are already queued are executed before shutdown completes. The changes publisher (if any subscribers are attached) is closed after the pending updates drain, signalling onComplete to all live subscribers. Calling shutdown() more than once is a no-op.
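
      The drain-then-stop behaviour resembles an orderly ExecutorService.shutdown() in the JDK, which the sketch below demonstrates. ShutdownDrainSketch is a hypothetical name, and how Agent itself reports an update sent after shutdown is not stated here; the RejectedExecutionException shown is the JDK executor's behaviour only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration using a plain JDK executor, not Agent's code.
final class ShutdownDrainSketch {
    // Returns {updates applied, 1 if a post-shutdown submission was rejected else 0}.
    static int[] demo() {
        ExecutorService updates = Executors.newSingleThreadExecutor();
        AtomicInteger value = new AtomicInteger();
        for (int i = 0; i < 5; i++) {
            updates.execute(value::incrementAndGet);
        }
        updates.shutdown(); // tasks that are already queued still run
        updates.shutdown(); // a second call has no additional effect
        int rejected = 0;
        try {
            updates.execute(value::incrementAndGet);
        } catch (RejectedExecutionException e) {
            rejected = 1; // new work is refused once shutdown has begun
        }
        try {
            if (!updates.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] {value.get(), rejected};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("applied=" + r[0] + ", rejected=" + r[1]);
        // prints applied=5, rejected=1
    }
}
```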
    • changes

      public Flow.Publisher<T> changes()
      Returns a Flow.Publisher that emits the agent's value after every successful update. The publisher is hot, and buffering is handled separately for each subscriber:
      • Subscribers see only changes that occur after they subscribe; the current value at subscription time is not replayed.
      • Each subscriber gets an independent buffer (default 256 items).
      • When a slow subscriber's buffer is full, the newly offered value is dropped for that subscriber rather than blocking the agent's update thread. Values already buffered are still delivered in order; only newly offered values that cannot fit are discarded.
      • Closes (signals onComplete) when shutdown() is called. If changes() is first called after shutdown(), the returned publisher is already closed and subscribers receive onComplete immediately.
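
      The drop policy described in the list above can be modelled with a bounded queue whose offer never blocks. This is only a sketch of the policy: DropNewestBufferSketch and offerAll are hypothetical names, and the code says nothing about how changes() is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical model of the drop policy only; not how changes() is implemented.
final class DropNewestBufferSketch {
    // Offers values 1..offered to a bounded buffer that nothing drains,
    // then returns the values that were kept, in delivery order.
    static List<Integer> offerAll(int capacity, int offered) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        for (int v = 1; v <= offered; v++) {
            // offer() never blocks: it returns false when the buffer is full,
            // so the newly offered value is the one that is discarded.
            buffer.offer(v);
        }
        List<Integer> delivered = new ArrayList<>();
        buffer.drainTo(delivered);
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(offerAll(3, 5)); // prints [1, 2, 3]
    }
}
```

      With a capacity of 3 and five offers, values 4 and 5 are lost while 1, 2 and 3 keep their order, which matches the "already buffered values are delivered in order" rule.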

      Typical use:

      
       for await (newValue in agent.changes()) {
           log.info "Agent value is now {}", newValue
       }
       
      Returns:
      a hot publisher of state transitions
      Since:
      6.0.0
    • toString

      public String toString()
      Returns the current value in a diagnostic form.
      Overrides:
      toString in class Object
      Returns:
      a description of the current value