import logging
import json
import sys
import os
import tempfile
from datetime import datetime
from pathlib import Path
class StatusWriter:
    """
    Class to handle updating the status file

    Attributes
    ----------
    path : Path
        the path to the status file
    nnodes : int
        The number of nodes that flux is managing (total_allocation - 1 for
        the flux broker)
    cores_per_node : int
        The number of CPU cores that are available on each node
    gpus_per_node : int
        The number of GPUs that are available on each node
    """

    def __init__(
        self, path: Path, nnodes: int, cores_per_node: int, gpus_per_node: int
    ) -> None:
        """
        Store the status file location and the static allocation description

        Parameters
        ----------
        path : Path
            The path to the status file
        nnodes : int
            Value written under the ``"nodes"`` key on every update
        cores_per_node : int
            Value written under the ``"cores_per_node"`` key on every update
        gpus_per_node : int
            Value written under the ``"gpus_per_node"`` key on every update
        """
        # Coerce so a plain string path also works with ``.parent`` in
        # :meth:`update`; a ``Path`` argument is left unchanged.
        self.path = Path(path)
        self.nnodes = nnodes
        self.cores_per_node = cores_per_node
        self.gpus_per_node = gpus_per_node

    def update(
        self,
        pending: int,
        running: int,
        completed: int,
        failed: int,
        free_cores: int,
        free_gpus: int,
    ) -> None:
        """
        Overwrite the status file with a fresh JSON snapshot

        The snapshot holds the three values given at construction time plus
        the six arguments of this call, each stored under its own key.

        Parameters
        ----------
        pending : int
            Value stored under ``"pending"``
        running : int
            Value stored under ``"running"``
        completed : int
            Value stored under ``"completed"``
        failed : int
            Value stored under ``"failed"``
        free_cores : int
            Value stored under ``"free_cores"``
        free_gpus : int
            Value stored under ``"free_gpus"``

        Raises
        ------
        TypeError
            If a value cannot be serialised to JSON
        OSError
            If the temporary file cannot be created, written or moved into
            place
        """
        data = {
            "nodes": self.nnodes,
            "cores_per_node": self.cores_per_node,
            "gpus_per_node": self.gpus_per_node,
            "pending": pending,
            "running": running,
            "completed": completed,
            "failed": failed,
            "free_cores": free_cores,
            "free_gpus": free_gpus,
        }

        # Serialise before touching the filesystem so that a bad value can
        # never leave a stray temporary file behind.
        payload = json.dumps(data)

        # atomically write the status file so that the dashboard never sees a
        # half written file: write to a sibling temp file, then rename it over
        # the real one.
        temp_name = None
        try:
            with tempfile.NamedTemporaryFile(
                "w", dir=self.path.parent, delete=False, encoding="utf-8"
            ) as tf:
                temp_name = tf.name
                tf.write(payload)
            os.replace(temp_name, self.path)
        except BaseException:
            # delete=False means nobody else removes the temp file; clean it
            # up ourselves so failed updates do not accumulate junk files.
            if temp_name is not None:
                try:
                    os.unlink(temp_name)
                except OSError:
                    pass
            raise
def _setup_status_writer(
    path: Path, nnodes: int, cores_per_node: int, gpus_per_node: int
):
    """
    Build the :obj:`StatusWriter` used by the :obj:`FluxManager`

    Parameters
    ----------
    path : Path
        The path to the status file
    nnodes : int
        The number of nodes that are on the allocation minus one for the Flux
        broker
    cores_per_node : int
        The number of CPU cores per node
    gpus_per_node : int
        The number of GPUs per node

    Returns
    -------
    StatusWriter
        A writer constructed from the four arguments, passed through unchanged
    """
    # Arguments are forwarded in the same order as StatusWriter.__init__.
    writer = StatusWriter(path, nnodes, cores_per_node, gpus_per_node)
    return writer
def _setup_logger(base_dir: Path) -> logging.Logger:
"""
setup the status writer for the :obj:`FluxManager`
"""
logger = logging.getLogger("matensemble")
logger.setLevel(logging.DEBUG)
logger.propagate = False
# Prevent duplicate handlers if setup is called twice
if logger.handlers:
logger.handlers.clear()
fmt = logging.Formatter(
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log_file = base_dir / f"matensemble_workflow.log"
file_handler = logging.FileHandler(log_file, encoding="utf-8")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
logger.addHandler(file_handler)
if sys.stderr.isatty():
console_handler = logging.StreamHandler(stream=sys.stderr)
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(fmt)
logger.addHandler(console_handler)
hint = (
f"Logs: {base_dir}/matensemble_workflow.log\n"
f"Outputs: {base_dir}/out\n\n"
f"Watch logs: watch tail -n 5 {base_dir}/matensemble_workflow.log"
)
print(hint, file=sys.stderr)
logger.info(f"Workflow initialized at {base_dir}")
return logger