Source code for matensemble.logger

import logging
import json
import sys
import os
import tempfile

from datetime import datetime
from pathlib import Path


class StatusWriter:
    """
    Class to handle updating the status file

    The status file is a small JSON document that is rewritten in full on
    every :meth:`update` call. It is replaced atomically, so a reader never
    observes a partially written file.

    Attributes
    ----------
    path : Path
        The path to the status file
    nnodes : int
        The number of nodes that flux is managing (total_allocation - 1 for
        the flux broker)
    cores_per_node : int
        The number of CPU cores that are available on each node
    gpus_per_node : int
        The number of GPUs that are available on each node
    """

    def __init__(
        self, path: Path, nnodes: int, cores_per_node: int, gpus_per_node: int
    ) -> None:
        # Normalise to Path so a str / os.PathLike is accepted as well;
        # update() relies on ``self.path.parent`` and ``self.path.name``.
        self.path = Path(path)
        self.nnodes = nnodes
        self.cores_per_node = cores_per_node
        self.gpus_per_node = gpus_per_node

    def update(
        self,
        pending: int,
        running: int,
        completed: int,
        failed: int,
        free_cores: int,
        free_gpus: int,
    ) -> None:
        """
        Atomically replace the status file with the given counters.

        The static resource layout (``nodes``, ``cores_per_node``,
        ``gpus_per_node``) is written alongside the per-call values.

        Parameters
        ----------
        pending : int
            Value stored under ``"pending"`` (presumably queued tasks).
        running : int
            Value stored under ``"running"``.
        completed : int
            Value stored under ``"completed"``.
        failed : int
            Value stored under ``"failed"``.
        free_cores : int
            Value stored under ``"free_cores"``.
        free_gpus : int
            Value stored under ``"free_gpus"``.

        Raises
        ------
        TypeError
            If a value cannot be serialised by :func:`json.dumps`. No
            temporary file is created and the existing status file is left
            untouched.
        OSError
            If the temporary file cannot be created or written, or cannot be
            renamed over ``path``. Any temporary file already created is
            removed before the error propagates.
        """
        data = {
            "nodes": self.nnodes,
            "cores_per_node": self.cores_per_node,
            "gpus_per_node": self.gpus_per_node,
            "pending": pending,
            "running": running,
            "completed": completed,
            "failed": failed,
            "free_cores": free_cores,
            "free_gpus": free_gpus,
        }
        # Serialise before touching the filesystem so that a bad value fails
        # without leaving anything behind on disk.
        payload = json.dumps(data)

        # Write to a temp file in the same directory, then os.replace() it
        # over the target. A same-filesystem rename is atomic on POSIX, so the
        # dashboard never sees a half written file.
        # NOTE(review): tempfile creates the file with mode 0600, and the
        # rename carries that mode over to the status file. Confirm the
        # dashboard reads it as the same user.
        tf = tempfile.NamedTemporaryFile(
            "w",
            dir=self.path.parent,
            prefix=f".{self.path.name}.",
            suffix=".tmp",
            delete=False,
        )
        try:
            with tf:
                tf.write(payload)
            os.replace(tf.name, self.path)
        except BaseException:
            # delete=False means nothing else will clean the temp file up.
            try:
                os.unlink(tf.name)
            except OSError:
                pass  # best effort; the original error is re-raised below
            raise
def _setup_status_writer(
    path: Path, nnodes: int, cores_per_node: int, gpus_per_node: int
) -> "StatusWriter":
    """
    Create the :class:`StatusWriter` used by the :obj:`FluxManager` to
    publish workflow status.

    Parameters
    ----------
    path : Path
        The path to the status file
    nnodes : int
        The number of nodes that are on the allocation minus one for the
        Flux broker
    cores_per_node : int
        The number of CPU cores per node
    gpus_per_node : int
        The number of GPUs per node

    Returns
    -------
    StatusWriter
        A writer configured with the given path and resource layout.
    """
    return StatusWriter(
        path=path,
        nnodes=nnodes,
        cores_per_node=cores_per_node,
        gpus_per_node=gpus_per_node,
    )


def _setup_logger(base_dir: Path) -> logging.Logger:
    """
    Configure and return the ``"matensemble"`` logger for the :obj:`FluxManager`.

    A DEBUG-level file handler writes to ``<base_dir>/matensemble_workflow.log``.
    When stderr is a terminal, an INFO-level console handler is also attached,
    and a short hint with the log/output locations is printed to stderr.

    Calling this more than once is safe. Handlers already attached to the
    logger are closed and removed first, so records are not duplicated and the
    previous log file is not left open.

    Parameters
    ----------
    base_dir : Path
        Workflow directory in which the log file is created (a ``str`` is
        also accepted). The directory must already exist; ``FileHandler``
        opens the file immediately.

    Returns
    -------
    logging.Logger
        The configured ``"matensemble"`` logger.
    """
    base_dir = Path(base_dir)

    logger = logging.getLogger("matensemble")
    logger.setLevel(logging.DEBUG)
    # Don't forward records to ancestor (root) handlers as well.
    logger.propagate = False

    # Prevent duplicate handlers if setup is called twice. Close each one
    # before dropping it: clearing the list alone leaks FileHandler streams.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()

    fmt = logging.Formatter(
        fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    log_file = base_dir / "matensemble_workflow.log"
    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(fmt)
    logger.addHandler(file_handler)

    # Only echo to the console for interactive runs.
    if sys.stderr.isatty():
        console_handler = logging.StreamHandler(stream=sys.stderr)
        console_handler.setLevel(logging.INFO)
        console_handler.setFormatter(fmt)
        logger.addHandler(console_handler)

        hint = (
            f"Logs: {base_dir}/matensemble_workflow.log\n"
            f"Outputs: {base_dir}/out\n\n"
            f"Watch logs: watch tail -n 5 {base_dir}/matensemble_workflow.log"
        )
        print(hint, file=sys.stderr)

    logger.info("Workflow initialized at %s", base_dir)
    return logger