e3.testsuite.multiprocess_scheduler
===================================

.. py:module:: e3.testsuite.multiprocess_scheduler


Attributes
----------

.. autoapisummary::

   e3.testsuite.multiprocess_scheduler.logger
   e3.testsuite.multiprocess_scheduler.WorkData
   e3.testsuite.multiprocess_scheduler.SomeWorker
   e3.testsuite.multiprocess_scheduler.JobFactoryCallback
   e3.testsuite.multiprocess_scheduler.CollectResultCallback


Classes
-------

.. autoapisummary::

   e3.testsuite.multiprocess_scheduler.Worker
   e3.testsuite.multiprocess_scheduler.MultiprocessScheduler


Functions
---------

.. autoapisummary::

   e3.testsuite.multiprocess_scheduler.compute_next_dyn_poll


Module Contents
---------------

.. py:data:: logger

.. py:class:: Worker(uid: str, driver: e3.testsuite.driver.TestDriver, callback_name: str, slot: int, env: e3.env.Env)

   Abstract class to represent units of work for the scheduler.


   .. py:attribute:: index_generator

      Generate unique indexes for each worker.



   .. py:attribute:: uid


   .. py:attribute:: driver


   .. py:attribute:: callback_name


   .. py:attribute:: slot


   .. py:attribute:: env


   .. py:attribute:: index


   .. py:attribute:: process

      Process that executes this test fragment.



   .. py:method:: start() -> e3.os.process.Run
      :abstractmethod:


      Create and return the subprocess to do the work.

      All subclasses must override this.



   .. py:method:: poll(scheduler: MultiprocessScheduler) -> bool

      Return whether the subprocess is still running.

      If it is, the caller should invoke
      `MultiprocessScheduler.collect_result` on it.



.. py:data:: WorkData

   Type that contains all the information needed to do some unit of work.


.. py:data:: SomeWorker

   Worker subclass to start some unit of work.


.. py:data:: JobFactoryCallback

   Callback to create a Worker instance from work data.

   Arguments are:

   * UID for this unit of work;
   * data for the work to do;
   * slot ID for the new worker.

   Returned value is the Worker instance.


.. py:data:: CollectResultCallback

   Callback to extract work result from a worker.


.. py:class:: MultiprocessScheduler(dag: e3.collection.dag.DAG, job_factory: JobFactoryCallback, collect_result: CollectResultCallback, jobs: int = 0, dyn_poll_interval: bool = True)

   Bases: :py:obj:`Generic`\ [\ :py:obj:`WorkData`\ , :py:obj:`SomeWorker`\ ]


   Scheduler to dispatch units of work to subprocesses.


   .. py:attribute:: parallelism


   .. py:attribute:: dag


   .. py:attribute:: workers
      :type:  List[Optional[SomeWorker]]

      List of active workers. Indexes in this list correspond to slot IDs
      passed to workers: `self.workers[N].slot == N` for all present
      wor,kers. When the worker is done, we just replace it with None, and
      when a slot is None we can create a new worker for it.



   .. py:attribute:: iterator

      Iterator to get ready-to-run units of work.



   .. py:attribute:: job_factory


   .. py:attribute:: collect_result


   .. py:attribute:: active_workers
      :value: 0


      Equivalent to the number of non-None slots in ``self.workers``.



   .. py:attribute:: poll_interval
      :value: 0.1


      Time (in seconds) to wait between each round of worker polling.



   .. py:attribute:: dyn_poll_interval
      :value: True



   .. py:attribute:: no_free_item
      :value: False


      True if there is work waiting to be executed, False if all work to be
      scheduled depends on work that hasn't completed.



   .. py:attribute:: no_work_left
      :value: False


      True if we processed all items from ``self.iterator`` (i.e. we got a
      ``StopIteration`` exception from it).



   .. py:property:: has_free_slots
      :type: bool


      Return whether there is a free slot to spawn a worker.



   .. py:method:: spawn_worker(uid: str, data: WorkData, slot: int) -> None

      Create a worker and assign it to the given slot.



   .. py:method:: release_worker(slot: int) -> None

      Release a worker, freeing the corresponding slot.



   .. py:method:: run() -> None

      Run the loop to execute all units of work.



   .. py:method:: poll() -> None


.. py:function:: compute_next_dyn_poll(poll_counter: int, poll_interval: float) -> float

   Adjust the polling interval.


