# Source code for matensemble.strategy

import traceback

import concurrent.futures
import pickle

import flux
import flux.job.executor

from datetime import datetime
from pathlib import Path

from abc import ABC, abstractmethod

from matensemble.model import OutputReference


class FutureProcessingStrategy(ABC):
    """
    Abstract base for every future-processing strategy.

    Subclasses must implement :meth:`process_futures` in order to be
    compliant with how the :obj:`FluxManager` uses them.
    """

    def __init__(self, manager) -> None:
        """
        Keep a reference to the manager this strategy operates on.

        Parameters
        ----------
        manager : FluxManager
            Stored unchanged on ``self.manager``.
        """
        self.manager = manager

    @abstractmethod
    def process_futures(self, buffer_time) -> None:
        """
        Handle finished futures; concrete strategies must override this.

        Parameters
        ----------
        buffer_time
            Supplied by the caller; its use is defined by the subclass.
        """
class AdaptiveStrategy(FutureProcessingStrategy):
    """
    An implementation of the :obj:`FutureProcessingStrategy` which will
    adaptively submit new :obj:`Chore`'s as incoming chores are completed.
    """

    def __init__(self, manager) -> None:
        """
        AdaptiveStrategy constructor

        Parameters
        ----------
        manager : FluxManager
            The :obj:`FluxManager` that holds all of the queues and functions
            to handle them.
        """
        super().__init__(manager)

    def process_futures(self, buffer_time: float) -> None:
        """
        Process the future objects as :obj:`Chore`'s complete.

        Blocks in :func:`concurrent.futures.wait` until every tracked future
        has finished or ``buffer_time`` elapses, then handles each finished
        future:

        * a raised exception or a nonzero return code is appended to the
          chore's ``stderr`` file, logged, recorded through
          ``manager._record_failure`` and propagated through
          ``manager._fail_dependents``;
        * a successful chore is appended to ``manager._completed_chores``,
          its dependents whose remaining-dependency count reaches zero are
          moved to ``manager._ready``, more work is submitted, and a restart
          file is written when the completed count is a multiple of
          ``manager._write_restart_freq``.

        Parameters
        ----------
        buffer_time : float
            Timeout handed to :func:`concurrent.futures.wait` (seconds); also
            forwarded to ``manager._submit_until_ooresources``.
        """
        mgr = self.manager
        # Futures that are still pending stay tracked on the manager.
        completed, mgr._futures = concurrent.futures.wait(
            mgr._futures, timeout=buffer_time
        )

        for fut in completed:
            chore_id = fut.chore_id
            chore = fut.chore_obj
            mgr._running_chores.remove(chore_id)

            try:
                rc = fut.result()
            except Exception as e:
                tb = traceback.format_exc()
                stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                append_text(
                    chore.workdir / "stderr",
                    (
                        f"\n\n===== MATENSEMBLE WRAPPER ERROR ({stamp}) =====\n"
                        f"chore={chore_id}\n"
                        f"workdir={chore.workdir}\n"
                        # Newline keeps the summary line separate from the
                        # traceback that follows it.
                        f"{type(e).__name__}: {e}\n"
                        f"{tb}\n"
                    ),
                )
                mgr._logger.exception("CHORE FAILED: chore=%s", chore_id)
                mgr._record_failure(
                    chore_id,
                    reason="exception",
                    exception=f"{type(e).__name__}: {e}",
                )
                mgr._fail_dependents(chore_id)
                continue

            if rc != 0:
                append_text(
                    chore.workdir / "stderr",
                    f"\n\n===== MATENSEMBLE: NONZERO EXIT =====\nchore={chore_id} rc={rc}\n",
                )
                mgr._logger.error(
                    "CHORE NONZERO EXIT: chore=%s rc=%s | workdir=%s | stdout=%s | stderr=%s",
                    chore_id,
                    rc,
                    chore.workdir,
                    chore.workdir / "stdout",
                    chore.workdir / "stderr",
                )
                mgr._record_failure(
                    chore_id,
                    reason=f"nonzero_exit:{rc}",
                )
                mgr._fail_dependents(chore_id)
                continue

            mgr._completed_chores.append(chore_id)

            # Release dependents whose last outstanding dependency just finished.
            for dep_id in mgr._dependents.get(chore_id, []):
                mgr._remaining_deps[dep_id] -= 1
                if mgr._remaining_deps[dep_id] == 0:
                    mgr._ready.append(dep_id)
                    mgr._blocked.discard(dep_id)

            # adaptively submit another chore
            mgr._submit_until_ooresources(buffer_time=buffer_time)

            # NOTE(review): per-chore placement of the submit and restart check
            # was reconstructed from flattened source - confirm against upstream.
            if mgr._write_restart_freq and (
                len(mgr._completed_chores) % mgr._write_restart_freq == 0
            ):
                mgr._make_restart()
