e3.testsuite.multiprocess_scheduler
Attributes
Type that contains all the information needed to do some unit of work. |
|
Worker subclass to start some unit of work. |
|
Callback to create a Worker instance from work data. |
|
Callback to extract work result from a worker. |
Classes
Abstract class to represent units of work for the scheduler. |
|
Scheduler to dispatch units of work to subprocesses. |
Functions
|
Adjust the polling interval. |
Module Contents
- e3.testsuite.multiprocess_scheduler.logger
- class e3.testsuite.multiprocess_scheduler.Worker(uid: str, driver: e3.testsuite.driver.TestDriver, callback_name: str, slot: int, env: e3.env.Env)
Abstract class to represent units of work for the scheduler.
- index_generator
Generate unique indexes for each worker.
- uid
- driver
- callback_name
- slot
- env
- index
- process
Process that executes this test fragment.
- abstractmethod start() e3.os.process.Run
Create and return the subprocess to do the work.
All subclasses must override this.
- poll(scheduler: MultiprocessScheduler) bool
Return whether the subprocess is still running.
If it is, the caller should invoke MultiprocessScheduler.collect_result on it.
- e3.testsuite.multiprocess_scheduler.WorkData
Type that contains all the information needed to do some unit of work.
- e3.testsuite.multiprocess_scheduler.SomeWorker
Worker subclass to start some unit of work.
- e3.testsuite.multiprocess_scheduler.JobFactoryCallback
Callback to create a Worker instance from work data.
Arguments are:
UID for this unit of work;
data for the work to do;
slot ID for the new worker.
Returned value is the Worker instance.
- e3.testsuite.multiprocess_scheduler.CollectResultCallback
Callback to extract work result from a worker.
- class e3.testsuite.multiprocess_scheduler.MultiprocessScheduler(dag: e3.collection.dag.DAG, job_factory: JobFactoryCallback, collect_result: CollectResultCallback, jobs: int = 0, dyn_poll_interval: bool = True)
Bases:
Generic[WorkData,SomeWorker]Scheduler to dispatch units of work to subprocesses.
- parallelism
- dag
- workers: List[SomeWorker | None]
List of active workers. Indexes in this list correspond to slot IDs passed to workers: self.workers[N].slot == N for all present wor,kers. When the worker is done, we just replace it with None, and when a slot is None we can create a new worker for it.
- iterator
Iterator to get ready-to-run units of work.
- job_factory
- collect_result
- active_workers = 0
Equivalent to the number of non-None slots in
self.workers.
- poll_interval = 0.1
Time (in seconds) to wait between each round of worker polling.
- dyn_poll_interval = True
- no_free_item = False
True if there is work waiting to be executed, False if all work to be scheduled depends on work that hasn’t completed.
- no_work_left = False
True if we processed all items from
self.iterator(i.e. we got aStopIterationexception from it).
- property has_free_slots: bool
Return whether there is a free slot to spawn a worker.
- spawn_worker(uid: str, data: WorkData, slot: int) None
Create a worker and assign it to the given slot.
- release_worker(slot: int) None
Release a worker, freeing the corresponding slot.
- run() None
Run the loop to execute all units of work.
- poll() None
- e3.testsuite.multiprocess_scheduler.compute_next_dyn_poll(poll_counter: int, poll_interval: float) float
Adjust the polling interval.