Behaviours: application.
Worker pool main interface.
Use functions provided by this module to manage your pools of workers.
Worker Pool is an Erlang application that can be started using the functions in the
application module. For convenience, wpool:start/0 and wpool:stop/0 are also provided.
To start a new worker pool, you can use:
wpool:child_spec/2 if you want to add the pool to your own supervision tree during its initialisation;
wpool:start_pool/1 or wpool:start_pool/2 if you want to supervise it yourself;
wpool:start_sup_pool/1 or wpool:start_sup_pool/2 if you want the pool to live under wpool's supervision tree.
To stop a pool, just use wpool:stop_pool/1 or wpool:stop_sup_pool/1 according to how you
started the pool.
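As a minimal sketch of the start and stop pairing described above, the module below starts a pool under wpool's supervision tree and then stops it with the matching function. The module name, the pool name `demo_pool`, and the application name `worker_pool` are assumptions made for illustration; only the `wpool` functions come from this page.

```erlang
-module(pool_lifecycle_sketch).
-export([demo/0]).

%% Starts a pool under wpool's supervision tree, inspects it, and stops it.
%% `demo_pool' is a hypothetical pool name.
demo() ->
    %% Assumption: the OTP application is named `worker_pool'.
    {ok, _Apps} = application:ensure_all_started(worker_pool),
    {ok, _SupPid} = wpool:start_sup_pool(demo_pool, [{workers, 4}]),
    WorkerNames = wpool:get_workers(demo_pool),
    %% A pool started with start_sup_pool/2 is stopped with stop_sup_pool/1.
    ok = wpool:stop_sup_pool(demo_pool),
    WorkerNames.
```

A pool started with wpool:start_pool/2 would instead be stopped with wpool:stop_pool/1.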
Since the workers are gen_servers, you can send calls and casts to them. To do that,
use wpool:call and wpool:cast as you would use the equivalent functions on
gen_server.
Beyond the regular parameters for gen_server, wpool also provides an extra optional parameter:
Strategy: the strategy used to pick the worker that performs the task. If not provided,
the result of wpool:default_strategy/0 is used.
The available strategies are defined in the t:wpool:strategy/0 type.
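The following sketch shows how the optional Strategy parameter might be passed to wpool:call and wpool:cast. The worker module, its `{echo, Term}` protocol, and the pool name `echo_pool` are hypothetical; the application name `worker_pool` is also an assumption.

```erlang
-module(echo_worker_sketch).
-behaviour(gen_server).
-export([demo/0]).
-export([init/1, handle_call/3, handle_cast/2]).

%% Hypothetical worker: replies to {echo, Term} calls and ignores casts.
init(InitArg) ->
    {ok, InitArg}.

handle_call({echo, Term}, _From, State) ->
    {reply, {echoed, Term}, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

demo() ->
    %% Assumption: the OTP application is named `worker_pool'.
    {ok, _Apps} = application:ensure_all_started(worker_pool),
    Options = [{workers, 3}, {worker, {?MODULE, undefined}}],
    {ok, _SupPid} = wpool:start_sup_pool(echo_pool, Options),
    %% No strategy given: wpool:default_strategy/0 is used.
    R1 = wpool:call(echo_pool, {echo, hello}),
    %% Explicit strategy, default timeout.
    R2 = wpool:call(echo_pool, {echo, world}, random_worker),
    %% Explicit strategy and timeout in milliseconds.
    R3 = wpool:call(echo_pool, {echo, again}, next_worker, 1000),
    ok = wpool:cast(echo_pool, any_message, best_worker),
    ok = wpool:stop_sup_pool(echo_pool),
    [R1, R2, R3].
```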
To retrieve a snapshot of statistics about a pool, use wpool:stats/1.
callbacks() = [module()]
Initial list of callback modules implementing wpool_process_callbacks to be
called on certain worker events.
This option only takes effect if enable_callbacks() is set to true.
Callbacks can be added and removed later by wpool_pool:add_callback_module/2 and
wpool_pool:remove_callback_module/2.
custom_strategy() = fun((atom()) -> Atom::atom())
A callback that gets the pool name and returns a worker's name.
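A custom strategy could look like the sketch below, which always selects the first worker name returned by wpool:get_workers/1. The module and function names are hypothetical, and this particular selection rule is only an illustration, not a recommended policy.

```erlang
-module(custom_strategy_sketch).
-export([first_worker/1]).

%% Hypothetical custom strategy: takes the pool name and returns the
%% registered name of the first worker in that pool.
first_worker(Pool) ->
    [First | _Rest] = wpool:get_workers(Pool),
    First.
```

It would then be passed wherever a strategy is expected, for instance as `fun custom_strategy_sketch:first_worker/1` in the third argument of wpool:call/3.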
enable_callbacks() = boolean()
A boolean value determining if event_manager should be started for callback modules.
Defaults to false.
enable_queues() = boolean()
A boolean value determining if queue_manager should be started for queueing requests.
Defaults to true.
Note that disabling this will disable available_worker and next_available_worker strategies.
max_overrun_warnings() = infinity | pos_integer()
The maximum number of overrun warnings emitted before killing the worker with a delayed task.
If this parameter is set to a value other than infinity the rounds of warnings become equally
timed (i.e. with overrun_warning = 1000 and max_overrun_warnings = 5 the task would be killed
after 5 seconds of execution).
The default value for this setting is infinity, i.e., delayed tasks are not killed.
Note that killing a worker may lose the messages queued for it if you use a worker strategy other than available_worker (see strategy() below).
name() = atom()
Name of the pool
option() = {workers, workers()} | {worker, worker()} | {worker_opt, [worker_opt()]} | {strategy, supervisor_strategy()} | {worker_shutdown, worker_shutdown()} | {overrun_handler, overrun_handler() | [overrun_handler()]} | {overrun_warning, overrun_warning()} | {max_overrun_warnings, max_overrun_warnings()} | {pool_sup_intensity, pool_sup_intensity()} | {pool_sup_shutdown, pool_sup_shutdown()} | {pool_sup_period, pool_sup_period()} | {queue_type, queue_type()} | {enable_callbacks, enable_callbacks()} | {enable_queues, enable_queues()} | {callbacks, callbacks()}
Options that can be provided to a new pool.
child_spec/2, start_pool/2 and start_sup_pool/2 are the functions
that take a list of these options as a parameter.
options() = #{workers => workers(), worker => worker(), worker_opt => [worker_opt()], strategy => supervisor_strategy(), worker_shutdown => worker_shutdown(), overrun_handler => overrun_handler() | [overrun_handler()], overrun_warning => overrun_warning(), max_overrun_warnings => max_overrun_warnings(), pool_sup_intensity => pool_sup_intensity(), pool_sup_shutdown => pool_sup_shutdown(), pool_sup_period => pool_sup_period(), queue_type => queue_type(), enable_callbacks => enable_callbacks(), enable_queues => enable_queues(), callbacks => callbacks(), term() => term()}
overrun_handler() = {Module::module(), Fun::atom()}
The module and function to call when a task is overrun.
The default value for this setting is {logger, warning}. The function must be of
arity 1, and it will be called as Module:Fun(Args), where Args is a proplist with the following
reported values:
{alert, AlertType}: Where AlertType is overrun on regular warnings, or max_overrun_limit when the worker is about to be killed.
{pool, Pool}: The pool name.
{worker, Pid}: Pid of the worker.
{task, Task}: A description of the task.
{runtime, Runtime}: The runtime of the current round.
overrun_warning() = infinity | pos_integer()
The number of milliseconds after which a task is considered overrun i.e., delayed.
A warning is emitted using overrun_handler().
The task is monitored until it is finished, thus more than one warning might be emitted for a single task.
The rounds of warnings are not equally timed, an exponential backoff algorithm is used instead:
after each warning the overrun time is doubled (i.e. with overrun_warning = 1000 warnings would
be emitted after 1000, 2000, 4000, 8000 ...).
The default value for this setting is infinity, i.e., no warnings are emitted.
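The overrun settings can be illustrated with a small sketch. It contains a hypothetical arity-1 handler that reads the proplist described under overrun_handler(), and a helper that lists warning thresholds as the text describes them: doubling when max_overrun_warnings is infinity, equally timed otherwise. All module and function names are assumptions, and the helper only mirrors the documented examples; it is not wpool's internal implementation.

```erlang
-module(overrun_sketch).
-export([handle_overrun/1, format_overrun/1, warning_times/3]).

%% Hypothetical handler, usable as
%% {overrun_handler, {overrun_sketch, handle_overrun}}.
handle_overrun(Args) ->
    logger:warning("~s", [format_overrun(Args)]).

%% Builds a flat string from the reported values in the proplist.
format_overrun(Args) ->
    Alert = proplists:get_value(alert, Args),
    Pool = proplists:get_value(pool, Args),
    Runtime = proplists:get_value(runtime, Args),
    lists:flatten(
        io_lib:format("alert=~p pool=~p runtime=~p", [Alert, Pool, Runtime])).

%% Lists up to Count warning thresholds in milliseconds.
%% With max_overrun_warnings = infinity the threshold doubles each round;
%% otherwise the rounds are equally timed and stop after Max warnings.
warning_times(OverrunWarning, infinity, Count) ->
    [OverrunWarning bsl I || I <- lists:seq(0, Count - 1)];
warning_times(OverrunWarning, Max, Count) when is_integer(Max), Max > 0 ->
    [OverrunWarning * I || I <- lists:seq(1, min(Count, Max))].
```

For example, `overrun_sketch:warning_times(1000, infinity, 4)` yields the 1000, 2000, 4000, 8000 sequence quoted above, while a limit of 5 ends at 5000, matching the "killed after 5 seconds" case.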
pool_sup_intensity() = non_neg_integer()
The supervision intensity to use over the supervisor that supervises the workers.
Defaults to 5. See wpool_pool for more details.
pool_sup_period() = non_neg_integer()
The supervision period to use over the supervisor that supervises the workers.
Defaults to 60. See wpool_pool for more details.
pool_sup_shutdown() = brutal_kill | timeout()
The shutdown option to be used over the supervisor that supervises the workers.
Defaults to brutal_kill. See wpool_process_sup for more details.
queue_type() = fifo | lifo
Order in which requests will be stored and handled by workers.
This option can take values lifo or fifo. Defaults to fifo.
run(Result) = fun((name() | pid(), timeout()) -> Result)
A function to run with a given worker.
It can be used to build APIs that hide the gen_server behind more complex logic,
for example to curate parameters or run side effects. One such case is a pool of supervisors:
    Opts =
        #{workers => 3,
          worker_shutdown => infinity,
          worker => {supervisor, {Name, ModuleCallback, Args}}},
    %% Note that the supervisor's `init/1' callback takes such a 3-tuple.
    {ok, SupPid} = wpool:start_sup_pool(pool_of_supervisors, Opts),
    ...
    Run = fun(Sup, _) -> supervisor:start_child(Sup, Params) end,
    %% A different variable is used here so the match does not clash with SupPid.
    {ok, ChildPid} = wpool:run(pool_of_supervisors, Run, next_worker).
stats() = [{pool, name()} | {supervisor, pid()} | {options, [option()] | options()} | {size, non_neg_integer()} | {next_worker, pos_integer()} | {total_message_queue_len, non_neg_integer()} | {workers, [{pos_integer(), worker_stats()}]}]
strategy() = best_worker | random_worker | next_worker | available_worker | next_available_worker | {hash_worker, term()} | custom_strategy()
Strategy to use when choosing a worker.
best_worker
Picks the worker with the shortest queue of messages. Loosely based on this
article: https://lethain.com/load-balancing-across-erlang-process-groups/.
This strategy is usually useful when your workers always perform the same task, or tasks with expectedly similar runtimes.
random_worker
Just picks a random worker. This strategy is the fastest one to select a worker. It's ideal if your workers will perform many short tasks.
next_worker
Picks the next worker in a round-robin fashion. This ensures an even distribution of tasks.
available_worker
Instead of just picking one of the workers in the queue and sending the request to it, this
strategy queues the request and waits until a worker is available to perform it. That may render
the worker selection part of the process much slower (thus generating the need for an additional
parameter: Worker_Timeout that controls how many milliseconds the client is willing to spend
in that, regardless of the global Timeout for the call).
This strategy ensures that, if a worker crashes, no messages are lost in its message queue. It also ensures that a task that takes too long does not block other tasks: as soon as another worker is free, it can pick up the next task in the list.
next_available_worker
In a way, this strategy behaves like available_worker in the sense that it will pick the first
worker that it can find which is not running any task at the moment, but the difference is that
it will fail if all workers are busy.
{hash_worker, Key}
This strategy takes a Key and selects a worker using erlang:phash2/2. This ensures that tasks
classified under the same key will be delivered to the same worker, which is useful to classify
events by key and work on them sequentially on the worker, distributing different keys across
different workers.
custom_strategy()
A callback that gets the pool name and returns a worker's name (see custom_strategy() above).
supervisor_strategy() = supervisor:sup_flags()
Supervision strategy to use over the individual workers.
Defaults to {one_for_one, 5, 60}. See wpool_process_sup for more details.
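Returning to the {hash_worker, Key} strategy listed under strategy(): the sketch below shows how erlang:phash2/2 can map a key to a stable worker index. The module name and the 1-based indexing are assumptions for illustration; the exact mapping wpool uses internally may differ.

```erlang
-module(hash_worker_sketch).
-export([worker_index/2]).

%% Maps Key to a worker index in 1..PoolSize.
%% erlang:phash2/2 returns an integer in 0..PoolSize-1, and the same key
%% always hashes to the same value, so equal keys share a worker.
worker_index(Key, PoolSize) when is_integer(PoolSize), PoolSize > 0 ->
    erlang:phash2(Key, PoolSize) + 1.
```

Because the index depends on the pool size, changing the number of workers would generally move keys to different workers.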
worker() = {Module::module(), InitArg::term()}
The gen_server module and the arguments to pass to the init callback.
This is the module that each worker will run and the InitArg to use on the corresponding
start_link call used to initiate it.
The default value for this setting is {wpool_worker, undefined}. That means that if you don't
provide a worker implementation, the pool will be generated with this default one.
See wpool_worker for details.
worker_opt() = gen_server:start_opt()
Server options that will be passed to each gen_server worker.
For details, see the gen_server documentation.
worker_shutdown() = brutal_kill | timeout()
The shutdown option to be used over the individual workers.
Defaults to 5000. See wpool_process_sup for more details.
worker_stats() = [{messsage_queue_len, non_neg_integer()} | {memory, pos_integer()}]
Statistics about a worker in a pool.
workers() = pos_integer()
The number of workers in the pool.
The default value for this setting is 100.
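Putting several of the option types above together, a pool configuration in the options() map form might look like the sketch below. The module name and the chosen values are illustrative assumptions; only the keys and their documented value types come from this page.

```erlang
-module(pool_options_sketch).
-export([options/0]).

%% Returns an options() map suitable for child_spec/2, start_pool/2
%% or start_sup_pool/2. The values are examples, not recommendations.
options() ->
    #{workers => 20,
      worker => {wpool_worker, undefined},
      %% Warn when a task runs longer than 1000 ms ...
      overrun_warning => 1000,
      %% ... and kill the worker after 5 equally timed warnings.
      max_overrun_warnings => 5,
      overrun_handler => {logger, warning},
      queue_type => fifo,
      enable_callbacks => false,
      enable_queues => true,
      pool_sup_intensity => 5,
      pool_sup_period => 60}.
```

The same settings can be written in the list form, for example `[{workers, 20}, {queue_type, fifo}]`.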
| broadcall/3 | Calls all the workers within the given pool async and waits for the responses synchronously. |
| broadcast/2 | Casts a message to all the workers within the given pool. |
| call/2 | Equivalent to call(Sup, Call, default_strategy()). |
| call/3 | Equivalent to call(Sup, Call, Strategy, 5000). |
| call/4 | Picks a server and issues the call to it. |
| cast/2 | Equivalent to cast(Sup, Cast, default_strategy()). |
| cast/3 | Picks a server and issues the cast to it. |
| child_spec/2 | Builds a child specification to pass to a supervisor. |
| default_strategy/0 | Default strategy. |
| get_workers/1 | Retrieves the list of worker registered names. |
| run/2 | Equivalent to run(Sup, Run, default_strategy()). |
| run/3 | Equivalent to run(Sup, Run, Strategy, 5000). |
| run/4 | Picks a server and issues the run to it. |
| send_request/2 | Equivalent to send_request(Sup, Call, default_strategy(), 5000). |
| send_request/3 | Equivalent to send_request(Sup, Call, Strategy, 5000). |
| send_request/4 | Picks a server and issues the call to it. |
| start/0 | Starts the application. |
| start_pool/1 | Equivalent to start_pool(Name, []). |
| start_pool/2 | Starts (and links) a pool of N wpool_processes. |
| start_sup_pool/1 | Equivalent to start_sup_pool(Name, []). |
| start_sup_pool/2 | Starts a pool of N wpool_processes supervised by wpool_sup. |
| stats/0 | Retrieves a snapshot of statistics for all pools. |
| stats/1 | Retrieves a snapshot of statistics for a given pool. |
| stop/0 | Stops the application. |
| stop_pool/1 | Stops a pool that doesn't belong to wpool_sup. |
| stop_sup_pool/1 | Stops a pool supervised by wpool_sup supervision tree. |
broadcall(Sup::wpool:name(), Call::term(), Timeout::timeout()) -> {[Replies::term()], [Errors::term()]}
Calls all the workers within the given pool async and waits for the responses synchronously.
If one worker times out, the entire call is considered timed-out.
broadcast(Sup::wpool:name(), Cast::term()) -> ok
Casts a message to all the workers within the given pool.
NOTE: These messages don't get queued; they go straight to the workers' message queues. If you're using the available_worker strategy to balance the load and you have some tasks queued up waiting for the next available worker, the broadcast will reach all the workers before the queued-up tasks.
call(Sup::name(), Call::term()) -> term()
Equivalent to call(Sup, Call, default_strategy()).
call(Sup::name(), Call::term(), Strategy::strategy()) -> term()
Equivalent to call(Sup, Call, Strategy, 5000).
call(Sup::name(), Call::term(), Fun::strategy(), Timeout::timeout()) -> term()
Picks a server and issues the call to it.
For all strategies except available_worker, Timeout applies only to the time spent on the actual call to the worker, because time spent finding the worker in other strategies is negligible. For available_worker the time used choosing a worker is also considered.
cast(Sup::name(), Cast::term()) -> ok
Equivalent to cast(Sup, Cast, default_strategy()).
cast(Sup::name(), Cast::term(), Fun::strategy()) -> ok
Picks a server and issues the cast to it
child_spec(Name::name(), Options::[option()] | options()) -> supervisor:child_spec()
Builds a child specification to pass to a supervisor.
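As a sketch of how the returned child specification might be used, the supervisor below places a pool under an application's own supervision tree. The module name `my_app_sup`, the pool name `my_pool`, and the supervisor flags are hypothetical, and the sketch assumes the worker pool application is already running.

```erlang
-module(my_app_sup).
-behaviour(supervisor).
-export([start_link/0, init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

%% Adds a hypothetical pool named `my_pool' as a child of this supervisor.
init([]) ->
    PoolSpec = wpool:child_spec(my_pool, #{workers => 10}),
    SupFlags = #{strategy => one_for_one, intensity => 5, period => 60},
    {ok, {SupFlags, [PoolSpec]}}.
```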
default_strategy() -> strategy()
Default strategy
get_workers(Sup::name()) -> [atom()]
Retrieves the list of worker registered names.
This can be useful to manually inspect the workers or do custom work on them.
run(Sup::name(), Run::run(Result)) -> Result
Equivalent to run(Sup, Run, default_strategy()).
run(Sup::name(), Run::run(Result), Strategy::strategy()) -> Result
Equivalent to run(Sup, Run, Strategy, 5000).
run(Sup::name(), Run::run(Result), Fun::strategy(), Timeout::timeout()) -> Result
Picks a server and issues the run to it.
For all strategies except available_worker, Timeout applies only to the time spent on the actual run to the worker, because time spent finding the worker in other strategies is negligible. For available_worker the time used choosing a worker is also considered.
send_request(Sup::name(), Call::term()) -> noproc | timeout | gen_server:request_id()
Equivalent to send_request(Sup, Call, default_strategy(), 5000).
send_request(Sup::name(), Call::term(), Strategy::strategy()) -> noproc | timeout | gen_server:request_id()
Equivalent to send_request(Sup, Call, Strategy, 5000).
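Since send_request returns either an error atom or a gen_server:request_id(), callers need to handle all three outcomes. The helper below is a hedged sketch of one way to do that, collecting the reply with gen_server:wait_response/2; the module name, function name, and the `{ok, _} | {error, _}` result shape are assumptions for illustration.

```erlang
-module(send_request_sketch).
-export([async_call/3]).

%% Sends Call to a worker of Pool chosen round-robin, then waits up to
%% WaitMs milliseconds for the reply.
async_call(Pool, Call, WaitMs) ->
    case wpool:send_request(Pool, Call, next_worker) of
        noproc ->
            {error, noproc};
        timeout ->
            {error, timeout};
        RequestId ->
            case gen_server:wait_response(RequestId, WaitMs) of
                {reply, Reply} -> {ok, Reply};
                timeout -> {error, timeout};
                {error, {Reason, _ServerRef}} -> {error, Reason}
            end
    end.
```

In real code, other work would typically happen between sending the request and waiting for the response; otherwise a plain wpool:call is simpler.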
send_request(Sup::name(), Call::term(), Fun::strategy(), Timeout::timeout()) -> noproc | timeout | gen_server:request_id()
Picks a server and issues the call to it.
Timeout applies only to the time used choosing a worker in the available_worker strategy.
start() -> ok | {error, {already_started, '?MODULE'}}
Starts the application.
start_pool(Name::name()) -> supervisor:startlink_ret()
Equivalent to start_pool(Name, []).
start_pool(Name::name(), Options::[option()] | options()) -> supervisor:startlink_ret()
Starts (and links) a pool of N wpool_processes. The result pid belongs to a supervisor (in case you want to add it to a supervisor tree)
start_sup_pool(Name::name()) -> supervisor:startchild_ret()
Equivalent to start_sup_pool(Name, []).
start_sup_pool(Name::name(), Options::[option()] | options()) -> supervisor:startchild_ret()
Starts a pool of N wpool_processes supervised by wpool_sup
stats() -> [stats()]
Retrieves a snapshot of statistics for all pools.
See t:stats/0 for details on the return type.
stats(Sup::name()) -> stats()
Retrieves a snapshot of statistics for a given pool.
See t:stats/0 for details on the return type.
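Because the stats() value is a list of tagged tuples, individual fields can be read with the proplists module. The sketch below condenses a stats list, such as one returned by wpool:stats/1, into a small map; the module name, function name, and the map keys are hypothetical.

```erlang
-module(stats_sketch).
-export([summary/1]).

%% Extracts a few fields from a stats() list into a map.
summary(Stats) ->
    #{pool => proplists:get_value(pool, Stats),
      size => proplists:get_value(size, Stats),
      queued => proplists:get_value(total_message_queue_len, Stats)}.
```

A caller might use it as `stats_sketch:summary(wpool:stats(my_pool))`, where `my_pool` is a hypothetical pool name.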
stop() -> ok
Stops the application
stop_pool(Name::name()) -> true
Stops a pool that doesn't belong to wpool_sup.
stop_sup_pool(Name::name()) -> ok
Stops a pool supervised by wpool_sup supervision tree.