class NonAdaptiveStrategy(FutureProcessingStrategy):
    """
    An implementation of the :obj:`FutureProcessingStrategy` which will not
    adaptively submit new :obj:`Chore`'s as incoming chores are completed.
    """

    def __init__(self, manager) -> None:
        """
        NonAdaptiveStrategy constructor

        Parameters
        ----------
        manager : FluxManager
            The :obj:`FluxManager` whose queues this strategy updates.
        """
        super().__init__(manager)

    def process_futures(self, buffer_time: float) -> None:
        """
        Process the future objects as :obj:`Chore`'s complete, without
        submitting any new work.

        Blocks in :func:`concurrent.futures.wait` until every tracked future
        has finished or ``buffer_time`` elapses, then handles each finished
        future:

        * a raised exception or a nonzero return code is appended to the
          chore's ``stderr`` file, logged, recorded through
          ``manager._record_failure`` and propagated through
          ``manager._fail_dependents``;
        * a successful chore is appended to ``manager._completed_chores``,
          its dependents whose remaining-dependency count reaches zero are
          moved to ``manager._ready``, and a restart file is written when the
          completed count is a multiple of ``manager._write_restart_freq``.

        Parameters
        ----------
        buffer_time : float
            Timeout handed to :func:`concurrent.futures.wait` (seconds).
        """
        mgr = self.manager
        # Futures that are still pending stay tracked on the manager.
        completed, mgr._futures = concurrent.futures.wait(
            mgr._futures, timeout=buffer_time
        )

        for fut in completed:
            chore_id = fut.chore_id
            chore = fut.chore_obj
            mgr._running_chores.remove(chore_id)

            try:
                rc = fut.result()
            except Exception as e:
                tb = traceback.format_exc()
                stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                append_text(
                    chore.workdir / "stderr",
                    (
                        f"\n\n===== MATENSEMBLE WRAPPER ERROR ({stamp}) =====\n"
                        f"chore={chore_id}\n"
                        f"workdir={chore.workdir}\n"
                        # Newline keeps the summary line separate from the
                        # traceback that follows it.
                        f"{type(e).__name__}: {e}\n"
                        f"{tb}\n"
                    ),
                )
                mgr._logger.exception("CHORE FAILED: chore=%s", chore_id)
                mgr._record_failure(
                    chore_id,
                    reason="exception",
                    exception=f"{type(e).__name__}: {e}",
                )
                mgr._fail_dependents(chore_id)
                continue

            if rc != 0:
                append_text(
                    chore.workdir / "stderr",
                    f"\n\n===== MATENSEMBLE: NONZERO EXIT =====\nchore={chore_id} rc={rc}\n",
                )
                mgr._logger.error(
                    "CHORE NONZERO EXIT: chore=%s rc=%s | workdir=%s | stdout=%s | stderr=%s",
                    chore_id,
                    rc,
                    chore.workdir,
                    chore.workdir / "stdout",
                    chore.workdir / "stderr",
                )
                mgr._record_failure(
                    chore_id,
                    reason=f"nonzero_exit:{rc}",
                )
                mgr._fail_dependents(chore_id)
                continue

            mgr._completed_chores.append(chore_id)

            # Release dependents whose last outstanding dependency just finished.
            for dep_id in mgr._dependents.get(chore_id, []):
                mgr._remaining_deps[dep_id] -= 1
                if mgr._remaining_deps[dep_id] == 0:
                    mgr._ready.append(dep_id)
                    mgr._blocked.discard(dep_id)

            # NOTE(review): per-chore placement of the restart check was
            # reconstructed from flattened source - confirm against upstream.
            if mgr._write_restart_freq and (
                len(mgr._completed_chores) % mgr._write_restart_freq == 0
            ):
                mgr._make_restart()
