Module wpool

Worker pool main interface.

Behaviours: application.

Description

Use functions provided by this module to manage your pools of workers.

Starting the application

Worker Pool is an Erlang application that can be started using the functions in the application module. For convenience, wpool:start/0 and wpool:stop/0 are also provided.

Starting a Pool

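To start a new worker pool, you can either call wpool:start_pool/1 or wpool:start_pool/2 to start a pool linked to the calling process, call wpool:start_sup_pool/1 or wpool:start_sup_pool/2 to start a pool supervised by wpool_sup, or build a child specification with wpool:child_spec/2 and pass it to your own supervisor.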

Stopping a Pool

To stop a pool, use wpool:stop_pool/1 or wpool:stop_sup_pool/1, depending on how you started the pool.

Using the Workers

Since the workers are gen_servers, you can send calls and casts to them. To do that, use wpool:call and wpool:cast as you would use the equivalent functions on gen_server.
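As a rough illustration of the flow described so far, here is a minimal sketch of a worker module and a function that starts a pool, uses it, and stops it. The module name echo_worker, the pool name echo_pool, and the messages are hypothetical; only the wpool functions listed in this document are used.

```erlang
-module(echo_worker).
-behaviour(gen_server).

-export([demo/0]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Hypothetical worker: the state counts the casts it has received.
init(_InitArg) ->
    {ok, 0}.

%% Replies to any call with {echo, Msg}.
handle_call(Msg, _From, Count) ->
    {reply, {echo, Msg}, Count}.

handle_cast(_Msg, Count) ->
    {noreply, Count + 1}.

%% Starts a two-worker pool under wpool_sup, sends one cast and one call,
%% stops the pool, and returns the reply to the call.
demo() ->
    _ = wpool:start(), % ok, or {error, {already_started, _}}
    Opts = [{workers, 2}, {worker, {echo_worker, []}}],
    {ok, _Pid} = wpool:start_sup_pool(echo_pool, Opts),
    ok = wpool:cast(echo_pool, hello),
    Reply = wpool:call(echo_pool, ping),
    ok = wpool:stop_sup_pool(echo_pool),
    Reply.
```

The sketch uses start_sup_pool/2 and stop_sup_pool/1 so that the calling process is not linked to the pool.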

Choosing a Strategy

Beyond the regular parameters for gen_server, wpool also provides an extra optional parameter, Strategy: the strategy used to pick the worker that performs the task. If not provided, the result of wpool:default_strategy/0 is used.

The available strategies are defined in the strategy() type.

Watching a Pool

Wpool provides a way to get live statistics about a pool. To do that, you can use wpool:stats/1.

Data Types

callbacks()

callbacks() = [module()]

Initial list of callback modules implementing wpool_process_callbacks to be called on certain worker events.

This option only works if enable_callbacks() is set to true. Callbacks can be added and removed later with wpool_pool:add_callback_module/2 and wpool_pool:remove_callback_module/2.

custom_strategy()

custom_strategy() = fun((atom()) -> Atom::atom())

A callback that gets the pool name and returns a worker's name.
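A custom strategy could look like the following sketch. It assumes that the names returned by wpool:get_workers/1 are acceptable return values for the callback; the module and function names are hypothetical.

```erlang
-module(custom_pick).
-export([random_registered/1, pick/1]).

%% Hypothetical custom strategy: receives the pool name and returns the
%% registered name of one of its workers, chosen at random.
random_registered(PoolName) ->
    pick(wpool:get_workers(PoolName)).

%% Picks a random element from a non-empty list.
pick(Workers) when Workers =/= [] ->
    lists:nth(rand:uniform(length(Workers)), Workers).
```

Under these assumptions it could be passed wherever a strategy() is expected, for example wpool:call(my_pool, Msg, fun custom_pick:random_registered/1).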

enable_callbacks()

enable_callbacks() = boolean()

A boolean value determining if event_manager should be started for callback modules.

Defaults to false.

enable_queues()

enable_queues() = boolean()

A boolean value determining if queue_manager should be started for queueing requests.

Defaults to true. Note that disabling this will also disable the available_worker and next_available_worker strategies.

max_overrun_warnings()

max_overrun_warnings() = infinity | pos_integer()

The maximum number of overrun warnings emitted before killing the worker with a delayed task.

If this parameter is set to a value other than infinity, the rounds of warnings become equally timed (e.g., with overrun_warning = 1000 and max_overrun_warnings = 5 the task would be killed after 5 seconds of execution).

The default value for this setting is infinity, i.e., delayed tasks are not killed.

NOTE: Because the worker is killed, messages in its queue may be lost if you are using a worker strategy other than available_worker (see strategy() below).

name()

name() = atom()

Name of the pool.

option()

option() = {workers, workers()} | {worker, worker()} | {worker_opt, [worker_opt()]} | {strategy, supervisor_strategy()} | {worker_shutdown, worker_shutdown()} | {overrun_handler, overrun_handler() | [overrun_handler()]} | {overrun_warning, overrun_warning()} | {max_overrun_warnings, max_overrun_warnings()} | {pool_sup_intensity, pool_sup_intensity()} | {pool_sup_shutdown, pool_sup_shutdown()} | {pool_sup_period, pool_sup_period()} | {queue_type, queue_type()} | {enable_callbacks, enable_callbacks()} | {enable_queues, enable_queues()} | {callbacks, callbacks()}

Options that can be provided to a new pool.

child_spec/2, start_pool/2, and start_sup_pool/2 are the functions that take a list of these options as a parameter.

options()

options() = #{workers => workers(), worker => worker(), worker_opt => [worker_opt()], strategy => supervisor_strategy(), worker_shutdown => worker_shutdown(), overrun_handler => overrun_handler() | [overrun_handler()], overrun_warning => overrun_warning(), max_overrun_warnings => max_overrun_warnings(), pool_sup_intensity => pool_sup_intensity(), pool_sup_shutdown => pool_sup_shutdown(), pool_sup_period => pool_sup_period(), queue_type => queue_type(), enable_callbacks => enable_callbacks(), enable_queues => enable_queues(), callbacks => callbacks(), term() => term()}
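For illustration, an options map using a few of the keys above might look like this sketch. The module name pool_opts and the worker module my_worker are hypothetical, and the values are arbitrary.

```erlang
-module(pool_opts).
-export([example/0, as_list/0]).

%% An options() map; my_worker is a hypothetical gen_server module.
example() ->
    #{workers => 10,
      worker => {my_worker, []},
      overrun_warning => 1000,
      max_overrun_warnings => 5,
      queue_type => fifo,
      enable_callbacks => false}.

%% The same settings as a list of option() tuples (element order is not
%% significant for a proplist lookup).
as_list() ->
    maps:to_list(example()).
```

Either form could then be passed as the second argument of child_spec/2, start_pool/2, or start_sup_pool/2.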

overrun_handler()

overrun_handler() = {Module::module(), Fun::atom()}

The module and function to call when a task is overrun.

The default value for this setting is {logger, warning}. The function must be of arity 1, and it will be called as Module:Fun(Args), where Args is a proplist of reported values.
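A custom handler only needs to export an arity-1 function. The sketch below simply logs whatever proplist it receives; the module and function names are hypothetical, and it makes no assumption about which keys the proplist contains.

```erlang
-module(my_overrun).
-export([report/1]).

%% Arity-1 overrun handler: logs the reported proplist as a warning.
report(Args) when is_list(Args) ->
    logger:warning("wpool task overrun: ~p", [Args]).
```

It would be configured with the option {overrun_handler, {my_overrun, report}}.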

overrun_warning()

overrun_warning() = infinity | pos_integer()

The number of milliseconds after which a task is considered overrun, i.e., delayed.

A warning is emitted using overrun_handler().

The task is monitored until it is finished, thus more than one warning might be emitted for a single task.

The rounds of warnings are not equally timed. An exponential backoff algorithm is used instead: after each warning the overrun time is doubled (e.g., with overrun_warning = 1000, warnings would be emitted after 1000, 2000, 4000, 8000 ...).

The default value for this setting is infinity, i.e., no warnings are emitted.
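The doubling described above can be reproduced with a few lines of plain Erlang. This is only an arithmetic sketch of the quoted sequence, not wpool's internal code; the module name is hypothetical.

```erlang
-module(overrun_schedule).
-export([thresholds/2]).

%% Returns the first N overrun thresholds in milliseconds, starting at
%% OverrunWarning and doubling after each warning.
thresholds(OverrunWarning, N)
  when is_integer(OverrunWarning), OverrunWarning > 0,
       is_integer(N), N >= 0 ->
    [OverrunWarning bsl I || I <- lists:seq(0, N - 1)].
```

For example, overrun_schedule:thresholds(1000, 4) yields [1000, 2000, 4000, 8000].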

pool_sup_intensity()

pool_sup_intensity() = non_neg_integer()

The supervision intensity to use over the supervisor that supervises the workers.

Defaults to 5. See wpool_pool for more details.

pool_sup_period()

pool_sup_period() = non_neg_integer()

The supervision period to use over the supervisor that supervises the workers.

Defaults to 60. See wpool_pool for more details.

pool_sup_shutdown()

pool_sup_shutdown() = brutal_kill | timeout()

The shutdown option to be used over the supervisor that supervises the workers.

Defaults to brutal_kill. See wpool_process_sup for more details.

queue_type()

queue_type() = fifo | lifo

Order in which requests will be stored and handled by workers.

This option can take values lifo or fifo. Defaults to fifo.

run()

run(Result) = fun((name() | pid(), timeout()) -> Result)

A function to run with a given worker.

It can be used to build APIs that hide the gen_server behind more complex logic, which might, for example, curate parameters or run side effects. A supervisor is one such case.

For example:

  %% Note that the supervisor's `init/1' callback takes such a 3-tuple.
  Opts =
      #{workers => 3,
        worker_shutdown => infinity,
        worker => {supervisor, {Name, ModuleCallback, Args}}},
  {ok, PoolPid} = wpool:start_sup_pool(pool_of_supervisors, Opts),

  ...

  Run = fun(Sup, _) -> supervisor:start_child(Sup, Params) end,
  {ok, ChildPid} = wpool:run(pool_of_supervisors, Run, next_worker),

stats()

stats() = [{pool, name()} | {supervisor, pid()} | {options, [option()] | options()} | {size, non_neg_integer()} | {next_worker, pos_integer()} | {total_message_queue_len, non_neg_integer()} | {workers, [{pos_integer(), worker_stats()}]}]
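Since stats() is a proplist, individual values can be read with the proplists module. The helper below is a small sketch with a hypothetical module name; it only touches keys that appear in the type above.

```erlang
-module(pool_report).
-export([summary/1]).

%% Takes a stats() proplist and returns {Pool, Size, TotalQueueLen}.
summary(Stats) ->
    Pool = proplists:get_value(pool, Stats),
    Size = proplists:get_value(size, Stats),
    QueueLen = proplists:get_value(total_message_queue_len, Stats),
    {Pool, Size, QueueLen}.
```

It could be applied to a live pool as pool_report:summary(wpool:stats(my_pool)), where my_pool is a hypothetical pool name.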

strategy()

strategy() = best_worker | random_worker | next_worker | available_worker | next_available_worker | {hash_worker, term()} | custom_strategy()

Strategy to use when choosing a worker.

best_worker

Picks the worker with the shortest queue of messages. Loosely based on this article: https://lethain.com/load-balancing-across-erlang-process-groups/.

This strategy is usually useful when your workers always perform the same task, or tasks with similar expected runtimes.

random_worker

Just picks a random worker. This strategy is the fastest one to select a worker. It's ideal if your workers will perform many short tasks.

next_worker

Picks the next worker in a round-robin fashion. This ensures an even distribution of tasks.

available_worker

Instead of picking one of the workers and sending the request to it right away, this strategy queues the request and waits until a worker is available to perform it. That may make the worker selection part of the process much slower, hence the additional parameter Worker_Timeout, which controls how many milliseconds the client is willing to spend on selection, regardless of the global Timeout for the call.

This strategy ensures that, if a worker crashes, no messages are lost in its message queue. It also ensures that a task that takes too long doesn't block other tasks: as soon as another worker is free, it can pick up the next task in the list.

next_available_worker

In a way, this strategy behaves like available_worker in the sense that it will pick the first worker that it can find which is not running any task at the moment, but the difference is that it will fail if all workers are busy.

{hash_worker, Key}

This strategy takes a Key and selects a worker using erlang:phash2/2. This ensures that tasks classified under the same key will be delivered to the same worker, which is useful to classify events by key and work on them sequentially on the worker, distributing different keys across different workers.

custom_strategy()

A callback that gets the pool name and returns a worker's name.
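To illustrate why {hash_worker, Key} keeps tasks for the same key on the same worker, the sketch below maps a key to a slot with erlang:phash2/2. How wpool turns the hash into a worker internally is an assumption here (the 1-based offset in particular); the point is only that the mapping is deterministic.

```erlang
-module(hash_demo).
-export([slot/2]).

%% Maps Key to a slot in 1..PoolSize. The +1 offset is an assumption made
%% for this illustration, not taken from wpool's implementation.
slot(Key, PoolSize) when is_integer(PoolSize), PoolSize > 0 ->
    erlang:phash2(Key, PoolSize) + 1.
```

Calling hash_demo:slot/2 twice with the same key and pool size always returns the same slot, while different keys are spread across the available slots.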

supervisor_strategy()

supervisor_strategy() = supervisor:sup_flags()

Supervision strategy to use over the individual workers.

Defaults to {one_for_one, 5, 60}. See wpool_process_sup for more details.

worker()

worker() = {Module::module(), InitArg::term()}

The gen_server module and the arguments to pass to the init callback.

This is the module that each worker will run and the InitArg to use on the corresponding start_link call used to initiate it.

The default value for this setting is {wpool_worker, undefined}. That means that if you don't provide a worker implementation, the pool will be generated with this default one. See wpool_worker for details.

worker_opt()

worker_opt() = gen_server:start_opt()

Server options that will be passed to each gen_server worker.

These are the same as described at the gen_server documentation.

worker_shutdown()

worker_shutdown() = brutal_kill | timeout()

The shutdown option to be used over the individual workers.

Defaults to 5000. See wpool_process_sup for more details.

worker_stats()

worker_stats() = [{messsage_queue_len, non_neg_integer()} | {memory, pos_integer()}]

Statistics about a worker in a pool.

workers()

workers() = pos_integer()

The number of workers in the pool.

The default value for this setting is 100.

Function Index

broadcall/3: Calls all the workers within the given pool asynchronously and waits for the responses synchronously.
broadcast/2: Casts a message to all the workers within the given pool.
call/2: Equivalent to call(Sup, Call, default_strategy()).
call/3: Equivalent to call(Sup, Call, Strategy, 5000).
call/4: Picks a server and issues the call to it.
cast/2: Equivalent to cast(Sup, Cast, default_strategy()).
cast/3: Picks a server and issues the cast to it.
child_spec/2: Builds a child specification to pass to a supervisor.
default_strategy/0: Default strategy.
get_workers/1: Retrieves the list of worker registered names.
run/2: Equivalent to run(Sup, Run, default_strategy()).
run/3: Equivalent to run(Sup, Run, Strategy, 5000).
run/4: Picks a server and issues the run to it.
send_request/2: Equivalent to send_request(Sup, Call, default_strategy(), 5000).
send_request/3: Equivalent to send_request(Sup, Call, Strategy, 5000).
send_request/4: Picks a server and issues the call to it.
start/0: Starts the application.
start_pool/1: Equivalent to start_pool(Name, []).
start_pool/2: Starts (and links) a pool of N wpool_processes.
start_sup_pool/1: Equivalent to start_sup_pool(Name, []).
start_sup_pool/2: Starts a pool of N wpool_processes supervised by wpool_sup.
stats/0: Retrieves a snapshot of statistics for all pools.
stats/1: Retrieves a snapshot of statistics for a given pool.
stop/0: Stops the application.
stop_pool/1: Stops a pool that doesn't belong to wpool_sup.
stop_sup_pool/1: Stops a pool supervised by the wpool_sup supervision tree.

Function Details

broadcall/3

broadcall(Sup::wpool:name(), Call::term(), Timeout::timeout()) -> {[Replies::term()], [Errors::term()]}

Calls all the workers within the given pool asynchronously and waits for the responses synchronously.

If one worker times out, the entire call is considered timed-out.

broadcast/2

broadcast(Sup::wpool:name(), Cast::term()) -> ok

Casts a message to all the workers within the given pool.

NOTE: These messages are not queued by the pool; they go straight to the workers' message queues. If you are using the available_worker strategy to balance the load and some tasks are queued up waiting for the next available worker, the broadcast will reach all the workers before those queued tasks.

call/2

call(Sup::name(), Call::term()) -> term()

Equivalent to call(Sup, Call, default_strategy()).

call/3

call(Sup::name(), Call::term(), Strategy::strategy()) -> term()

Equivalent to call(Sup, Call, Strategy, 5000).

call/4

call(Sup::name(), Call::term(), Fun::strategy(), Timeout::timeout()) -> term()

Picks a server and issues the call to it.

For all strategies except available_worker, Timeout applies only to the time spent on the actual call to the worker, because the time spent finding a worker is negligible. For available_worker, the time spent choosing a worker also counts against Timeout.

cast/2

cast(Sup::name(), Cast::term()) -> ok

Equivalent to cast(Sup, Cast, default_strategy()).

cast/3

cast(Sup::name(), Cast::term(), Fun::strategy()) -> ok

Picks a server and issues the cast to it.

child_spec/2

child_spec(Name::name(), Options::[option()] | options()) -> supervisor:child_spec()

Builds a child specification to pass to a supervisor.
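A supervisor that owns a pool could use child_spec/2 as in this sketch. The module name my_app_sup, the pool name my_pool, and the supervisor flags are hypothetical choices.

```erlang
-module(my_app_sup).
-behaviour(supervisor).

-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% Adds one pool of five workers as a child of this supervisor.
%% No worker option is given, so the pool's default worker is used.
init([]) ->
    PoolSpec = wpool:child_spec(my_pool, [{workers, 5}]),
    SupFlags = #{strategy => one_for_one, intensity => 5, period => 60},
    {ok, {SupFlags, [PoolSpec]}}.
```

With this layout the pool is started and stopped together with the rest of the application's supervision tree.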

default_strategy/0

default_strategy() -> strategy()

Default strategy.

get_workers/1

get_workers(Sup::name()) -> [atom()]

Retrieves the list of worker registered names.

This can be useful to manually inspect the workers or do custom work on them.

run/2

run(Sup::name(), Run::run(Result)) -> Result

Equivalent to run(Sup, Run, default_strategy()).

run/3

run(Sup::name(), Run::run(Result), Strategy::strategy()) -> Result

Equivalent to run(Sup, Run, Strategy, 5000).

run/4

run(Sup::name(), Run::run(Result), Fun::strategy(), Timeout::timeout()) -> Result

Picks a server and issues the run to it.

For all strategies except available_worker, Timeout applies only to the time spent on the actual run on the worker, because the time spent finding a worker is negligible. For available_worker, the time spent choosing a worker also counts against Timeout.

send_request/2

send_request(Sup::name(), Call::term()) -> noproc | timeout | gen_server:request_id()

Equivalent to send_request(Sup, Call, default_strategy(), 5000).

send_request/3

send_request(Sup::name(), Call::term(), Strategy::strategy()) -> noproc | timeout | gen_server:request_id()

Equivalent to send_request(Sup, Call, Strategy, 5000).

send_request/4

send_request(Sup::name(), Call::term(), Fun::strategy(), Timeout::timeout()) -> noproc | timeout | gen_server:request_id()

Picks a server and issues the call to it.

Timeout applies only to the time spent choosing a worker in the available_worker strategy.
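Because send_request may return noproc or timeout instead of a request id, callers need to check the result before waiting for a reply. The sketch below assumes the request id can be handed to gen_server:wait_response/2; the module and function names are hypothetical.

```erlang
-module(async_call).
-export([request/3, interpret/1]).

%% Sends Msg to a worker of Pool and waits up to Timeout ms for the reply.
request(Pool, Msg, Timeout) ->
    case wpool:send_request(Pool, Msg) of
        noproc -> {error, noproc};
        timeout -> {error, timeout};
        ReqId -> interpret(gen_server:wait_response(ReqId, Timeout))
    end.

%% Normalises the result of gen_server:wait_response/2.
interpret({reply, Reply}) -> {ok, Reply};
interpret(timeout) -> {error, timeout};
interpret({error, {Reason, _ServerRef}}) -> {error, Reason}.
```

Separating interpret/1 keeps the handling of the three possible wait_response results in one place.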

start/0

start() -> ok | {error, {already_started, '?MODULE'}}

Starts the application.

start_pool/1

start_pool(Name::name()) -> supervisor:startlink_ret()

Equivalent to start_pool(Name, []).

start_pool/2

start_pool(Name::name(), Options::[option()] | options()) -> supervisor:startlink_ret()

Starts (and links) a pool of N wpool_processes. The resulting pid belongs to a supervisor (in case you want to add it to a supervision tree).

start_sup_pool/1

start_sup_pool(Name::name()) -> supervisor:startchild_ret()

Equivalent to start_sup_pool(Name, []).

start_sup_pool/2

start_sup_pool(Name::name(), Options::[option()] | options()) -> supervisor:startchild_ret()

Starts a pool of N wpool_processes supervised by wpool_sup.

stats/0

stats() -> [stats()]

Retrieves a snapshot of statistics for all pools.

See the stats() type for details on the return type.

stats/1

stats(Sup::name()) -> stats()

Retrieves a snapshot of statistics for a given pool.

See the stats() type for details on the return type.

stop/0

stop() -> ok

Stops the application.

stop_pool/1

stop_pool(Name::name()) -> true

Stops a pool that doesn't belong to wpool_sup.

stop_sup_pool/1

stop_sup_pool(Name::name()) -> ok

Stops a pool supervised by the wpool_sup supervision tree.


Generated by EDoc