.. _design:

Design and execution model
=================================

This page describes **what runs where** when you call :meth:`matensemble.pipeline.Pipeline.submit`,
how MatEnsemble talks to Flux, and how results move between tasks. It complements the
step-by-step tutorials in :doc:`tutorials`.

Runtime prerequisites
---------------------

MatEnsemble assumes you are already inside a **Flux allocation** (or another environment
where ``flux.Flux()`` can attach to a running broker). Typical patterns on HPC systems:

* Submit an interactive or batch job that runs ``flux start`` (or your site’s equivalent)
  and then launches your Python driver inside that session.
* The workflow driver process imports MatEnsemble, builds a :class:`~matensemble.pipeline.Pipeline`,
  and calls :meth:`~matensemble.pipeline.Pipeline.submit`.

Installing the Python package from PyPI does **not** provide the **flux-core** and **flux-sched**
binaries; those must come from your center. The ``flux`` optional dependency installs only the
**Python bindings** (``flux-python``) that talk to those libraries. See :doc:`installation`.
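
If you are unsure whether your driver can reach a broker, a quick pre-flight check can save a failed run. The sketch below is not part of MatEnsemble: ``flux_broker_reachable`` is a hypothetical helper that only tries to import the ``flux`` bindings and call ``flux.Flux()``, and it assumes a failed attach is raised as :class:`OSError`.

.. code-block:: python

   def flux_broker_reachable() -> bool:
       """Return True if the Flux bindings import and can attach to a running broker."""
       try:
           import flux  # installed by the flux-python bindings
       except ImportError:
           return False
       try:
           # Inside a Flux instance, flux.Flux() attaches to the enclosing broker.
           flux.Flux()
       except OSError:  # assumption: connection failures surface as OSError
           return False
       return True

Call it at the top of your driver and exit early with a clear message when it returns ``False``.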

Objects you interact with
-------------------------

:class:`~matensemble.pipeline.Pipeline`
   Builder for a directed acyclic graph (DAG) of :class:`~matensemble.chore.Chore` instances.
   Calling ``@pipe.chore``-decorated functions records delayed work; :meth:`~matensemble.pipeline.Pipeline.exec`
   adds shell/executable work.

:class:`~matensemble.chore.Chore`
   Immutable specification for a single Flux submission: command vector, resource request,
   working directory, and (for Python chores) a callable pickled by value into the **registry**.

:class:`~matensemble.manager.FluxManager`
   Created when you call :meth:`~matensemble.pipeline.Pipeline.submit`. It owns queues of ready,
   blocked, and running chore IDs, tracks free cores/GPUs from Flux, and drives the main scheduling loop.

:class:`~matensemble.fluxlet.Fluxlet`
   Thin wrapper that turns a :class:`~matensemble.chore.Chore` into a Flux ``JobspecV1`` and submits it
   through a :class:`flux.job.FluxExecutor`.

``matensemble.runtime_worker``
   A normal Python module launched **as the Flux job command** for PYTHON-type chores. It unpickles
   the chore, imports your function, substitutes dependency results, runs the function, and writes
   ``result.pickle``.
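
The worker's job can be pictured with a toy version of its file protocol. Everything below is hypothetical: ``ToyChore`` and ``ToyRef`` stand in for MatEnsemble's own classes, each per-chore directory is assumed to be named after its chore ID, only top-level arguments are substituted, and standard :mod:`pickle` is used (it stores functions by reference, whereas MatEnsemble pickles callables by value).

.. code-block:: python

   import pickle
   from dataclasses import dataclass, field
   from pathlib import Path
   from typing import Any, Callable


   @dataclass(frozen=True)
   class ToyRef:
       """Hypothetical placeholder for another chore's return value."""
       chore_id: str


   @dataclass
   class ToyChore:
       """Hypothetical stand-in for a pickled Python chore."""
       func: Callable[..., Any]
       args: tuple = ()
       kwargs: dict = field(default_factory=dict)


   def run_chore_dir(out_dir: Path, chore_id: str) -> Any:
       """Load chore.pickle, substitute upstream results, call, write result.pickle."""
       chore_dir = out_dir / chore_id  # assumption: one directory per chore, named by ID
       chore = pickle.loads((chore_dir / "chore.pickle").read_bytes())

       def resolve(value: Any) -> Any:
           # Only top-level arguments are checked in this toy.
           if isinstance(value, ToyRef):
               return pickle.loads((out_dir / value.chore_id / "result.pickle").read_bytes())
           return value

       args = tuple(resolve(a) for a in chore.args)
       kwargs = {k: resolve(v) for k, v in chore.kwargs.items()}
       result = chore.func(*args, **kwargs)
       (chore_dir / "result.pickle").write_bytes(pickle.dumps(result))
       return result

Running chore ``a`` (say ``operator.add(2, 3)``) and then chore ``b`` whose first argument is ``ToyRef("a")`` feeds ``a``'s pickled result into ``b``, which is the same hand-off the real worker performs through ``result.pickle``.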

Workflow directory layout
-------------------------

When you construct a :class:`~matensemble.pipeline.Pipeline`, it picks a timestamped root directory
under your chosen base directory (by default, the current working directory):

.. code-block:: text

   /                                 # base directory (cwd unless basedir= is given)
   └── matensemble_workflow-YYYYMMDD_HHMMSS/
       ├── status.json               # atomically updated for the dashboard / monitoring
       ├── matensemble_workflow.log  # detailed text log from the matensemble logger
       └── out/
           ├── registry/             # pickled chore callables
           │   ├── Callable name
           │   ├── Callable name
           │   └── ...
           └── /                     # one directory per chore
               ├── stdout
               ├── stderr
               ├── chore.pickle      # Pickled chore object
               ├── metadata.json     # Metadata of the chore in JSON for debugging
               └── result.pickle     # Python chore return value (pickle)

The base directory at the top of the tree defaults to :meth:`pathlib.Path.cwd`; pass ``basedir=`` to
:class:`~matensemble.pipeline.Pipeline` to place workflows elsewhere. The workflow folder name embeds a
compact ``YYYYMMDD_HHMMSS`` timestamp.
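
A minimal sketch of how such a root could be created and how ``status.json`` can be replaced atomically. ``make_workflow_root`` and ``write_status`` are hypothetical helpers, and the write-to-a-temporary-file-then-:func:`os.replace` pattern is a common way to get atomic replacement on POSIX filesystems, not necessarily what MatEnsemble does internally.

.. code-block:: python

   import json
   import os
   from datetime import datetime
   from pathlib import Path
   from typing import Optional


   def make_workflow_root(basedir: Optional[str] = None,
                          now: Optional[datetime] = None) -> Path:
       """Create a timestamped workflow root (with out/registry/) under basedir or the cwd."""
       base = Path(basedir) if basedir is not None else Path.cwd()
       stamp = (now or datetime.now()).strftime("%Y%m%d_%H%M%S")
       root = base / f"matensemble_workflow-{stamp}"
       # exist_ok=False: two roots created in the same second would collide, so fail loudly.
       (root / "out" / "registry").mkdir(parents=True, exist_ok=False)
       return root


   def write_status(root: Path, status: dict) -> None:
       """Replace status.json atomically: write a temp file, then os.replace() it."""
       tmp = root / "status.json.tmp"  # same directory, so the rename stays on one filesystem
       tmp.write_text(json.dumps(status))
       os.replace(tmp, root / "status.json")

Because readers only ever see the old or the new ``status.json``, a dashboard polling the file never observes a half-written document.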

DAG construction and ordering
------------------------------

Edges in the DAG are derived solely from :class:`~matensemble.model.OutputReference` placeholders
embedded in a Python chore’s positional or keyword arguments, including inside nested tuples, lists,
dicts, and dataclass instances (but not dataclass classes themselves).
:meth:`matensemble.pipeline.Pipeline.exec` does **not** currently accept dependency references, so treat
executable chores as root tasks unless you wrap the shell work inside a Python chore.
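
The edge rule can be illustrated with a small recursive walker. ``Ref`` is a hypothetical stand-in for :class:`~matensemble.model.OutputReference`; this sketch searches dict values (not keys) and walks dataclass instances while skipping dataclass classes, which is one reading of the rule above.

.. code-block:: python

   from dataclasses import dataclass, fields, is_dataclass
   from typing import Any, Iterator


   @dataclass(frozen=True)
   class Ref:
       """Hypothetical stand-in for matensemble.model.OutputReference."""
       chore_id: str


   def find_refs(obj: Any) -> Iterator[Ref]:
       """Yield every Ref nested in tuples, lists, dict values, or dataclass instances."""
       if isinstance(obj, Ref):
           yield obj
       elif isinstance(obj, (tuple, list)):
           for item in obj:
               yield from find_refs(item)
       elif isinstance(obj, dict):
           for value in obj.values():
               yield from find_refs(value)
       elif is_dataclass(obj) and not isinstance(obj, type):
           # Walk dataclass instances field by field; dataclass classes are skipped.
           for f in fields(obj):
               yield from find_refs(getattr(obj, f.name))


   def upstream_ids(args: tuple, kwargs: dict) -> set:
       """Chore IDs a call depends on; each becomes an upstream -> downstream edge."""
       return {ref.chore_id for ref in find_refs((args, kwargs))}

For example, a call whose arguments hold ``Ref("relax")`` directly, ``Ref("scf")`` inside a list of dicts, and ``Ref("md")`` inside a dataclass field depends on all three chores, while a call with only plain values is a root.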

Before submission, MatEnsemble:

#. Builds a :class:`networkx.DiGraph` with an edge ``upstream → downstream`` for each dependency.
#. Verifies that every referenced chore ID exists.
#. Rejects cycles (a topological sort must succeed).

The :class:`~matensemble.manager.FluxManager` receives chores in topological order, but the actual
**submission** order is also constrained by live resource availability (cores and GPUs).
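
The same three checks can be expressed with the standard library. This sketch swaps :mod:`graphlib` in for :mod:`networkx` so it runs without extra dependencies; ``validate_and_order`` is a hypothetical helper that takes a mapping from each chore ID to the IDs it depends on.

.. code-block:: python

   from graphlib import CycleError, TopologicalSorter


   def validate_and_order(deps: dict) -> list:
       """Return chore IDs in topological order.

       deps maps every chore ID to the set of IDs it depends on. Raises KeyError
       for a reference to an unknown chore and graphlib.CycleError for a cycle.
       """
       for chore_id, upstream in deps.items():
           missing = sorted(u for u in upstream if u not in deps)
           if missing:
               raise KeyError(f"{chore_id} references unknown chore(s): {missing}")
       # TopologicalSorter expects node -> predecessors, so upstream chores come first.
       return list(TopologicalSorter(deps).static_order())

The explicit missing-ID check matters because :class:`graphlib.TopologicalSorter` silently adds any predecessor it has not seen as a new node.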

Resource accounting
-------------------

Each chore declares :class:`~matensemble.model.Resources`:

* ``num_tasks``: Flux task count for the chore.
* ``cores_per_task``: CPU cores per task.
* ``gpus_per_task``: GPUs per task (may be zero).

The manager computes the **needed** cores and GPUs as ``num_tasks * cores_per_task`` and
``num_tasks * gpus_per_task`` and compares them against:

* The **total** allocation: a chore whose request exceeds it can never run and is marked invalid.
* The **currently free** counts reported by Flux, after rank 0 is drained for the broker.

GPU affinity shell options are applied only when ``gpus_per_task > 0`` and GPU affinity is enabled
on submit.
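
A worked example of the two comparisons, written as a sketch with hypothetical ``Request``, ``Verdict``, and ``classify`` names. With 128 cores and 8 GPUs in the allocation and 40 cores and 4 GPUs free, a chore of 4 tasks with 8 cores and 1 GPU each needs 32 cores and 4 GPUs and can be submitted now; 8 such tasks need 64 cores and 8 GPUs, which fits the allocation but must wait; 4 tasks of 64 cores need 256 cores and can never run.

.. code-block:: python

   from dataclasses import dataclass
   from enum import Enum


   @dataclass(frozen=True)
   class Request:
       """Mirrors the three Resources fields listed above (hypothetical class)."""
       num_tasks: int
       cores_per_task: int
       gpus_per_task: int = 0


   class Verdict(Enum):
       INVALID = "invalid"  # can never fit in the total allocation
       SUBMIT = "submit"    # fits in what is free right now
       DEFER = "defer"      # fits in the allocation, but not right now


   def classify(req: Request, total_cores: int, total_gpus: int,
                free_cores: int, free_gpus: int) -> Verdict:
       need_cores = req.num_tasks * req.cores_per_task
       need_gpus = req.num_tasks * req.gpus_per_task
       if need_cores > total_cores or need_gpus > total_gpus:
           return Verdict.INVALID
       if need_cores <= free_cores and need_gpus <= free_gpus:
           return Verdict.SUBMIT
       return Verdict.DEFER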

Main scheduling loop (“super loop”)
-----------------------------------

Each iteration roughly does the following:

#. Refresh the free resource counts from Flux.
#. Write ``status.json`` and a log line with pending / running / completed / failed counts.
#. Drain the **ready** queue and submit every chore that fits; defer the rest to the back of the queue.
#. Wait up to ``buffer_time`` seconds for at least one Flux future to complete (strategy-dependent).
#. For each finished future, interpret the exit code or exception, update dependents, and (in adaptive
   mode) try to submit more work immediately.

The two built-in strategies are :class:`~matensemble.strategy.AdaptiveStrategy` and
:class:`~matensemble.strategy.NonAdaptiveStrategy`; see :doc:`reference` for their behavioral differences.
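
The shape of the loop can be reproduced locally with :mod:`concurrent.futures`. In this toy, ``run_toy_loop`` is a hypothetical name, a thread pool stands in for Flux, an integer counter stands in for Flux's free-core report, and ``status.json`` writing and failure handling are omitted; it is not MatEnsemble's implementation of either strategy.

.. code-block:: python

   from collections import deque
   from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


   def run_toy_loop(chores, deps, total_cores, buffer_time=0.1):
       """Run chores (id -> (cores, fn)) respecting deps (id -> upstream ids).

       Returns chore IDs in completion order. Assumes deps only names known chores.
       """
       if any(cores > total_cores for cores, _ in chores.values()):
           raise ValueError("a chore needs more cores than the allocation")  # "invalid"
       free = total_cores
       waiting = {cid: set(deps.get(cid, ())) for cid in chores}
       ready = deque(cid for cid, up in waiting.items() if not up)
       running, done = {}, []
       with ThreadPoolExecutor() as pool:
           while ready or running:
               # Drain the ready queue once: submit what fits, defer the rest to the back.
               for _ in range(len(ready)):
                   cid = ready.popleft()
                   cores, fn = chores[cid]
                   if cores <= free:
                       free -= cores
                       running[pool.submit(fn)] = cid
                   else:
                       ready.append(cid)
               # Wait up to buffer_time seconds for at least one future to finish.
               finished, _ = wait(running, timeout=buffer_time, return_when=FIRST_COMPLETED)
               for fut in finished:
                   cid = running.pop(fut)
                   free += chores[cid][0]
                   fut.result()  # re-raises here; a real loop would record a failure instead
                   done.append(cid)
                   # Release dependents whose last upstream chore just finished.
                   for other, up in waiting.items():
                       if cid in up:
                           up.discard(cid)
                           if not up:
                               ready.append(other)
       return done

Rejecting oversized chores up front mirrors the "invalid" check from the resource section and guarantees the loop cannot spin forever on work that never fits.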

Failure propagation
-------------------

If a chore fails to submit, raises an exception in the Flux future wrapper, or exits with a non-zero
status, MatEnsemble records a failure and **cascades** it to all transitive dependents, so the workflow
cannot deadlock waiting on work that will never run. Each downstream chore is recorded in the internal
failure list with reason ``dependency_failed`` and an ``upstream`` chore ID. For details, check the
per-chore ``stderr``, where :mod:`~matensemble.strategy` writes MatEnsemble annotations.
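
Cascading amounts to a breadth-first walk over downstream edges. In this sketch ``cascade_failure`` is hypothetical, and recording the originally failed chore as ``upstream`` is an assumption; the real failure list may store a different ID, such as the immediate parent.

.. code-block:: python

   from collections import deque


   def cascade_failure(failed_id, dependents, failures):
       """Mark every transitive dependent of failed_id as failed.

       dependents maps chore ID -> IDs that consume its output (downstream edges).
       failures maps chore ID -> (reason, upstream ID); existing entries are kept.
       """
       queue = deque([failed_id])
       while queue:
           current = queue.popleft()
           for child in dependents.get(current, ()):
               if child not in failures:
                   # Assumption: record the root cause, not the immediate parent.
                   failures[child] = ("dependency_failed", failed_id)
                   queue.append(child)
       return failures

Skipping chores already present in ``failures`` keeps each dependent from being visited twice when it sits downstream of several failed paths.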

Dashboard (optional)
--------------------

Pass ``dashboard=True`` to :meth:`~matensemble.pipeline.Pipeline.submit` to start a FastAPI + uvicorn
thread that serves the static dashboard assets and ``GET /api/status`` on **port 8000**. On a cluster,
you typically open an **SSH tunnel** from your laptop to the compute node running the driver, for example:

.. code-block:: bash

   ssh -L 8000::8000 @

Use the exact hostname of the node where your workflow process runs; the snippet above is only illustrative.
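
Once the tunnel is up, you can also poll the endpoint from a script. This sketch uses only :mod:`urllib.request`; ``fetch_status`` is a hypothetical helper, and it assumes the tunnel forwards ``localhost:8000`` and that ``GET /api/status`` returns JSON. It makes no assumption about the keys in the response.

.. code-block:: python

   import json
   import urllib.request


   def fetch_status(base_url: str = "http://localhost:8000", timeout: float = 5.0) -> dict:
       """Fetch and decode the JSON body returned by GET /api/status."""
       with urllib.request.urlopen(f"{base_url}/api/status", timeout=timeout) as resp:
           return json.loads(resp.read().decode("utf-8"))

Calling it in a loop with a short :func:`time.sleep` gives a lightweight text monitor when a browser is not convenient.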