e3.testsuite.multiprocess_scheduler

Attributes

logger

WorkData

Type that contains all the information needed to do some unit of work.

SomeWorker

Worker subclass to start some unit of work.

JobFactoryCallback

Callback to create a Worker instance from work data.

CollectResultCallback

Callback to extract work result from a worker.

Classes

Worker

Abstract class to represent units of work for the scheduler.

MultiprocessScheduler

Scheduler to dispatch units of work to subprocesses.

Functions

compute_next_dyn_poll(→ float)

Adjust the polling interval.

Module Contents

e3.testsuite.multiprocess_scheduler.logger
class e3.testsuite.multiprocess_scheduler.Worker(uid: str, driver: e3.testsuite.driver.TestDriver, callback_name: str, slot: int, env: e3.env.Env)

Abstract class to represent units of work for the scheduler.

index_generator

Generate unique indexes for each worker.

uid
driver
callback_name
slot
env
index
process

Process that executes this test fragment.

abstractmethod start() e3.os.process.Run

Create and return the subprocess to do the work.

All subclasses must override this.

poll(scheduler: MultiprocessScheduler) bool

Return whether the subprocess is still running.

If it is, the caller should invoke MultiprocessScheduler.collect_result on it.

e3.testsuite.multiprocess_scheduler.WorkData

Type that contains all the information needed to do some unit of work.

e3.testsuite.multiprocess_scheduler.SomeWorker

Worker subclass to start some unit of work.

e3.testsuite.multiprocess_scheduler.JobFactoryCallback

Callback to create a Worker instance from work data.

Arguments are:

  • UID for this unit of work;

  • data for the work to do;

  • slot ID for the new worker.

Returned value is the Worker instance.

e3.testsuite.multiprocess_scheduler.CollectResultCallback

Callback to extract work result from a worker.

class e3.testsuite.multiprocess_scheduler.MultiprocessScheduler(dag: e3.collection.dag.DAG, job_factory: JobFactoryCallback, collect_result: CollectResultCallback, jobs: int = 0, dyn_poll_interval: bool = True)

Bases: Generic[WorkData, SomeWorker]

Scheduler to dispatch units of work to subprocesses.

parallelism
dag
workers: List[SomeWorker | None]

List of active workers. Indexes in this list correspond to slot IDs passed to workers: self.workers[N].slot == N for all present wor,kers. When the worker is done, we just replace it with None, and when a slot is None we can create a new worker for it.

iterator

Iterator to get ready-to-run units of work.

job_factory
collect_result
active_workers = 0

Equivalent to the number of non-None slots in self.workers.

poll_interval = 0.1

Time (in seconds) to wait between each round of worker polling.

dyn_poll_interval = True
no_free_item = False

True if there is work waiting to be executed, False if all work to be scheduled depends on work that hasn’t completed.

no_work_left = False

True if we processed all items from self.iterator (i.e. we got a StopIteration exception from it).

property has_free_slots: bool

Return whether there is a free slot to spawn a worker.

spawn_worker(uid: str, data: WorkData, slot: int) None

Create a worker and assign it to the given slot.

release_worker(slot: int) None

Release a worker, freeing the corresponding slot.

run() None

Run the loop to execute all units of work.

poll() None
e3.testsuite.multiprocess_scheduler.compute_next_dyn_poll(poll_counter: int, poll_interval: float) float

Adjust the polling interval.