Design and execution model

This page describes what runs where when you call matensemble.pipeline.Pipeline.submit(), how MatEnsemble talks to Flux, and how results move between tasks. It complements the step-by-step tutorials in Tutorials.

Runtime prerequisites

MatEnsemble assumes you are already inside a Flux allocation (or another environment where flux.Flux() can attach to a running broker). Typical patterns on HPC systems:

  • Submit an interactive or batch job that runs flux start (or your site’s equivalent) and then launches your Python driver inside that session.

  • The workflow driver process imports MatEnsemble, builds a Pipeline, and calls submit().

The Python package on PyPI does not replace the need for flux-core / sched binaries provided by your center. The flux optional dependency installs the Python bindings (flux-python) that talk to those libraries. See Installation.

Objects you interact with

Pipeline

Builder for a directed acyclic graph (DAG) of Chore instances. Calling @pipe.chore-decorated functions records delayed work; exec() adds shell/executable work.

Chore

Immutable specification for a single Flux submission: command vector, resource request, working directory, and (for Python chores) pickels callable by value into the registry

FluxManager

Created when you call submit(). It owns queues of ready, blocked, and running chore IDs, tracks free cores/GPUs from Flux, and drives the main scheduling loop.

Fluxlet

Thin wrapper that turns a Chore into a Flux JobspecV1 and submits it through a flux.job.FluxExecutor.

matensemble.runtime_worker

A normal Python module launched as the Flux job command for PYTHON-type chores. It unpickles the chore, imports your function, substitutes dependency results, runs the function, and writes result.pickle.

Workflow directory layout

When you construct Pipeline, it picks a timestamped root under your chosen base directory (by default, the current working directory):

<base>/
└── matensemble_workflow-YYYYMMDD_HHMMSS/
    ├── status.json              # atomically updated for the dashboard / monitoring
    ├── matensemble_workflow.log # detailed text log from the ``matensemble`` logger
    └── out/
        ├── registry/            # pickled chore callables
        │   ├── Callable name
        │   ├── Callable name
        │   └── ...
        └── <chore_id>/
            ├── stdout
            ├── stderr
            ├── chore.pickle     # Pickled chore object
            ├── metadata.json    # Metadata of the chore in JSON for debugging
            └── result.pickle    # Python chore return value (pickle)

The string <base> is pathlib.Path.cwd() unless you pass basedir= to Pipeline. The workflow folder name uses a compact timestamp.

DAG construction and ordering

Edges in the DAG are derived solely from OutputReference placeholders embedded in a Python chore’s positional or keyword arguments (including nested tuples, lists, dicts, and non-class dataclass instances). matensemble.pipeline.Pipeline.exec() does not currently accept dependency references; treat executable chores as root tasks unless you wrap shell work inside a Python chore.

Before submit, MatEnsemble:

  1. Builds a networkx.DiGraph with an edge upstream downstream for each dependency.

  2. Verifies that every referenced chore ID exists.

  3. Rejects cycles (topological sort must succeed).

The FluxManager receives chores in topological order, but submission order is additionally constrained by live resource availability (cores and GPUs).

Resource accounting

Each chore declares Resources:

  • num_tasks — Flux task count for the chore.

  • cores_per_task — CPU cores per task.

  • gpus_per_task — GPUs per task (may be zero).

The manager estimates needed cores and GPUs as num_tasks * cores_per_task and num_tasks * gpus_per_task, and compares against:

  • The total allocation (all chores must fit in the worst case—oversized chores are marked invalid).

  • The currently free counts reported by Flux after rank 0 is drained for the broker.

GPU affinity shell options are only applied when gpus_per_task > 0 and GPU affinity is enabled on submit.

Main scheduling loop (“super loop”)

Roughly each iteration:

  1. Refresh Flux free resource counts.

  2. Write status.json and a log line with pending / running / completed / failed counts.

  3. Drain the ready queue and submit every chore that fits; defer the rest to the back of the queue.

  4. Wait up to buffer_time seconds for at least one Flux future to complete (strategy-dependent).

  5. For each finished future: interpret exit code / exceptions, update dependents, and (in adaptive mode) try to submit more work immediately.

The two built-in strategies are AdaptiveStrategy and NonAdaptiveStrategy; see Configuration and behavior reference for behavioral differences.

Failure propagation

If a chore fails submission, raises in the Flux future wrapper, or returns a non-zero process exit code, MatEnsemble records a failure and cascades to all transitive dependents so the workflow cannot deadlock. Downstream chores receive failure reason dependency_failed with an upstream chore ID in the internal failure list. Check per-chore stderr for the detailed MatEnsemble annotations written by strategy.

Dashboard (optional)

Pass dashboard=True to submit(). A FastAPI + uvicorn thread serves static assets and GET /api/status on port 8000. On a cluster you typically SSH tunnel from your laptop to the compute node running the driver—for example:

ssh -L 8000:<nodelist>:8000 <user>@<login.host>

Use the exact hostname of the node where your workflow process runs; the snippet above is only illustrative.