# TODO: Make the strategy look through the bolo list and spawn a new chore with
# the output of another chore, and make sure that the chore acting as the
# processing strategy is itself on the bolo list. Once that is done, get the
# results and spawn a new chore from the ChoreSpec.
class UserStrategy(FutureProcessingStrategy):
    """
    A :obj:`FutureProcessingStrategy` that spawns follow-up chores through a
    pipeline as chores complete.

    When a chore whose name is in ``bolo_list`` succeeds, the processing chore
    is spawned with an :obj:`OutputReference` to it. When the processing chore
    itself succeeds, ``result.pickle`` in its workdir is loaded and a new chore
    is spawned from that spec.
    """

    def __init__(self, manager, pipeline, processing_chore, bolo_list) -> None:
        """
        UserStrategy constructor

        Parameters
        ----------
        manager : FluxManager
            The :obj:`FluxManager` whose queues this strategy updates.
        pipeline
            Object providing ``_spawn_chore_from_spec``,
            ``_spawn_chore_from_name`` and ``_admit_spawned_chore``.
        processing_chore
            Name compared against the name derived from each completed
            chore id.
        bolo_list
            Iterable of chore names that trigger the processing chore; stored
            as a set.
        """
        super().__init__(manager)
        self.pipeline = pipeline
        self.proc_chore = processing_chore
        self.bolo_list = set(bolo_list)
        # if not isinstance(chore, Callable[..., Chore]):
        #     raise Exception(
        #         f"Error: Failed to construct UserStrategy due to Type Error"
        #     )

    def process_futures(self, buffer_time: float) -> None:
        """
        Process the future objects as :obj:`Chore`'s complete and spawn
        follow-up chores.

        Blocks in :func:`concurrent.futures.wait` until every tracked future
        has finished or ``buffer_time`` elapses, then handles each finished
        future:

        * a raised exception or a nonzero return code is appended to the
          chore's ``stderr`` file, logged, recorded through
          ``manager._record_failure`` and propagated through
          ``manager._fail_dependents``;
        * a successful chore is appended to ``manager._completed_chores`` and
          its dependents whose remaining-dependency count reaches zero are
          moved to ``manager._ready``;
        * a successful chore may then trigger a spawn (see the class
          docstring); spawn errors are logged and do not interrupt processing;
        * a restart file is written when the completed count is a multiple of
          ``manager._write_restart_freq``.

        Parameters
        ----------
        buffer_time : float
            Timeout handed to :func:`concurrent.futures.wait` (seconds).
        """
        mgr = self.manager
        # Futures that are still pending stay tracked on the manager.
        completed, mgr._futures = concurrent.futures.wait(
            mgr._futures, timeout=buffer_time
        )

        for fut in completed:
            chore_id = fut.chore_id
            chore = fut.chore_obj
            # Drop the "chore-" prefix and the final "-<suffix>" component;
            # assumes ids look like "chore-<name>-<suffix>" - TODO confirm.
            chore_name = chore_id.removeprefix("chore-").rsplit("-", 1)[0]
            mgr._running_chores.remove(chore_id)

            try:
                rc = fut.result()
            except Exception as e:
                tb = traceback.format_exc()
                stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                append_text(
                    chore.workdir / "stderr",
                    (
                        f"\n\n===== MATENSEMBLE WRAPPER ERROR ({stamp}) =====\n"
                        f"chore={chore_id}\n"
                        f"workdir={chore.workdir}\n"
                        # Newline keeps the summary line separate from the
                        # traceback that follows it.
                        f"{type(e).__name__}: {e}\n"
                        f"{tb}\n"
                    ),
                )
                mgr._logger.exception("CHORE FAILED: chore=%s", chore_id)
                mgr._record_failure(
                    chore_id,
                    reason="exception",
                    exception=f"{type(e).__name__}: {e}",
                )
                mgr._fail_dependents(chore_id)
                continue

            if rc != 0:
                append_text(
                    chore.workdir / "stderr",
                    f"\n\n===== MATENSEMBLE: NONZERO EXIT =====\nchore={chore_id} rc={rc}\n",
                )
                mgr._logger.error(
                    "CHORE NONZERO EXIT: chore=%s rc=%s | workdir=%s | stdout=%s | stderr=%s",
                    chore_id,
                    rc,
                    chore.workdir,
                    chore.workdir / "stdout",
                    chore.workdir / "stderr",
                )
                mgr._record_failure(
                    chore_id,
                    reason=f"nonzero_exit:{rc}",
                )
                mgr._fail_dependents(chore_id)
                continue

            mgr._completed_chores.append(chore_id)

            # Release dependents whose last outstanding dependency just finished.
            for dep_id in mgr._dependents.get(chore_id, []):
                mgr._remaining_deps[dep_id] -= 1
                if mgr._remaining_deps[dep_id] == 0:
                    mgr._ready.append(dep_id)
                    mgr._blocked.discard(dep_id)

            # --- Processing the chore and spawning the new one ---
            if self.proc_chore == chore_name:
                try:
                    # Trust boundary: result.pickle is written by matensemble.runtime_worker
                    # in this workflow's chore workdir only—do not load pickles from
                    # untrusted paths or third-party producers.
                    with (chore.workdir / "result.pickle").open("rb") as f:
                        chore_spec = pickle.load(f)
                    new_chore, new_out = self.pipeline._spawn_chore_from_spec(
                        chore_spec
                    )
                    self.pipeline._admit_spawned_chore(new_chore, new_out, mgr)
                except Exception as e:
                    mgr._logger.exception(
                        "FAILED TO SPAWN CHORE: chore=%s | due the following Exception ->\n%s",
                        self.proc_chore,
                        e,
                    )
            elif chore_name in self.bolo_list:
                # bolo_list is a set, so at most one entry can match the name.
                try:
                    out_ref = OutputReference(chore_id, chore.workdir)
                    new_chore, new_out = self.pipeline._spawn_chore_from_name(
                        self.proc_chore, dependent=out_ref
                    )
                    self.pipeline._admit_spawned_chore(new_chore, new_out, mgr)
                except Exception as e:
                    mgr._logger.exception(
                        "FAILED TO SPAWN CHORE: proc_chore=%s "
                        "bolo_match=%s | due the following Exception ->\n%s",
                        self.proc_chore,
                        chore_name,
                        e,
                    )

            # NOTE(review): per-chore placement of the restart check was
            # reconstructed from flattened source - confirm against upstream.
            if mgr._write_restart_freq and (
                len(mgr._completed_chores) % mgr._write_restart_freq == 0
            ):
                mgr._make_restart()
def append_text(path: Path, text: str) -> None:
    """
    Append text to the end of a file, creating missing parent directories.

    Used for writing error messages to the ``stderr`` file of a specific
    chore.

    Parameters
    ----------
    path : Path
        The file to append to; it is created if it does not exist.
    text : str
        The text to append, written as UTF-8.

    Return
    ------
    None
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    with path.open(mode="a", encoding="utf-8") as handle:
        handle.write(text)