diff --git a/DEVNOTES.md b/DEVNOTES.md new file mode 100644 index 00000000..90a4fd23 --- /dev/null +++ b/DEVNOTES.md @@ -0,0 +1,61 @@ + + + +Problem with globs: +------------------- + +* prevalidate() calls validation of inputs and outputs. This is the only chance for skipped recipes to get aliases propagated down. I.e. if a sub-recipe has an output that is an alias of a sub-step, and the sub-recipe is skipped, this is the only chance to evaluate the glob (presumably, to existing outputs on disk). + +* validate_inputs() called before running a step. Currently this does not evaluate globs. + +* validate_outputs() called after running. Here we must re-expand the globs, since running the step may have changed the content. + +The current scheme where the glob is expanded and substituted as ``params[name] = [files]`` creates a problem. Expansion needs to happen at prevalidation. Then it needs to happen again at the output stage. So we can't replace the glob with a filelist. Somehow we must retain knowledge that this is a glob, otherwise we won't know to re-evaluate it. + +I tried creating a Glob class, but pydantic won't allow that, it expects a list of strings. So we need to retain this information externally (in Cargo, perhaps?) + +So: keep a copy of the original params dict, and re-evaluate all globs when asked to. + +Consider adding an explicit "glob:" prefix to glob values, so that we know not to re-evaluate explicitly specified files? + + + + +Problem with aliases: +--------------------- + +Let's clean up the logic. First, things to check during finalization: + +* an input alias may refer to multiple steps' inputs, but then their schema.dtype must be consistent + +* an output alias may refer to only one step's output, aliasing multiple outputs is nonsensical + +* an alias's schema is copied from the step, so implicit outputs are implicit for the recipe as well (but mark implicit=True, +since we don't want to copy the value literally until the step has validated it in its own {}-substitution context) + +During prevalidation (when parameter values are available), they must be propagated up or down + +* before prevalidating recipe parameters + + * go over all aliases -- if a recipe's parameter is not set: + + * check if a substep has a default or implicit -- propagate that value down to the recipe + + +* before prevalidating steps -- if a recipe has a parameter (or default) + + * propagate that value up to the step aliases, overriding their values + + * implicit output values can't be set, so this will be caught at this point + +* after prevalidating the steps -- if the recipe does not have a parameter value + + * check if a substep has a parameter value or default or implicit -- propagate that value down to the recipe + + * if multiple substeps are aliased, check that they're not set to conflicting values, throw an error if so. Otherwise propagate up. + + * make a list of parameters that have been propagated down + +After running the step + +* outputs may have changed compared to prevalidation. Propagate their value down to the recipe again (using the list compiled in prevalidation) diff --git a/setup.py b/setup.py index 05a920d1..5a5205c3 100644 --- a/setup.py +++ b/setup.py @@ -9,13 +9,19 @@ requirements = ["pyyaml", "nose>=1.3.7", "future-fstrings", - "scabha @ git+https://github.com/caracal-pipeline/scabha2", + "scabha >= 0.7.0", + ## OMS: not a fan of this: + # @ git+https://github.com/caracal-pipeline/scabha2", + ## ...because it interferes with running scabha2 off a dev branch (i.e. 
if you have a local dev install of scabha, + ## pip install stimela will blow it away and replace with master branch...) "ruamel.yaml", "munch", "omegaconf>=2.1pre1", "click", "pydantic", "pathos", + "psutil", + "rich" ], PACKAGE_NAME = "stimela" diff --git a/stimela/backends/native.py b/stimela/backends/native.py index 13735a4a..296ec96a 100644 --- a/stimela/backends/native.py +++ b/stimela/backends/native.py @@ -5,9 +5,9 @@ from collections import OrderedDict from contextlib import redirect_stderr, redirect_stdout -from scabha.cargo import Cab +from scabha.cargo import Cab, Parameter from stimela import logger -from stimela.utils.xrun_poll import xrun, dispatch_to_log +from stimela.utils.xrun_asyncio import xrun, dispatch_to_log from stimela.exceptions import StimelaCabRuntimeError import click @@ -30,7 +30,7 @@ def write(self, s): return len(s) -def run(cab: Cab, log, subst: Optional[Dict[str, Any]] = None, batch=None): +def run(cab: Cab, params: Dict[str, Any], log, subst: Optional[Dict[str, Any]] = None, batch=None): """Runs cab contents Args: @@ -45,13 +45,13 @@ def run(cab: Cab, log, subst: Optional[Dict[str, Any]] = None, batch=None): # commands of form "(module)function" are a Python call match = re.match("^\((.+)\)(.+)$", cab.command) if match: - return run_callable(match.group(1), match.group(2), cab, log, subst) + return run_callable(match.group(1), match.group(2), cab, params, log, subst) # everything else is a shell command else: - return run_command(cab, log, subst) + return run_command(cab, params, log, subst) -def run_callable(modulename: str, funcname: str, cab: Cab, log, subst: Optional[Dict[str, Any]] = None): +def run_callable(modulename: str, funcname: str, cab: Cab, params: Dict[str, Any], log, subst: Optional[Dict[str, Any]] = None): """Runs a cab corresponding to a Python callable. Intercepts stdout/stderr into the logger. Args: @@ -69,10 +69,14 @@ def run_callable(modulename: str, funcname: str, cab: Cab, log, subst: Optional[ """ # import module and get function object + path0 = sys.path.copy() + sys.path.append('.') try: mod = importlib.import_module(modulename) except ImportError as exc: raise StimelaCabRuntimeError(f"can't import {modulename}: {exc}", log=log) + finally: + sys.path = path0 func = getattr(mod, funcname, None) @@ -89,8 +93,8 @@ def run_callable(modulename: str, funcname: str, cab: Cab, log, subst: Optional[ args = OrderedDict() for key, schema in cab.inputs_outputs.items(): if not schema.policies.skip: - if key in cab.params: - args[key] = cab.params + if key in params: + args[key] = params[key] elif cab.get_schema_policy(schema, 'pass_missing_as_none'): args[key] = None @@ -114,7 +118,7 @@ def run_callable(modulename: str, funcname: str, cab: Cab, log, subst: Optional[ return retval -def run_command(cab: Cab, log, subst: Optional[Dict[str, Any]] = None, batch=None): +def run_command(cab: Cab, params: Dict[str, Any], log, subst: Optional[Dict[str, Any]] = None, batch=None): """Runns command represented by cab. Args: @@ -129,11 +133,11 @@ def run_command(cab: Cab, log, subst: Optional[Dict[str, Any]] = None, batch=Non int: return value (e.g. 
exit code) of command """ # build command line from parameters - args, venv = cab.build_command_line(subst) + args, venv = cab.build_command_line(params, subst) if batch: batch = SlurmBatch(**batch) - batch.__init_cab__(cab, subst, log) + batch.__init_cab__(cab, params, subst, log) runcmd = "/bin/bash -c" + " ".join(args) jobfile = "foo-bar.job" batch.name = "foo-bar" @@ -155,7 +159,7 @@ def run_command(cab: Cab, log, subst: Optional[Dict[str, Any]] = None, batch=Non retcode = xrun(args[0], args[1:], shell=False, log=log, output_wrangler=cab.apply_output_wranglers, - return_errcode=True, command_name=command_name) + return_errcode=True, command_name=command_name, progress_bar=True) # if retcode is not zero, raise error, unless cab declared itself a success (via the wrangler) if retcode: diff --git a/stimela/cargo/cab/wsclean.yaml b/stimela/cargo/cab/wsclean.yaml index b86aaab5..056d5e2e 100644 --- a/stimela/cargo/cab/wsclean.yaml +++ b/stimela/cargo/cab/wsclean.yaml @@ -21,21 +21,26 @@ outputs: dirty: info: Dirty images dtype: List[File] - implicit: "{current.prefix}-[0-9][0-9][0-9][0-9]-dirty.fits" + implicit: "=GLOB({current.prefix}-[0-9][0-9][0-9][0-9]-dirty.fits)" must_exist: false restored: info: Restored images dtype: List[File] - implicit: "{current.prefix}-[0-9][0-9][0-9][0-9]-image.fits" + implicit: "=GLOB({current.prefix}-[0-9][0-9][0-9][0-9]-image.fits)" + must_exist: false + restored_timeint: + info: Restored images per time interval + dtype: List[File] + implicit: "=GLOB({current.prefix}-t[0-9][0-9][0-9][0-9]-image.fits)" must_exist: false residual: dtype: List[File] - implicit: "{current.prefix}-[0-9][0-9][0-9][0-9]-residual.fits" + implicit: "=GLOB({current.prefix}-[0-9][0-9][0-9][0-9]-residual.fits)" must_exist: false model: info: Model images dtype: List[File] - implicit: "{current.prefix}-[0-9][0-9][0-9][0-9]-model.fits" + implicit: "=GLOB({current.prefix}-[0-9][0-9][0-9][0-9]-model.fits)" must_exist: false restored_mfs: info: Restored MFS image diff --git a/stimela/cargo/cab/wsclean_pol.yaml b/stimela/cargo/cab/wsclean_pol.yaml index a7b48d88..c74b01d6 100644 --- a/stimela/cargo/cab/wsclean_pol.yaml +++ b/stimela/cargo/cab/wsclean_pol.yaml @@ -27,44 +27,44 @@ outputs: dirty: info: Dirty images dtype: List[File] - implicit: "{current.prefix}-[0-9][0-9][0-9][0-9]-[IQUV]-dirty.fits" + implicit: "=GLOB({current.prefix}-[0-9][0-9][0-9][0-9]-[IQUV]-dirty.fits)" must_exist: false restored: info: Restored images dtype: List[File] - implicit: "{current.prefix}-[0-9][0-9][0-9][0-9]-[IQUV]-image.fits" + implicit: "=GLOB({current.prefix}-[0-9][0-9][0-9][0-9]-[IQUV]-image.fits)" must_exist: false residual: dtype: List[File] - implicit: "{current.prefix}-[0-9][0-9][0-9][0-9]-[IQUV]-residual.fits" + implicit: "=GLOB({current.prefix}-[0-9][0-9][0-9][0-9]-[IQUV]-residual.fits)" must_exist: false model: info: Model images dtype: List[File] - implicit: "{current.prefix}-[0-9][0-9][0-9][0-9]-[IQUV]-model.fits" + implicit: "=GLOB({current.prefix}-[0-9][0-9][0-9][0-9]-[IQUV]-model.fits)" must_exist: false restored_mfs_i: info: Restored MFS images dtype: File - implicit: "{current.prefix}-MFS-I-image.fits" + implicit: "=GLOB({current.prefix}-MFS-I-image.fits)" must_exist: false restored_mfs: info: Restored MFS images dtype: List[File] - implicit: "{current.prefix}-MFS-[IQUV]-image.fits" + implicit: "=GLOB({current.prefix}-MFS-[IQUV]-image.fits)" must_exist: false residual_mfs: info: Residual MFS images dtype: List[File] - implicit: "{current.prefix}-MFS-[IQUV]-residual.fits" + implicit: 
"=GLOB({current.prefix}-MFS-[IQUV]-residual.fits)" must_exist: false model_mfs: info: Model MFS images dtype: List[File] - implicit: "{current.prefix}-MFS-[IQUV]-model.fits" + implicit: "=GLOB({current.prefix}-MFS-[IQUV]-model.fits)" must_exist: false dirty_mfs: info: Dirty MFS images dtype: List[File] - implicit: "{current.prefix}-MFS-[IQUV]-dirty.fits" + implicit: "=GLOB({current.prefix}-MFS-[IQUV]-dirty.fits)" must_exist: false diff --git a/stimela/commands/help.py b/stimela/commands/help.py new file mode 100644 index 00000000..bc3344fa --- /dev/null +++ b/stimela/commands/help.py @@ -0,0 +1,120 @@ +import fnmatch +import os, sys +import click +from omegaconf import OmegaConf + +import stimela +from scabha import configuratt +from stimela import logger +from stimela.main import cli +from scabha.cargo import Cab, ParameterCategory +from stimela.kitchen.recipe import Recipe, Step, join_quote +from stimela.config import ConfigExceptionTypes +from typing import * +from rich.tree import Tree +from rich.table import Table +from rich import box +from rich import print as rich_print + +@cli.command("help", + help=""" + Print help on a cab or a recipe. + """) +@click.option("do_list", "-l", "--list", is_flag=True, + help="""Lists the available cabs and recipes, including custom-defined ones.""") +@click.option("-I", "--implicit", is_flag=True, + help="""Increases level of detail to include implicit inputs/outputs.""") +@click.option("-O", "--obscure", is_flag=True, + help="""Increases level of detail to include implicit and obscure inputs/outputs.""") +@click.option("-A", "--all", is_flag=True, + help="""Increases level of detail to include all inputs/outputs.""") +@click.option("-R", "--required", is_flag=True, + help="""Decreases level of detail to include required inputs/outputs only.""") +@click.argument("items", nargs=-1, metavar="filename.yml|cab name|recipe name|...") +def help(items: List[str] = [], do_list=False, implicit=False, obscure=False, all=False, required=False): + + log = logger() + top_tree = Tree(f"stimela help {' '.join(items)}", guide_style="dim") + found_something = False + + if required: + max_category = ParameterCategory.Required + else: + max_category = ParameterCategory.Optional + if all: + max_category = ParameterCategory.Hidden + elif obscure: + max_category = ParameterCategory.Obscure + elif implicit: + max_category = ParameterCategory.Implicit + + + for item in items: + # a filename -- treat it as a config + if os.path.isfile(item): + log.info(f"loading recipe/config {item}") + + # if file contains a recipe entry, treat it as a full config (that can include cabs etc.) 
+ try: + conf = configuratt.load(item, use_sources=[stimela.CONFIG]) + except ConfigExceptionTypes as exc: + log.error(f"error loading {item}: {exc}") + sys.exit(2) + + # anything that is not a standard config section will be treated as a recipe + recipes = [name for name in conf if name not in stimela.CONFIG] + + for name in recipes: + # cast section to Recipe and remove from loaded conf + recipe = OmegaConf.create(Recipe) + recipe = OmegaConf.unsafe_merge(recipe, conf[name]) + del conf[name] + # add to global namespace + stimela.CONFIG.lib.recipes[name] = recipe + + # the rest is safe to merge into config as is + stimela.CONFIG = OmegaConf.merge(stimela.CONFIG, conf) + + # else treat as a wildcard for recipe names or cab names + else: + recipe_names = fnmatch.filter(stimela.CONFIG.lib.recipes.keys(), item) + cab_names = fnmatch.filter(stimela.CONFIG.cabs.keys(), item) + if not recipe_names and not cab_names: + log.error(f"'{item}' does not match any files, recipes or cab names. Try -l/--list") + sys.exit(2) + + for name in recipe_names: + recipe = Recipe(**stimela.CONFIG.lib.recipes[name]) + recipe.finalize(fqname=name) + tree = top_tree.add(f"Recipe: [bold]{name}[/bold]") + recipe.rich_help(tree, max_category=max_category) + + for name in cab_names: + cab = Cab(**stimela.CONFIG.cabs[name]) + cab.finalize() + tree = top_tree.add(f"Cab: [bold]{name}[/bold]") + cab.rich_help(tree, max_category=max_category) + + found_something = True + + if do_list or not found_something: + if stimela.CONFIG.lib.recipes: + subtree = top_tree.add("Recipes:") + table = Table.grid("", "", padding=(0,2)) + for name, recipe in stimela.CONFIG.lib.recipes.items(): + table.add_row(f"[bold]{name}[/bold]", recipe.info) + subtree.add(table) + elif not do_list and not found_something: + log.error(f"nothing particular to help on, perhaps specify a recipe name or a cab name, or use -l/--list") + sys.exit(2) + + if do_list: + subtree = top_tree.add("Cabs:") + table = Table.grid("", "", padding=(0,2)) + for name, cab in stimela.CONFIG.cabs.items(): + table.add_row(f"[bold]{name}[/bold]", cab.info) + subtree.add(table) + + rich_print(top_tree) + + diff --git a/stimela/commands/run.py b/stimela/commands/run.py index 3539ac2d..d5192208 100644 --- a/stimela/commands/run.py +++ b/stimela/commands/run.py @@ -1,22 +1,26 @@ -from collections import OrderedDict import dataclasses import itertools - -from yaml.error import YAMLError -from scabha import configuratt -from scabha.exceptions import ScabhaBaseException -from omegaconf.omegaconf import OmegaConf, OmegaConfBaseException -from stimela.config import ConfigExceptionTypes import click import logging -import os.path, yaml, sys +import os.path +import yaml +import sys + +from datetime import datetime from typing import List, Optional +from collections import OrderedDict +from omegaconf.omegaconf import OmegaConf, OmegaConfBaseException + import stimela +from scabha import configuratt +from scabha.exceptions import ScabhaBaseException +from stimela.config import ConfigExceptionTypes from stimela import logger from stimela.main import cli from stimela.kitchen.recipe import Recipe, Step, join_quote from stimela.config import get_config_class + @cli.command("run", help=""" Execute a single cab, or a recipe from a YML file. 
@@ -41,7 +45,7 @@ help="""Doesn't actually run anything, only prints the selected steps.""") @click.argument("what", metavar="filename.yml|CAB") @click.argument("parameters", nargs=-1, metavar="[recipe name] [PARAM=VALUE] [X.Y.Z=FOO] ...", required=False) -def run(what: str, parameters: List[str] = [], dry_run: bool = False, +def run(what: str, parameters: List[str] = [], dry_run: bool = False, help: bool = False, step_names: List[str] = [], tags: List[str] = [], skip_tags: List[str] = [], enable_steps: List[str] = []): log = logger() @@ -133,6 +137,7 @@ def run(what: str, parameters: List[str] = [], dry_run: bool = False, sys.exit(2) else: if len(all_recipe_names) > 1: + print(f"This file contains the following recipes: {', '.join(all_recipe_names)}") log.error(f"multiple recipes found, please specify one on the command line") sys.exit(2) recipe_name = all_recipe_names[0] @@ -180,7 +185,7 @@ def run(what: str, parameters: List[str] = [], dry_run: bool = False, log.info("pre-validating the recipe") outer_step = Step(recipe=recipe, name=f"{recipe_name}", info=what, params=params) try: - outer_step.prevalidate() + params = outer_step.prevalidate() except ScabhaBaseException as exc: if not exc.logged: log.error(f"pre-validation failed: {exc}") @@ -262,37 +267,41 @@ def run(what: str, parameters: List[str] = [], dry_run: bool = False, # apply restrictions, if any recipe.restrict_steps(tagged_steps, force_enable=False) - steps = [name for name, step in recipe.steps.items() if not step.skip] + steps = [name for name, step in recipe.steps.items() if not step._skip] log.info(f"will run the following recipe steps:") log.info(f" {' '.join(steps)}", extra=dict(color="GREEN")) # warn user if som steps remain explicitly disabled - if any(recipe.steps[name].skip for name in tagged_steps): + if any(recipe.steps[name]._skip for name in tagged_steps): log.warning("note that some steps remain explicitly skipped") # in debug mode, pretty-print the recipe if log.isEnabledFor(logging.DEBUG): log.debug("---------- prevalidated step follows ----------") - for line in outer_step.summary(): + for line in outer_step.summary(params=params): log.debug(line) if dry_run: log.info("dry run was requested, exiting") sys.exit(0) - # run step + start_time = datetime.now() + def elapsed(): + return str(datetime.now() - start_time).split('.', 1)[0] + try: outputs = outer_step.run() except ScabhaBaseException as exc: if not exc.logged: - outer_step.log.error(f"run failed with exception: {exc}") + outer_step.log.error(f"run failed after {elapsed()} with exception: {exc}") sys.exit(1) if outputs and step.log.isEnabledFor(logging.DEBUG): - outer_step.log.debug("run successful, outputs follow:") + outer_step.log.debug(f"run successful after {elapsed()}, outputs follow:") for name, value in outputs.items(): - outer_step.log.debug(f" {name}: {value}") + if name in recipe.outputs: + outer_step.log.debug(f" {name}: {value}") else: - outer_step.log.info("run successful") + outer_step.log.info(f"run successful after {elapsed()}") return 0 \ No newline at end of file diff --git a/stimela/config.py b/stimela/config.py index 113f0d75..38a577b1 100644 --- a/stimela/config.py +++ b/stimela/config.py @@ -1,5 +1,5 @@ import glob -import os, os.path, time, re, logging +import os, os.path, time, re, logging, platform from typing import Any, List, Dict, Optional, Union from enum import Enum from dataclasses import dataclass, field @@ -77,11 +77,6 @@ class StimelaOptions(object): ## For distributed computes and cpu allocation dist: Dict[str, Any] 
= EmptyDictDefault() -@dataclass -class StimelaLibrary(object): - params: Dict[str, Any] = EmptyDictDefault() - recipes: Dict[str, Any] = EmptyDictDefault() - steps: Dict[str, Any] = EmptyDictDefault() def DefaultDirs(): return field(default_factory=lambda:dict(indir='.', outdir='.')) @@ -106,7 +101,6 @@ def DefaultDirs(): # set to the config file that was actually found CONFIG_LOADED = None - def merge_extra_config(conf, newconf): from stimela import logger @@ -130,7 +124,14 @@ def load_config(extra_configs=List[str]): stimela_dir = os.path.dirname(stimela.__file__) from stimela.kitchen.recipe import Recipe, Cab - global StimelaConfig + global StimelaConfig, StimelaLibrary + @dataclass + class StimelaLibrary(object): + params: Dict[str, Parameter] = EmptyDictDefault() + recipes: Dict[str, Recipe] = EmptyDictDefault() + steps: Dict[str, Any] = EmptyDictDefault() + misc: Dict[str, Any] = EmptyDictDefault() + @dataclass class StimelaConfig: base: Dict[str, StimelaImage] = EmptyDictDefault() @@ -206,7 +207,7 @@ def _load(conf, config_file): # add runtime info _ds = time.strftime("%Y%m%d") _ts = time.strftime("%H%M%S") - runtime = dict(date=_ds, time=_ts, datetime=f"{_ds}-{_ts}") + runtime = dict(date=_ds, time=_ts, datetime=f"{_ds}-{_ts}", node=platform.node()) conf.run = OmegaConf.create(runtime) diff --git a/stimela/kitchen/recipe.py b/stimela/kitchen/recipe.py index 2da62e9b..35501101 100644 --- a/stimela/kitchen/recipe.py +++ b/stimela/kitchen/recipe.py @@ -1,39 +1,35 @@ -import glob, time from multiprocessing import cpu_count -import os, os.path, re, logging +import os, os.path, re, logging, fnmatch, copy, time from typing import Any, Tuple, List, Dict, Optional, Union -from enum import Enum from dataclasses import dataclass - from omegaconf import MISSING, OmegaConf, DictConfig, ListConfig from collections import OrderedDict from collections.abc import Mapping +import rich.table + from scabha import cargo from pathos.pools import ProcessPool from pathos.serial import SerialPool from multiprocessing import cpu_count - -from scabha.exceptions import SubstitutionError, SubstitutionErrorList from stimela.config import EmptyDictDefault, EmptyListDefault, StimelaLogConfig import stimela from stimela import logger, stimelogging - from stimela.exceptions import * - from scabha import validate -from scabha.validate import Unresolved, join_quote +import scabha.exceptions +from scabha.exceptions import SubstitutionError, SubstitutionErrorList +from scabha.validate import evaluate_and_substitute, Unresolved, join_quote from scabha.substitutions import SubstitutionNS, substitutions_from +from scabha.cargo import Parameter, Cargo, Cab, Batch, ParameterCategory +from scabha.types import File, Directory, MS from . import runners Conditional = Optional[str] -from scabha.cargo import Cargo, Cab, Batch -from scabha.types import File, Directory, MS - - - - +class DeferredAlias(Unresolved): + """Class used as placeholder for deferred alias lookup (i.e. 
before an aliased value is available)""" + pass @dataclass class Step: @@ -42,7 +38,7 @@ class Step: recipe: Optional["Recipe"] = None # if not None, this step is a nested recipe params: Dict[str, Any] = EmptyDictDefault() # assigns parameter values info: Optional[str] = None # comment or info - skip: bool = False # if true, step is skipped unless explicitly enabled + skip: Optional[str] = None # if this evaluates to True, step is skipped tags: List[str] = EmptyListDefault() backend: Optional[str] = None # backend setting, overrides opts.config.backend if set @@ -54,42 +50,50 @@ class Step: assign_based_on: Dict[str, Any] = EmptyDictDefault() # assigns variables when step is executed based on value of another variable - _skip: Conditional = None # skip this step if conditional evaluates to true - _break_on: Conditional = None # break out (of parent recipe) if conditional evaluates to true + # _skip: Conditional = None # skip this step if conditional evaluates to true + # _break_on: Conditional = None # break out (of parent recipe) if conditional evaluates to true def __post_init__(self): self.fqname = self.fqname or self.name if bool(self.cab) == bool(self.recipe): raise StepValidationError("step must specify either a cab or a nested recipe, but not both") self.cargo = self.config = None - self._prevalidated = None self.tags = set(self.tags) # convert params into stadard dict, else lousy stuff happens when we insert non-standard objects if isinstance(self.params, DictConfig): self.params = OmegaConf.to_container(self.params) - - def summary(self, recursive=True): - return self.cargo and self.cargo.summary(recursive=recursive) + # after (pre)validation, this contains parameter values + self.validated_params = None + # the "skip" attribute is reevaluated at runtime since it may contain substitutions, but if it's set to a bool + # constant, self._skip will be preset already + if self.skip in {"True", "true", "1"}: + self._skip = True + elif self.skip in {"False", "false", "0", "", None}: + self._skip = False + else: + # otherwise, self._skip stays at None, and will be re-evaluated at runtime + self._skip = None + + def summary(self, params=None, recursive=True, ignore_missing=False): + return self.cargo and self.cargo.summary(recursive=recursive, + params=params or self.validated_params or self.params, ignore_missing=ignore_missing) @property def finalized(self): return self.cargo is not None - @property - def prevalidated(self): - return self._prevalidated - @property def missing_params(self): - return self.cargo.missing_params + return OrderedDict([(name, schema) for name, schema in self.cargo.inputs_outputs.items() + if schema.required and name not in self.validated_params]) @property def invalid_params(self): - return self.cargo.invalid_params + return [name for name, value in self.validated_params.items() if isinstance(value, scabha.exceptions.Error)] @property def unresolved_params(self): - return self.cargo.unresolved_params + return [name for name, value in self.validated_params.items() if isinstance(value, Unresolved)] @property def inputs(self): @@ -120,10 +124,6 @@ def nesting(self): def update_parameter(self, name, value): self.params[name] = value - # only pass value up to cargo if has already been validated. This avoids redefinition errors from nested aliases. 
- # otherwise, just keep the value in our dict (cargo will get it upon validation) - if self.cargo is not None and self.prevalidated: - self.cargo.update_parameter(name, value) def finalize(self, config=None, log=None, logopts=None, fqname=None, nesting=0): if not self.finalized: @@ -152,8 +152,6 @@ def finalize(self, config=None, log=None, logopts=None, fqname=None, nesting=0): # init and/or update logger options logopts = (logopts if logopts is not None else self.config.opts.log).copy() - if 'log' in self.assign: - logopts.update(**self.assign.log) # update file logging if not recipe (a recipe will do it in its finalize() anyway, with its own substitions) if not self.recipe: @@ -163,133 +161,162 @@ def finalize(self, config=None, log=None, logopts=None, fqname=None, nesting=0): # finalize the cargo self.cargo.finalize(config, log=log, logopts=logopts, fqname=self.fqname, nesting=nesting) + # build dictionary of defaults from cargo + self.defaults = {name: schema.default for name, schema in self.cargo.inputs_outputs.items() + if schema.default is not None and not isinstance(schema.default, Unresolved) } + self.defaults.update(**self.cargo.defaults) + + # set missing parameters from defaults + for name, value in self.defaults.items(): + if name not in self.params: + self.params[name] = value + # set backend self.backend = self.backend or self.config.opts.backend def prevalidate(self, subst: Optional[SubstitutionNS]=None): - if not self.prevalidated: - self.finalize() - # validate cab or recipe - self.cargo.prevalidate(self.params, subst) - self.log.debug(f"{self.cargo.name}: {len(self.missing_params)} missing, " - f"{len(self.invalid_params)} invalid and " - f"{len(self.unresolved_params)} unresolved parameters") - if self.invalid_params: - raise StepValidationError(f"{self.cargo.name} has the following invalid parameters: {join_quote(self.invalid_params)}") - self._prevalidated = True - - def log_summary(self, level, title, color=None): + self.finalize() + # validate cab or recipe + params = self.validated_params = self.cargo.prevalidate(self.params, subst) + self.log.debug(f"{self.cargo.name}: {len(self.missing_params)} missing, " + f"{len(self.invalid_params)} invalid and " + f"{len(self.unresolved_params)} unresolved parameters") + if self.invalid_params: + raise StepValidationError(f"{self.cargo.name} has the following invalid parameters: {join_quote(self.invalid_params)}") + return params + + def log_summary(self, level, title, color=None, ignore_missing=True): extra = dict(color=color, boldface=True) if self.log.isEnabledFor(level): self.log.log(level, f"### {title}", extra=extra) del extra['boldface'] - for line in self.summary(recursive=False): + for line in self.summary(recursive=False, ignore_missing=ignore_missing): self.log.log(level, line, extra=extra) - def run(self, params=None, subst=None, batch=None): + def run(self, subst=None, batch=None): """Runs the step""" - self.prevalidate() - if params is None: - params = self.params - - skip_warned = False # becomes True when warnings are given - - self.log.debug(f"validating inputs {subst and list(subst.keys())}") - validated = None - try: - params = self.cargo.validate_inputs(params, loosely=self.skip, subst=subst) - validated = True - - except ScabhaBaseException as exc: - level = logging.WARNING if self.skip else logging.ERROR - if not exc.logged: - if type(exc) is SubstitutionErrorList: - self.log.log(level, f"unresolved {{}}-substitution(s):") - for err in exc.errors: - self.log.log(level, f" {err}") - else: - 
self.log.log(level, f"error validating inputs: {exc}") - exc.logged = True - self.log_summary(level, "summary of inputs follows", color="WARNING") - # raise up, unless step is being skipped - if self.skip: - self.log.warning("since the step is being skipped, this is not fatal") - skip_warned = True - else: - raise + if self.validated_params is None: + self.prevalidate(self.params) + + with stimelogging.declare_subtask(self.name) as subtask: + # evaluate the skip attribute (it can be a formula and/or a {}-substititon) + skip = self._skip + if self._skip is None and subst is not None: + skips = dict(skip=self.skip) + skips = evaluate_and_substitute(skips, subst, subst.current, location=[self.fqname], ignore_subst_errors=False) + skip = skips["skip"] + + # Since prevalidation will have populated default values for potentially missing parameters, use those values + # For parameters that aren't missing, use whatever value that was suplied + params = self.validated_params.copy() + params.update(**self.params) + + skip_warned = False # becomes True when warnings are given + + self.log.debug(f"validating inputs {subst and list(subst.keys())}") + validated = None + try: + params = self.cargo.validate_inputs(params, loosely=skip, subst=subst) + validated = True - # log inputs - if validated and not self.skip: - self.log_summary(logging.INFO, "validated inputs", color="GREEN") - if subst is not None: - subst.current = params - - # bomb out if some inputs failed to validate or substitutions resolve - if self.cargo.invalid_params or self.cargo.unresolved_params: - invalid = self.cargo.invalid_params + self.cargo.unresolved_params - if self.skip: - self.log.warning(f"invalid inputs: {join_quote(invalid)}") - if not skip_warned: - self.log.warning("since the step was skipped, this is not fatal") + except ScabhaBaseException as exc: + level = logging.WARNING if skip else logging.ERROR + if not exc.logged: + if type(exc) is SubstitutionErrorList: + self.log.log(level, f"unresolved {{}}-substitution(s):") + for err in exc.errors: + self.log.log(level, f" {err}") + else: + self.log.log(level, f"error validating inputs: {exc}") + exc.logged = True + self.log_summary(level, "summary of inputs follows", color="WARNING") + # raise up, unless step is being skipped + if self.skip: + self.log.warning("since the step is being skipped, this is not fatal") skip_warned = True + else: + raise + + self.validated_params.update(**params) + + # log inputs + if validated and not skip: + self.log_summary(logging.INFO, "validated inputs", color="GREEN", ignore_missing=True) + if subst is not None: + subst.current = params + + # bomb out if some inputs failed to validate or substitutions resolve + if self.invalid_params or self.unresolved_params: + invalid = self.invalid_params + self.unresolved_params + if self.skip: + self.log.warning(f"invalid inputs: {join_quote(invalid)}") + if not skip_warned: + self.log.warning("since the step was skipped, this is not fatal") + skip_warned = True + else: + raise StepValidationError(f"invalid inputs: {join_quote(invalid)}", log=self.log) + + if not skip: + try: + if type(self.cargo) is Recipe: + self.cargo.backend = self.cargo.backend or self.backend + self.cargo._run(params) + elif type(self.cargo) is Cab: + self.cargo.backend = self.cargo.backend or self.backend + runners.run_cab(self.cargo, params, log=self.log, subst=subst, batch=batch) + else: + raise RuntimeError("Unknown cargo type") + except ScabhaBaseException as exc: + if not exc.logged: + self.log.error(f"error running step: 
{exc}") + exc.logged = True + raise else: - raise StepValidationError(f"invalid inputs: {join_quote(invalid)}", log=self.log) + if self._skip is None and subst is not None: + self.log.info(f"skipping step based on setting of '{self.skip}'") + else: + self.log.info("skipping step based on explicit setting") + + self.log.debug(f"validating outputs") + validated = False - if not self.skip: try: - if type(self.cargo) is Recipe: - self.cargo.backend = self.cargo.backend or self.backend - self.cargo._run() - elif type(self.cargo) is Cab: - self.cargo.backend = self.cargo.backend or self.backend - runners.run_cab(self.cargo, log=self.log, subst=subst, batch=batch) - else: - raise RuntimeError("Unknown cargo type") + params = self.cargo.validate_outputs(params, loosely=skip, subst=subst) + validated = True except ScabhaBaseException as exc: + level = logging.WARNING if self.skip else logging.ERROR if not exc.logged: - self.log.error(f"error running step: {exc}") + if type(exc) is SubstitutionErrorList: + self.log.log(level, f"unresolved {{}}-substitution(s):") + for err in exc.errors: + self.log.log(level, f" {err}") + else: + self.log.log(level, f"error validating outputs: {exc}") exc.logged = True - raise - - self.log.debug(f"validating outputs") - validated = False - # insert output values into params for re-substitution and re-validation - output_params = {name: value for name, value in params.items() if name in self.cargo.outputs} - - try: - params = self.cargo.validate_outputs(output_params, loosely=self.skip, subst=subst) - validated = True - except ScabhaBaseException as exc: - level = logging.WARNING if self.skip else logging.ERROR - if not exc.logged: - if type(exc) is SubstitutionErrorList: - self.log.log(level, f"unresolved {{}}-substitution(s):") - for err in exc.errors: - self.log.log(level, f" {err}") + # raise up, unless step is being skipped + if skip: + self.log.warning("since the step was skipped, this is not fatal") else: - self.log.log(level, f"error validating outputs: {exc}") - exc.logged = True - # raise up, unless step is being skipped - if self.skip: - self.log.warning("since the step was skipped, this is not fatal") - else: - self.log_summary(level, "failed outputs", color="WARNING") - raise - - if validated: - self.log_summary(logging.DEBUG, "validated outputs") - - # bomb out if an output was invalid - invalid = [name for name in self.cargo.invalid_params + self.cargo.unresolved_params if name in self.cargo.outputs] - if invalid: - if self.skip: - self.log.warning(f"invalid outputs: {join_quote(invalid)}") - self.log.warning("since the step was skipped, this is not fatal") - else: - raise StepValidationError(f"invalid outputs: {join_quote(invalid)}", log=self.log) + self.log_summary(level, "failed outputs", color="WARNING") + raise + + if validated: + self.validated_params.update(**params) + if subst is not None: + subst.current._merge_(params) + self.log_summary(logging.DEBUG, "validated outputs", ignore_missing=True) + + # bomb out if an output was invalid + invalid = [name for name in self.invalid_params + self.unresolved_params if name in self.cargo.outputs] + if invalid: + if skip: + self.log.warning(f"invalid outputs: {join_quote(invalid)}") + self.log.warning("since the step was skipped, this is not fatal") + else: + raise StepValidationError(f"invalid outputs: {join_quote(invalid)}", log=self.log) - return {name: value for name, value in self.cargo.params.items()} + return params @dataclass class ForLoopClause(object): @@ -309,7 +336,6 @@ class Recipe(Cargo): 
Additional attributes available after validation with arguments are as per for a Cab: self.input_output: combined parameter dict (self.input + self.output), maps name to Parameter - self.missing_params: dict (name to Parameter) of required parameters that have not been specified Raises: various classes of validation errors @@ -372,10 +398,11 @@ def __post_init__ (self): for io, io_label in [(self.inputs, "inputs"), (self.outputs, "outputs")]: if self.for_loop.var in io: raise RecipeValidationError(f"'for_loop.var={self.for_loop.var}' clashes with recipe {io_label}") - # map of aliases - self._alias_map = None + # marked when finalized + self._alias_map = None # set of keys protected from assignment self._protected_from_assign = set() + self._for_loop_values = None def protect_from_assignments(self, keys): self._protected_from_assign.update(keys) @@ -398,48 +425,109 @@ def validate_assignments(self, assign, assign_based_on, location): # if key in io: # raise RecipeValidationError(f"'{location}.{assign_label}.{key}' clashes with an {io_label}") - def update_assignments(self, assign, assign_based_on, params=None, location=""): - if params is None: - params = self.params - for basevar, value_list in assign_based_on.items(): + def update_assignments(self, subst: SubstitutionNS, whose = None, params: Dict[str, Any] = {}, ignore_subst_errors: bool = False): + """Updates variable assignments, using the recipe's (or a step's) 'assign' and 'assign_based_on' sections. + Also updates the corresponding (recipe or step's) file logger. + + Args: + subst (SubstitutionNS): substitution namespace + whose (Step or None): if None, use recipe's (self) assignments, else use this step's + params (dict, optional): dictionary of parameters + + Raises: + AssignmentError: on errors + """ + whose = whose or self + # short-circuit out if nothong to do + if not whose.assign and not whose.assign_based_on: + return + + def resolve_config_variable(key, base=self.config): + """helper function to look up a key like a.b.c in a nested dict-like structure""" + path = key.split('.') + varname = path[-1] + section = base + for element in path[:-1]: + if element in section: + section = section[element] + else: + raise AssignmentError(f"{whose.fqname}.assign_based_on.{basevar}: '{element}' in '{key}' is not a valid config section") + return section, varname + + # split into config and non-config assignments + config_assign = OrderedDict((key, value) for key, value in whose.assign.items() if '.' in key) + assign = OrderedDict((key, value) for key, value in whose.assign.items() if '.' not in key) + updated = False + + # merge non-config assignments into ns.recipe, resolve and substitute as appropriate + if assign: + subst.recipe._merge_(assign) + evaluate_and_substitute(assign, subst, subst.recipe, location=[whose.fqname], ignore_subst_errors=ignore_subst_errors) + + # now process assign_based_on entries + for basevar, value_list in whose.assign_based_on.items(): # make sure the base variable is defined - if basevar in assign: - value = assign[basevar] - elif basevar in params: - value = params[basevar] + # it will be in subst.recipe if it was assigned, or is an input + if basevar in subst.recipe: + value = str(subst.recipe[basevar]) elif basevar in self.inputs_outputs and self.inputs_outputs[basevar].default is not None: - value = self.inputs_outputs[basevar].default + value = str(self.inputs_outputs[basevar].default) + elif '.' 
in basevar: + section, varname = resolve_config_variable(basevar) + if varname not in section: + raise AssignmentError(f"{whose.fqname}.assign_based_on.{basevar}: is not a valid config entry") + value = str(section[varname]) else: - raise AssignmentError(f"{location}.assign_based_on.{basevar} is an unset variable or parameter") + raise AssignmentError(f"{whose.fqname}.assign_based_on.{basevar} is not a known input or variable") # look up list of assignments assignments = value_list.get(value, value_list.get('DEFAULT')) if assignments is None: - raise AssignmentError(f"{location}.assign_based_on.{basevar}: unknown value '{value}', and no default defined") - # update assignments + raise AssignmentError(f"{whose.fqname}.assign_based_on.{basevar}: unknown value '{value}', and no default defined") + updated = True + # update assignments (also inside ns.recipe) for key, value in assignments.items(): if key in self._protected_from_assign: self.log.debug(f"skipping protected assignment {key}={value}") + elif key in self.inputs_outputs: + self.log.debug(f"params assignment: {key}={value}") + params[key] = value + subst.recipe[key] = value + elif '.' not in key: + self.log.debug(f"variable assignment: {key}={value}") + assign[key] = value + subst.recipe[key] = value + self.log.debug(f"variable assignment: {key}={value}") else: - # vars with dots are config settings - if '.' in key: - self.log.debug(f"config assignment: {key}={value}") - path = key.split('.') - varname = path[-1] - section = self.config - for element in path[:-1]: - if element in section: - section = section[element] - else: - raise AssignmentError("{location}.assign_based_on.{basevar}: '{element}' in '{key}' is not a valid config section") - section[varname] = value - # vars without dots are local variables or parameters - else: - if key in self.inputs_outputs: - self.log.debug(f"params assignment: {key}={value}") - params[key] = value - else: - self.log.debug(f"variable assignment: {key}={value}") - self.assign[key] = value + config_assign[key] = value + + # if anything changed, merge non-config assignments into ns.recipe, resolve and substitute as appropriate + if assign and updated: + subst.recipe._merge_(assign) + evaluate_and_substitute(assign, subst, subst.recipe, location=[whose.fqname], ignore_subst_errors=ignore_subst_errors) + + # propagate config assignments + logopts_changed = False + for key, value in config_assign.items(): + # log.opt are log options + if key.startswith("log."): + self.log.debug(f"log options assignment: {key}={value}") + whose.logopts[key.split('.', 1)[1]] = value + logopts_changed = True + # vars with dots are config settings + elif '.' in key: + self.log.debug(f"config assignment: {key}={value}") + # change system config + section, varname = resolve_config_variable(key) + section[varname] = value + # do the same for the subst dict. 
Note that if the above succeeded, then key + # is a valid config entry, so it will also be look-uppable in subst + section, varname = resolve_config_variable(key, base=subst.config) + section[varname] = value + + # propagate log options + if logopts_changed: + stimelogging.update_file_logger(whose.log, whose.logopts, nesting=whose.nesting, subst=subst, location=[whose.fqname]) + @property def finalized(self): @@ -450,11 +538,15 @@ def enable_step(self, label, enable=True): step = self.steps.get(label) if step is None: raise RecipeValidationError(f"unknown step {label}", log=self.log) - if step.skip and enable: - self.log.warning(f"enabling step '{label}' which was previously marked as skipped") - elif not step.skip and not enable: + if enable: + if step._skip is True: + self.log.warning(f"enabling step '{label}' which was previously marked as skipped") + elif step._skip is not False: + self.log.warning(f"enabling step '{label}' which was previously marked as potentially skipped ('{self.skip}')") + step.skip = self._skip = False + else: self.log.warning(f"will skip step '{label}'") - step.skip = not enable + step.skip = self._skip = True def restrict_steps(self, steps: List[str], force_enable=True): self.finalize() @@ -467,9 +559,9 @@ def restrict_steps(self, steps: List[str], force_enable=True): # apply skip flags for label, step in self.steps.items(): if label not in restrict_steps: - step.skip = True + step.skip = step._skip = True elif force_enable: - step.skip = False + step.skip = step._skip = False def add_step(self, step: Step, label: str = None): """Adds a step to the recipe. Label is auto-generated if not supplied @@ -499,62 +591,103 @@ def add(self, cabname: str, label: str = None, """ return self.add_step(Step(cab=cabname, params=params, info=info), label=label) - - def _add_alias(self, alias_name: str, alias_target: Union[str, Tuple]): + @dataclass + class AliasInfo(object): + label: str # step label + step: Step # step + param: str # parameter name + io: Dict[str, Parameter] # points to self.inputs or self.outputs + from_recipe: bool = False # if True, value propagates from recipe up to step + from_step: bool = False # if True, value propagates from step down to recipe + + def _add_alias(self, alias_name: str, alias_target: Union[str, Tuple], category: Optional[int] = None): + wildcards = False if type(alias_target) is str: - step_label, step_param_name = alias_target.split('.', 1) - step = self.steps.get(step_label) - else: - step, step_label, step_param_name = alias_target - - if step is None: - raise RecipeValidationError(f"alias '{alias_name}' refers to unknown step '{step_label}'", log=self.log) - # find it in inputs or outputs - input_schema = step.inputs.get(step_param_name) - output_schema = step.outputs.get(step_param_name) - schema = input_schema or output_schema - if schema is None: - raise RecipeValidationError(f"alias '{alias_name}' refers to unknown step parameter '{step_label}.{step_param_name}'", log=self.log) - # check that it's not an input that is already set - if step_param_name in step.params and input_schema: - raise RecipeValidationError(f"alias '{alias_name}' refers to input '{step_label}.{step_param_name}' that is already predefined", log=self.log) - # check that its I/O is consistent - if (input_schema and alias_name in self.outputs) or (output_schema and alias_name in self.inputs): - raise RecipeValidationError(f"alias '{alias_name}' can't refer to both an input and an output", log=self.log) - # see if it's already defined consistently - io = 
self.inputs if input_schema else self.outputs - existing_schema = io.get(alias_name) - if existing_schema is None: - io[alias_name] = schema.copy() # make copy of schema object - existing_schema = io[alias_name] # get reference to this copy now - existing_schema.required = False # will be set to True below as needed - existing_schema.implicit = None + step_spec, step_param_name = alias_target.split('.', 1) + # treat label as a "(cabtype)" specifier? + if re.match('^\(.+\)$', step_spec): + steps = [(label, step) for label, step in self.steps.items() if isinstance(step.cargo, Cab) and step.cab == step_spec[1:-1]] + # treat label as a wildcard? + elif any(ch in step_spec for ch in '*?['): + steps = [(label, step) for label, step in self.steps.items() if fnmatch.fnmatchcase(label, step_spec)] + wildcards = True + # else treat label as a specific step name + else: + steps = [(step_spec, self.steps.get(step_spec))] else: - # if existing type was unset, set it quietly - if not existing_schema.dtype: - existing_schema.dtype = schema.dtype - # check if definition conflicts - elif schema.dtype != existing_schema.dtype: - raise RecipeValidationError(f"alias '{alias_name}': dtype {schema.dtype} of '{step_label}.{step_param_name}' doesn't match previous dtype {existing_schema.dtype}", log=self.log) - - # this is True if the step's parameter is defined in any way (set, default, or implicit) - have_step_param = step_param_name in step.params or step_param_name in step.cargo.defaults or \ - schema.default is not None or schema.implicit is not None - - # alias becomes required if any step parameter it refers to was required and unset - if schema.required and not have_step_param: - existing_schema.required = True - - ### OMS 16/02/2022: see https://github.com/caracal-pipeline/stimela2/issues/16 - ### this was misguided. If an aliased parameter is set in one of the steps, it should not become implicit, i.e. the user - ### should still be able to override it at recipe level. 
- - ## alias becomes implicit if any step parameter it refers to is defined (and it doesn't have its own default) - ## if have_step_param and alias_name not in self.defaults: - ## existing_schema.implicit = f"{step_label}.{step_param_name}" - - self._alias_map[step_label, step_param_name] = alias_name - self._alias_list.setdefault(alias_name, []).append((step, step_param_name)) + step, step_spec, step_param_name = alias_target + steps = [(step_spec, step)] + + for (step_label, step) in steps: + if step is None: + raise RecipeValidationError(f"alias '{alias_name}' refers to unknown step '{step_label}'", log=self.log) + # is the alias already defined + existing_alias = self._alias_list.get(alias_name, [None])[0] + # find it in inputs or outputs + input_schema = step.inputs.get(step_param_name) + output_schema = step.outputs.get(step_param_name) + schema = input_schema or output_schema + if schema is None: + if wildcards: + continue + else: + raise RecipeValidationError(f"alias '{alias_name}' refers to unknown step parameter '{step_label}.{step_param_name}'", log=self.log) + # implicit inputs cannot be aliased + if input_schema and input_schema.implicit: + raise RecipeValidationError(f"alias '{alias_name}' refers to implicit input '{step_label}.{step_param_name}'", log=self.log) + # if alias is already defined, check for conflicts + if existing_alias is not None: + io = existing_alias.io + if io is self.outputs: + raise RecipeValidationError(f"output alias '{alias_name}' is defined more than once", log=self.log) + elif output_schema: + raise RecipeValidationError(f"alias '{alias_name}' refers to both an input and an output", log=self.log) + alias_schema = io[alias_name] + # now we know it's a multiply-defined input, check for type consistency + if alias_schema.dtype != schema.dtype: + raise RecipeValidationError(f"alias '{alias_name}': dtype {schema.dtype} of '{step_label}.{step_param_name}' doesn't match previous dtype {existing_schema.dtype}", log=self.log) + + # else alias not yet defined, insert a schema + else: + io = self.inputs if input_schema else self.outputs + # if we have a schema defined for the alias, some params must be inherited from it + own_schema = io.get(alias_name) + # define schema based on copy of the target + io[alias_name] = copy.copy(schema) + alias_schema = io[alias_name] + # default set from own schema + if own_schema is not None and own_schema.default is not None: + alias_schema.default = own_schema.default + if own_schema and own_schema.info is not None: + alias_schema.info = own_schema.info + # required flag overrides, if set from our own schema + if own_schema is not None and own_schema.required: + alias_schema.required = True + # category is set by argument, else from own schema, else from target + if category is not None: + alias_schema.category = category + elif own_schema is not None and own_schema.category is not None: + alias_schema.category = own_schema.category + + # if step parameter is implicit, mark the alias as implicit. 
Note that this only applies to outputs + if schema.implicit: + alias_schema.implicit = Unresolved(f"{step_label}.{step_param_name}") # will be resolved when propagated from step + self._implicit_params.add(alias_name) + + # this is True if the step's parameter is defined in any way (set, default, or implicit) + have_step_param = step_param_name in step.params or step_param_name in step.cargo.defaults or \ + schema.default is not None or schema.implicit is not None + + # if the step parameter is set and ours isn't, mark our schema as having a default + if have_step_param and alias_schema.default is None: + alias_schema.default = DeferredAlias(f"{step_label}.{step_param_name}") + + # alias becomes required if any step parameter it refers to was required, but wasn't already set + if schema.required and not have_step_param: + alias_schema.required = True + + self._alias_map[step_label, step_param_name] = alias_name + self._alias_list.setdefault(alias_name, []).append(Recipe.AliasInfo(step_label, step, step_param_name, io)) def finalize(self, config=None, log=None, logopts=None, fqname=None, nesting=0): if not self.finalized: @@ -573,22 +706,20 @@ def finalize(self, config=None, log=None, logopts=None, fqname=None, nesting=0): self.validate_assignments(step.assign, step.assign_based_on, f"{fqname}.{label}") # init and/or update logger options - logopts = (logopts if logopts is not None else config.opts.log).copy() - if 'log' in self.assign: - logopts.update(**self.assign.log) + self.logopts = (logopts if logopts is not None else config.opts.log).copy() # update file logger logsubst = SubstitutionNS(config=config, info=dict(fqname=fqname)) - stimelogging.update_file_logger(log, logopts, nesting=nesting, subst=logsubst, location=[self.fqname]) + stimelogging.update_file_logger(log, self.logopts, nesting=nesting, subst=logsubst, location=[self.fqname]) # call Cargo's finalize method - super().finalize(config, log=log, logopts=logopts, fqname=fqname, nesting=nesting) + super().finalize(config, log=log, logopts=self.logopts, fqname=fqname, nesting=nesting) # finalize steps for label, step in self.steps.items(): step_log = log.getChild(label) step_log.propagate = True - step.finalize(config, log=step_log, logopts=logopts, fqname=f"{fqname}.{label}", nesting=nesting+1) + step.finalize(config, log=step_log, logopts=self.logopts, fqname=f"{fqname}.{label}", nesting=nesting+1) # collect aliases self._alias_map = OrderedDict() @@ -600,7 +731,6 @@ def finalize(self, config=None, log=None, logopts=None, fqname=None, nesting=0): if schema.aliases: if schema.dtype != "str" or schema.choices or schema.writable: raise RecipeValidationError(f"alias '{name}' should not specify type, choices or writability", log=log) - schema.dtype = "" # tells _add_alias to not check for alias_target in schema.aliases: self._add_alias(name, alias_target) @@ -612,13 +742,14 @@ def finalize(self, config=None, log=None, logopts=None, fqname=None, nesting=0): # automatically make aliases for step parameters that are unset, and don't have a default, and aren't implict for label, step in self.steps.items(): for name, schema in step.inputs_outputs.items(): - if (label, name) not in self._alias_map and name not in step.params \ + if (label, name) not in self._alias_map and name not in step.params \ and name not in step.cargo.defaults and schema.default is None \ and not schema.implicit: auto_name = f"{label}_{name}" if auto_name in self.inputs or auto_name in self.outputs: raise RecipeValidationError(f"auto-generated parameter name 
'{auto_name}' conflicts with another name. Please define an explicit alias for this.", log=log) - self._add_alias(auto_name, (step, label, name)) + self._add_alias(auto_name, (step, label, name), + category=ParameterCategory.Required if schema.required else ParameterCategory.Obscure) # these will be re-merged when needed again self._inputs_outputs = None @@ -638,9 +769,9 @@ def finalize(self, config=None, log=None, logopts=None, fqname=None, nesting=0): else: raise RecipeValidationError(f"for_loop: over is of invalid type {type(self.for_loop.over)}", log=log) - # insert empty loop variable - if self.for_loop.var not in self.assign: - self.assign[self.for_loop.var] = "" + # # insert empty loop variable + # if self.for_loop.var not in self.assign: + # self.assign[self.for_loop.var] = "" def _prep_step(self, label, step, subst): parts = label.split("-") @@ -657,95 +788,164 @@ def prevalidate(self, params: Optional[Dict[str, Any]], subst: Optional[Substitu self.log.debug("prevalidating recipe") errors = [] - # update assignments - self.update_assignments(self.assign, self.assign_based_on, params=params, location=self.fqname) + subst_outer = subst # outer dictionary is used to prevalidate our parameters subst = SubstitutionNS() - info = SubstitutionNS(fqname=self.fqname) + info = SubstitutionNS(fqname=self.fqname, label='', label_parts=[], suffix='') # mutable=False means these sub-namespaces are not subject to {}-substitutions subst._add_('info', info, nosubst=True) subst._add_('config', self.config, nosubst=True) subst._add_('steps', {}, nosubst=True) subst._add_('previous', {}, nosubst=True) - subst._add_('recipe', self.make_substitition_namespace(ns=self.assign)) + subst._add_('recipe', {}) subst.recipe._merge_(params) + # update assignments + self.update_assignments(subst, params=params, ignore_subst_errors=True) + # add for-loop variable to inputs, if expected there if self.for_loop is not None and self.for_loop.var in self.inputs: params[self.for_loop.var] = Unresolved("for-loop") - # validate our own parameters - try: - Cargo.prevalidate(self, params, subst=subst) - except ScabhaBaseException as exc: - msg = f"recipe pre-validation failed: {exc}" - errors.append(RecipeValidationError(msg, log=self.log)) + # prevalidate our own parameters. 
This substitutes in defaults and does {}-substitutions + # we call this twice, potentially, so define as a function + def prevalidate_self(params): + try: + params = Cargo.prevalidate(self, params, subst=subst_outer) + # validate for-loop, if needed + self.validate_for_loop(params, strict=False) - # merge again - subst.recipe._merge_(self.params) + except ScabhaBaseException as exc: + msg = f"recipe pre-validation failed: {exc}" + errors.append(RecipeValidationError(msg, log=self.log)) - # propagate aliases up to substeps - for name, value in self.params.items(): - self._propagate_parameter(name, value) + # merge again, since values may have changed + subst.recipe._merge_(params) + return params - # check for missing parameters - if self.missing_params: - msg = f"""recipe '{self.name}' is missing the following required parameters: {join_quote(self.missing_params)}""" - errors.append(RecipeValidationError(msg, log=self.log)) + params = prevalidate_self(params) - # prevalidate step parameters - for label, step in self.steps.items(): - self._prep_step(label, step, subst) + # propagate alias values up to substeps, except for implicit values (these only ever propagate down to us) + for name, aliases in self._alias_list.items(): + if name in params and type(params[name]) is not DeferredAlias and name not in self._implicit_params: + for alias in aliases: + alias.from_recipe = True + alias.step.update_parameter(alias.param, params[name]) - try: - step.prevalidate(subst) - except ScabhaBaseException as exc: - if type(exc) is SubstitutionErrorList: - self.log.error(f"unresolved {{}}-substitution(s):") - for err in exc.errors: - self.log.error(f" {err}") - msg = f"step '{label}' failed pre-validation: {exc}" - errors.append(RecipeValidationError(msg, log=self.log)) + # prevalidate step parameters + # we call this twice, potentially, so define as a function - subst.previous = subst.current - subst.steps[label] = subst.previous + def prevalidate_steps(): + for label, step in self.steps.items(): + self._prep_step(label, step, subst) + # update, since these may depend on step info + self.update_assignments(subst, params=params, ignore_subst_errors=True) + self.update_assignments(subst, whose=step, params=params, ignore_subst_errors=True) + + try: + step_params = step.prevalidate(subst) + subst.current._merge_(step_params) # these may have changed in prevalidation + except ScabhaBaseException as exc: + if type(exc) is SubstitutionErrorList: + self.log.error(f"unresolved {{}}-substitution(s):") + for err in exc.errors: + self.log.error(f" {err}") + msg = f"step '{label}' failed pre-validation: {exc}" + errors.append(RecipeValidationError(msg, log=self.log)) + + subst.previous = subst.current + subst.steps[label] = subst.previous + + prevalidate_steps() + + # now check for aliases that need to be propagated up/down + if not errors: + revalidate_self = revalidate_steps = False + for name, aliases in self._alias_list.items(): + # propagate up if alias is not set, or it is implicit=Unresolved (meaning it gets set from an implicit substep parameter) + if name not in params or type(params[name]) is DeferredAlias or type(self.inputs_outputs[name].implicit) is Unresolved: + from_step = False + for alias in aliases: + # if alias is set in step but not with us, mark it as propagating down + if alias.param in alias.step.validated_params: + alias.from_step = from_step = revalidate_self = True + params[name] = alias.step.validated_params[alias.param] + # and break out, we do this for the first matching step only + 
break + # if we propagated an input value down from a step, check if we need to propagate it up to any other steps + # note that this only ever applies to inputs + if from_step: + for alias in aliases: + if not alias.from_step: + alias.from_recipe = revalidate_steps = True + alias.step.update_parameter(alias.param, params[name]) + + # do we or any steps need to be revalidated? + if revalidate_self: + params = prevalidate_self(params) + if revalidate_steps: + prevalidate_steps() + # check for missing parameters + missing_params = [name for name, schema in self.inputs_outputs.items() if schema.required and name not in params] + if missing_params: + msg = f"""recipe '{self.name}' is missing the following required parameters: {join_quote(missing_params)}""" + errors.append(RecipeValidationError(msg, log=self.log)) if errors: raise RecipeValidationError(f"{len(errors)} error(s) validating the recipe '{self.name}'", log=self.log) self.log.debug("recipe pre-validated") - def validate_inputs(self, params: Dict[str, Any], subst: Optional[SubstitutionNS]=None, loosely=False): + return params + + def validate_for_loop(self, params, strict=False): # in case of for loops, get list of values to be iterated over if self.for_loop is not None: # if over != None (see finalize() above), list of values needs to be looked up in inputs + # if it is None, then an explicit list was supplied and is already in self._for_loop_values. if self.for_loop.over is not None: - self._for_loop_values = self.params[self.for_loop.over] - if not isinstance(self._for_loop_values, (list, tuple)): - self._for_loop_values = [self._for_loop_values] - self.log.info(f"recipe is a for-loop with '{self.for_loop.var}' iterating over {len(self._for_loop_values)} values") - self.log.info(f"Loop values: {self._for_loop_values}") - # add first value to inputs, if needed - if self.for_loop.var in self.inputs and self._for_loop_values: + # check that it's legal + if self.for_loop.over in self.assign: + values = self.assign[self.for_loop.over] + elif self.for_loop.over in params: + values = params[self.for_loop.over] + elif self.for_loop.over not in self.inputs: + raise ParameterValidationError(f"for_loop.over={self.for_loop.over} does not refer to a known parameter") + else: + raise ParameterValidationError(f"for_loop.over={self.for_loop.over} is unset") + if strict and isinstance(values, Unresolved): + raise ParameterValidationError(f"for_loop.over={self.for_loop.over} is unresolved") + if type(values) is ListConfig: + values = list(values) + elif not isinstance(values, (list, tuple)): + values = [values] + if self._for_loop_values is None: + self.log.info(f"recipe is a for-loop with '{self.for_loop.var}' iterating over {len(values)} values") + self.log.info(f"Loop values: {values}") + self._for_loop_values = values + if self.for_loop.var in self.inputs: params[self.for_loop.var] = self._for_loop_values[0] + else: + self.assign[self.for_loop.var] = self._for_loop_values[0] # else fake a single-value list else: self._for_loop_values = [None] - # add 'recipe' substitutions + def validate_inputs(self, params: Dict[str, Any], subst: Optional[SubstitutionNS]=None, loosely=False): + + self.validate_for_loop(params, strict=True) + if subst is None: subst = SubstitutionNS() info = SubstitutionNS(fqname=self.fqname) subst._add_('info', info, nosubst=True) subst._add_('config', self.config, nosubst=True) + subst._add_('recipe', self.make_substitition_namespace(params)) + # 'current' points to recipe's parameters initially, + subst.current = 
subst.recipe - # subst._add_('recipe', self.make_substitition_namespace(ns=self.assign)) - # subst.recipe._merge_(params) - - params = Cargo.validate_inputs(self, params, subst=subst, loosely=loosely) - - return params + return Cargo.validate_inputs(self, params, subst=subst, loosely=loosely) def _link_steps(self): """ @@ -769,35 +969,12 @@ def _link_steps(self): step.next_step = None step.previous_step = steps[i-2] - def _propagate_parameter(self, name, value): - ### OMS: not sure why I had this, why not propagae unresolveds? - ## if type(value) is not validate.Unresolved: - for step, step_param_name in self._alias_list.get(name, []): - if self.inputs_outputs[name].implicit: - if step_param_name in step.cargo.params: - self.params[name] = step.cargo.params[name] - else: - step.update_parameter(step_param_name, value) - - def update_parameter(self, name: str, value: Any): - """[summary] - - Parameters - ---------- - name : str - [description] - value : Any - [description] - """ - self.params[name] = value - # resolved values propagate up to substeps if aliases, and propagate back if implicit - self._propagate_parameter(name, value) - - def summary(self, recursive=True): + def summary(self, params: Dict[str, Any], recursive=True, ignore_missing=False): """Returns list of lines with a summary of the recipe state """ - lines = [f"recipe '{self.name}':"] + [f" {name} = {value}" for name, value in self.params.items()] + \ - [f" {name} = ???" for name in self.missing_params] + lines = [f"recipe '{self.name}':"] + [f" {name} = {value}" for name, value in params.items()] + if not ignore_missing: + lines += [f" {name} = ???" for name in self.inputs_outputs if name not in params] if recursive: lines.append(" steps:") for name, step in self.steps.items(): @@ -808,7 +985,29 @@ def summary(self, recursive=True): _root_recipe_ns = None - def _run(self) -> Dict[str, Any]: + def rich_help(self, tree, max_category=ParameterCategory.Optional): + Cargo.rich_help(self, tree, max_category=max_category) + if self.for_loop: + loop_tree = tree.add("For loop:") + if self._for_loop_values is not None: + over = f"{len(self._for_loop_values)} values" + else: + over = f"[bold]{self.for_loop.over}[/bold]" + loop_tree.add(f"iterating [bold]{self.for_loop.var}[/bold] over {over}") + if self.steps: + have_skips = any(step._skip for step in self.steps.values()) + steps_tree = tree.add(f"Steps (note [italic]some steps[/italic] are skipped by default):" + if have_skips else "Steps:") + table = rich.table.Table.grid("", "", "", padding=(0,2)) # , show_header=False, show_lines=False, box=rich.box.SIMPLE) + steps_tree.add(table) + for label, step in self.steps.items(): + style = "italic" if step._skip else "bold" + table.add_row(f"[{style}]{label}[/{style}]", step.info) + else: + steps_tree = tree.add("No recipe steps defined") + + + def _run(self, params) -> Dict[str, Any]: """Internal recipe run method. Meant to be called from a wrapper Step object (which validates the parameters, etc.) 
Parameters @@ -826,12 +1025,13 @@ def _run(self) -> Dict[str, Any]: # set up substitution namespace subst = SubstitutionNS() - info = SubstitutionNS(fqname=self.fqname) + info = SubstitutionNS(fqname=self.fqname, label='', label_parts=[], suffix='') # nosubst=True means these sub-namespaces are not subject to {}-substitutions subst._add_('info', info, nosubst=True) + subst._add_('config', self.config, nosubst=True) subst._add_('steps', {}, nosubst=True) subst._add_('previous', {}, nosubst=True) - recipe_ns = self.make_substitition_namespace(ns=self.assign) + recipe_ns = self.make_substitition_namespace(params) subst._add_('recipe', recipe_ns) # merge in config sections, except "recipe" which clashes with our namespace @@ -844,13 +1044,8 @@ def _run(self) -> Dict[str, Any]: Recipe._root_recipe_ns = recipe_ns subst._add_('root', Recipe._root_recipe_ns) - - logopts = self.config.opts.log.copy() - if 'log' in self.assign: - logopts.update(**self.assign.log) - - # update logfile name (since this may depend on substitutions) - stimelogging.update_file_logger(self.log, self.logopts, nesting=self.nesting, subst=subst, location=[self.fqname]) + # update variable assignments + self.update_assignments(subst, params=params) # Harmonise before running self._link_steps() @@ -859,19 +1054,18 @@ def _run(self) -> Dict[str, Any]: # our inputs have been validated, so propagate aliases to steps. Check for missing stuff just in case for name, schema in self.inputs.items(): - if name in self.params: - value = self.params[name] - if type(value) is validate.Unresolved: + if name in params: + value = params[name] + if isinstance(value, Unresolved): raise RecipeValidationError(f"recipe '{self.name}' has unresolved input '{name}'", log=self.log) # propagate up all aliases - for step, step_param_name in self._alias_list.get(name, []): - step.update_parameter(step_param_name, value) + for alias in self._alias_list.get(name, []): + if alias.from_recipe: + alias.step.update_parameter(alias.param, value) else: if schema.required: raise RecipeValidationError(f"recipe '{self.name}' is missing required input '{name}'", log=self.log) - - # iterate over for-loop values (if not looping, this is set up to [None] in advance) scatter = getattr(self.for_loop, "scatter", False) @@ -880,32 +1074,26 @@ def loop_worker(inst, step, label, subst, count, iter_var): Needed for concurrency """ + # update step info + inst._prep_step(label, step, subst) + # if for-loop, assign new value if inst.for_loop: inst.log.info(f"for loop iteration {count}: {inst.for_loop.var} = {iter_var}") - # update variables - inst.assign[inst.for_loop.var] = iter_var + # update variable (in params, if expected there, else in assignments) + if inst.for_loop.var in inst.inputs_outputs: + params[inst.for_loop.var] = iter_var + else: + inst.assign[inst.for_loop.var] = iter_var + # update variable index inst.assign[f"{inst.for_loop.var}@index"] = count - inst.update_assignments(inst.assign, inst.assign_based_on, inst.fqname) - subst.recipe._merge_(inst.assign) - # update logfile name (since this may depend on substitutions) - stimelogging.update_file_logger(inst.log, inst.logopts, nesting=inst.nesting, subst=subst, location=[inst.fqname]) - + stimelogging.declare_subtask_attributes(f"{count+1}/{len(inst._for_loop_values)}") - # merge in variable assignments and add step params as "current" namespace - self.update_assignments(step.assign, step.assign_based_on, f"{self.name}.{label}") - subst.recipe._merge_(step.assign) + # update and re-evaluate assignments + 
inst.update_assignments(subst, params=params) + inst.update_assignments(subst, whose=step, params=params) - # update info - inst._prep_step(label, step, subst) - # update log options again (based on assign.log which may have changed) - if 'log' in step.assign: - logopts.update(**step.assign.log) - - # update logfile name regardless (since this may depend on substitutions) - info.fqname = step.fqname - stimelogging.update_file_logger(step.log, step.logopts, nesting=step.nesting, subst=subst, location=[step.fqname]) - + inst.log.info(f"processing step '{label}'") try: #step_params = step.run(subst=subst.copy(), batch=batch) # make a copy of the subst dict since recipe might modify step_params = step.run(subst=subst.copy()) # make a copy of the subst dict since recipe might modify @@ -919,16 +1107,6 @@ def loop_worker(inst, step, label, subst, count, iter_var): subst.previous = step_params subst.steps[label] = subst.previous - # check aliases, our outputs need to be retrieved from the step - for name, _ in inst.outputs.items(): - for step1, step_param_name in inst._alias_list.get(name, []): - if step1 is step and step_param_name in step_params: - inst.params[name] = step_params[step_param_name] - # clear implicit setting - inst.outputs[name].implicit = None - - inst.log.info(f"{'skipping' if step.skip else 'running'} step '{label}'") - loop_futures = [] for count, iter_var in enumerate(self._for_loop_values): @@ -950,19 +1128,26 @@ def loop_worker(inst, step, label, subst, count, iter_var): # results = list(loop_pool.imap(loop_worker, *loop_args)) results = [loop_worker(*args) for args in loop_futures] + # now check for output aliases that need to be propagated down + for name, aliases in self._alias_list.items(): + for alias in aliases: + if alias.from_step: + if alias.param in alias.step.validated_params: + params[name] = alias.step.validated_params[alias.param] + self.log.info(f"recipe '{self.name}' executed successfully") - return {name: value for name, value in self.params.items() if name in self.outputs} + return OrderedDict((name, value) for name, value in params.items() if name in self.outputs) - def run(self, **params) -> Dict[str, Any]: - """Public interface for running a step. Keywords are passed in as step parameters + # def run(self, **params) -> Dict[str, Any]: + # """Public interface for running a step. 
Keywords are passed in as step parameters - Returns - ------- - Dict[str, Any] - Dictionary of formal outputs - """ - return Step(recipe=self, params=params, info=f"wrapper step for recipe '{self.name}'").run() + # Returns + # ------- + # Dict[str, Any] + # Dictionary of formal outputs + # """ + # return Step(recipe=self, params=params, info=f"wrapper step for recipe '{self.name}'").run() class PyRecipe(Recipe): diff --git a/stimela/kitchen/runners.py b/stimela/kitchen/runners.py index 20c09f0f..20de8285 100644 --- a/stimela/kitchen/runners.py +++ b/stimela/kitchen/runners.py @@ -1,13 +1,13 @@ import shlex from typing import Dict, Optional, Any -from scabha.cargo import Cab, Batch +from scabha.cargo import Cab, Batch, Parameter from stimela import logger from stimela.utils.xrun_poll import xrun from stimela.exceptions import StimelaCabRuntimeError -def run_cab(cab: Cab, log=None, subst: Optional[Dict[str, Any]] = None, batch: Batch=None): +def run_cab(cab: Cab, params: Dict[str, Any], log=None, subst: Optional[Dict[str, Any]] = None, batch: Batch=None): log = log or logger() backend = __import__(f"stimela.backends.{cab.backend.name}", fromlist=[cab.backend.name]) - return backend.run(cab, log=log, subst=subst, batch=batch) + return backend.run(cab, params=params, log=log, subst=subst, batch=batch) diff --git a/stimela/main.py b/stimela/main.py index bb60612b..b2354d2e 100644 --- a/stimela/main.py +++ b/stimela/main.py @@ -93,7 +93,7 @@ def cli(backend, config_files=[], verbose=False): # import commands -from stimela.commands import run, images, build, push, save_config +from stimela.commands import run, images, build, push, save_config, help ## the ones not listed above haven't been converted to click yet. They are: # cabs, clean, containers, kill, ps, pull diff --git a/stimela/stimelogging.py b/stimela/stimelogging.py index f244e1d6..c2e72bf2 100644 --- a/stimela/stimelogging.py +++ b/stimela/stimelogging.py @@ -1,8 +1,15 @@ +import atexit import sys, os.path, re +from datetime import datetime import logging -from typing import Optional, Dict, Any, Union +import contextlib +from typing import Optional, Union from omegaconf import DictConfig from scabha.substitutions import SubstitutionNS, forgiving_substitutions_from +import asyncio +import psutil +import rich.progress +import rich.logging class MultiplexingHandler(logging.Handler): """handler to send INFO and below to stdout, everything above to stderr""" @@ -13,7 +20,11 @@ def __init__(self, info_stream=sys.stdout, err_stream=sys.stderr): self.multiplex = True def emit(self, record): - handler = self.err_handler if record.levelno > logging.INFO and self.multiplex else self.info_handler + # does record come with its own handler? Rather use that + if hasattr(record, 'custom_console_handler'): + handler = record.custom_console_handler + else: + handler = self.err_handler if record.levelno > logging.INFO and self.multiplex else self.info_handler handler.emit(record) # ignore broken pipes, this often happens when cleaning up and exiting try: @@ -43,6 +54,7 @@ class ConsoleColors(): DIM = '\033[2m' if sys.stdin.isatty() else '' GREEN = '\033[92m' if sys.stdin.isatty() else '' YELLOW = '\033[93m' if sys.stdin.isatty() else '' + BLUE = '\033[94m' if sys.stdin.isatty() else '' WHITE = '\033[39m' if sys.stdin.isatty() else '' ENDC = '\033[0m' if sys.stdin.isatty() else '' @@ -75,6 +87,7 @@ def format(self, record): class SelectiveFormatter(logging.Formatter): """Selective formatter. 
if condition(record) is True, invokes other formatter""" def __init__(self, default_formatter, dispatch_list): + logging.Formatter.__init__(self) self._dispatch_list = dispatch_list self._default_formatter = default_formatter @@ -88,7 +101,8 @@ def format(self, record): _logger = None log_console_handler = log_formatter = log_boring_formatter = log_colourful_formatter = None - +progress_bar = progress_task = None +_start_time = datetime.now() def is_logger_initialized(): return _logger is not None @@ -127,10 +141,29 @@ def _is_from_subprocess(rec): log_formatter = log_boring_formatter if boring else log_colourful_formatter if console: + global progress_bar, progress_task + progress_bar = rich.progress.Progress( + rich.progress.SpinnerColumn(), + "[yellow]{task.fields[elapsed_time]}[/yellow]", + "[bold]{task.description}[/bold]", + rich.progress.SpinnerColumn(), + "{task.fields[command]}", + rich.progress.TimeElapsedColumn(), + "{task.fields[cpu_info]}", + refresh_per_second=2, + transient=True) + + progress_task = progress_bar.add_task("stimela", command="starting", cpu_info=" ", elapsed_time="", start=True) + progress_bar.__enter__() + atexit.register(lambda:progress_bar.__exit__(None, None, None)) + if "SILENT_STDERR" in os.environ and os.environ["SILENT_STDERR"].upper()=="ON": log_console_handler = logging.StreamHandler(stream=sys.stdout) else: - log_console_handler = MultiplexingHandler() + log_console_handler = rich.logging.RichHandler(console=progress_bar, + highlighter=rich.highlighter.NullHighlighter(), + show_level=False, show_path=False, show_time=False) + log_console_handler.setFormatter(log_formatter) log_console_handler.setLevel(loglevel) _logger.addHandler(log_console_handler) @@ -141,6 +174,66 @@ def _is_from_subprocess(rec): return _logger +progress_task_names = [] +progress_task_names_orig = [] + +@contextlib.contextmanager +def declare_subtask(subtask_name): + progress_task_names.append(subtask_name) + progress_task_names_orig.append(subtask_name) + update_process_status(description='.'.join(progress_task_names)) + try: + yield subtask_name + finally: + progress_task_names.pop(-1) + progress_task_names_orig.pop(-1) + update_process_status(progress_task, description='.'.join(progress_task_names)) + +def declare_subtask_attributes(*args, **kw): + attrs = [str(x) for x in args] + [f"{key} {value}" for key, value in kw.items()] + attrs = ', '.join(attrs) + progress_task_names[-1] = f"{progress_task_names_orig[-1]}\[{attrs}]" + update_process_status(description='.'.join(progress_task_names)) + + +@contextlib.contextmanager +def declare_subcommand(command): + update_process_status(command=command) + progress_bar and progress_bar.reset(progress_task) + try: + yield command + finally: + update_process_status(command="") + +def update_process_status(command=None, description=None): + if progress_bar is not None: + # elapsed time + elapsed = str(datetime.now() - _start_time).split('.', 1)[0] + # CPU and memory + cpu = psutil.cpu_percent() + mem = psutil.virtual_memory() + used = round(mem.total*mem.percent/100 / 2**30) + total = round(mem.total / 2**30) + updates = dict(elapsed_time=elapsed, + cpu_info=f"CPU [green]{cpu}%[/green] RAM [green]{used}[/green]/[green]{total}[/green]G") + if command is not None: + updates['command'] = command + if description is not None: + updates['description'] = description + progress_bar.update(progress_task, **updates) + + +async def run_process_status_update(): + if progress_bar: + with contextlib.suppress(asyncio.CancelledError): + while True: + 
update_process_status() + await asyncio.sleep(1) + + + + + _logger_file_handlers = {} _logger_console_handlers = {} diff --git a/stimela/tests/test_aliasing.yml b/stimela/tests/test_aliasing.yml new file mode 100644 index 00000000..dfdbaa4e --- /dev/null +++ b/stimela/tests/test_aliasing.yml @@ -0,0 +1,82 @@ +cabs: + echo: + image: null + command: echo + policies: + skip_implicits: false + key_value: true + inputs: + a: + dtype: str + required: true + b: + dtype: str + c: + dtype: str + d: + implicit: "d_is_implicit" + e: + dtype: str + f: + dtype: str + outputs: + out: + dtype: File + implicit: "{current.a}" + must_exist: false + out2: + dtype: File + must_exist: false + default: out2 + +opts: + log: + dir: test-logs/logs-{config.run.datetime} + nest: 3 + symlink: logs + +recipe: + name: "alias test recipe" + aliases: + a: [s1.a, s2.a] + b: [s1.b, s2.b] + out: [s4.out] + out2: [s3.out2] + e: ['s[12].e'] + f: ['(echo).f'] + g: ['xx*.g'] + + steps: + s1: + cab: echo + params: + b: 1 + c: 2 + s2: + cab: echo + params: + c: 2 + s3: + cab: echo + params: + c: 2 + s4: + cab: echo + params: + c: 4 + s5: + recipe: + name: "alias sub-recipe" + aliases: + a: [ss1.a, ss2.a] + steps: + ss1: + cab: echo + params: + c: 2 + ss2: + cab: echo + params: + c: 2 + params: + a: '{info.fqname}' diff --git a/stimela/tests/test_loop_recipe.yml b/stimela/tests/test_loop_recipe.yml index c177d4ba..8e10ee7b 100644 --- a/stimela/tests/test_loop_recipe.yml +++ b/stimela/tests/test_loop_recipe.yml @@ -23,8 +23,6 @@ lib: cubical_image: name: "cubical_image" info: 'does one step of cubical, followed by one step of imaging' - dirs: - log: logs aliases: ms: [calibrate.ms, image.ms] steps: @@ -53,7 +51,7 @@ lib: opts: log: - dir: logs-{config.run.datetime} + dir: test-logs/logs-{config.run.datetime} nest: 3 symlink: logs @@ -72,7 +70,6 @@ cubical_image_loop: image-prefix: "{recipe.dir.out}/im{info.suffix}-{recipe.loop-name}/im{info.suffix}-{recipe.loop-name}" loop-name: "s{recipe.loop:02d}" log: - dir: logs-{config.run.datetime} name: log-{recipe.loop-name}-{info.fqname}.txt for_loop: @@ -85,6 +82,7 @@ cubical_image_loop: steps: calibrate: cab: cubical + skip: "=recipe.loop == 1" image-1: cab: myclean params: diff --git a/stimela/tests/test_recipe.py b/stimela/tests/test_recipe.py index 9af5b966..97782f6d 100644 --- a/stimela/tests/test_recipe.py +++ b/stimela/tests/test_recipe.py @@ -1,5 +1,41 @@ import stimela -import os +import os, re, subprocess + +def run(command): + """Runs command, returns tuple of exit code, output""" + try: + return 0, subprocess.check_output(command, shell=True).strip().decode() + except subprocess.CalledProcessError as exc: + return exc.returncode, exc.output.strip().decode() + +def verify_output(output, *regexes): + """Returns true if the output contains lines matching the regexes in sequence (possibly with other lines in between)""" + regexes = list(regexes[::-1]) + for line in output.split("\n"): + if regexes and re.search(regexes[-1], line): + regexes.pop() + if regexes: + print("Error, the following regexes did not match the output:") + for regex in regexes: + print(f" {regex}") + return False + return True + + +def test_test_aliasing(): + print("===== expecting an error since required parameters are missing =====") + retcode, _ = run("stimela -v exec test_aliasing.yml") + assert retcode != 0 + + print("===== expecting no errors now =====") + retcode, output = run("stimela -v exec test_aliasing.yml a=1 s3_a=1 s4_a=1 e=e f=f") + assert retcode == 0 + print(output) + assert 
verify_output(output, + "DEBUG: ### validated outputs", + "DEBUG: recipe 'recipe'", + "DEBUG: out: 1") + def test_test_recipe(): print("===== expecting an error since 'msname' parameter is missing =====") @@ -12,7 +48,7 @@ def test_test_recipe(): def test_test_loop_recipe(): print("===== expecting an error since 'ms' parameter is missing =====") - retcode = os.system("stimela -v exec test_loop_recipe.yml cubical_image") + retcode = os.system("stimela -v exec test_loop_recipe.yml cubical_image_loop") assert retcode != 0 print("===== expecting no errors now =====") @@ -27,8 +63,6 @@ def test_test_loop_recipe(): retcode = os.system("stimela -v exec test_loop_recipe.yml loop_recipe") assert retcode == 0 - - def test_runtime_recipe(): ## OMS ## disabling for now, need to revise to use "dummy" cabs (or add real cabs?) diff --git a/stimela/tests/test_recipe.yml b/stimela/tests/test_recipe.yml index aa82b8ff..1076e40a 100644 --- a/stimela/tests/test_recipe.yml +++ b/stimela/tests/test_recipe.yml @@ -66,6 +66,13 @@ cabs: dtype: str required: true +opts: + log: + dir: test-logs/logs-{config.run.datetime} + nest: 3 + symlink: logs + + recipe: name: "demo recipe" info: 'top level recipe definition' diff --git a/stimela/tests/test_xrun.yml b/stimela/tests/test_xrun.yml new file mode 100644 index 00000000..9f660265 --- /dev/null +++ b/stimela/tests/test_xrun.yml @@ -0,0 +1,78 @@ +cabs: + sleep: + command: sleep + inputs: + seconds: + dtype: int + required: true + policies: + positional: true + help: + command: wsclean + + shell: + command: '{current.command} {current.args}' + policies: + skip: true + inputs: + command: + dtype: str + required: true + args: + dtype: str + required: true + + fileops: + command: '{current.command}' + policies: + positional: true + inputs: + command: + choices: + - cp + - mv + policies: + skip: true + src: + dtype: File + required: true + must_exist: false + dest: + dtype: Union[File, Directory] + required: true + must_exist: false + +opts: + log: + dir: test-logs/logs-{config.run.datetime} + nest: 3 + symlink: logs + + +xrun_recipe: + name: "demo recipe" + info: 'demo recipe with loops and sleeps' + + for_loop: + var: x + over: [1,2,3] + + steps: + echo: + cab: shell + params: + command: echo + args: "1 2 3 4 5" + + # help: + # cab: help + slp1: + cab: sleep + params: + seconds: 5 + # cp: + # cab: fileops + # params: + # command: cp + # src: a + # dest: a diff --git a/stimela/utils/__init__.py b/stimela/utils/__init__.py index 19c68208..f58cd62b 100644 --- a/stimela/utils/__init__.py +++ b/stimela/utils/__init__.py @@ -16,7 +16,7 @@ CPUS = 1 -from .xrun_poll import xrun +from .xrun_asyncio import xrun def assign(key, value): frame = inspect.currentframe().f_back diff --git a/stimela/utils/xrun_asyncio.py b/stimela/utils/xrun_asyncio.py new file mode 100644 index 00000000..7b35b791 --- /dev/null +++ b/stimela/utils/xrun_asyncio.py @@ -0,0 +1,127 @@ +import traceback, subprocess, errno, re, time, logging, os, sys, signal, contextlib, datetime +import asyncio +import psutil +import rich +import rich.highlighter +from rich.style import Style +from rich.table import Column +from rich.progress import Progress, SpinnerColumn, TimeElapsedColumn +from rich.logging import RichHandler + +from stimela import stimelogging + +from .xrun_poll import get_stimela_logger, dispatch_to_log, xrun_nolog +from . 
import StimelaCabRuntimeError, StimelaProcessRuntimeError
+
+DEBUG = 0
+
+log = None
+
+
+
+def xrun(command, options, log=None, env=None, timeout=-1, kill_callback=None, output_wrangler=None, shell=True,
+         return_errcode=False, command_name=None, progress_bar=False):
+
+    command_name = command_name or command
+
+    # this part could be inside the container
+    command_line = " ".join([command] + list(map(str, options)))
+    if shell:
+        command_line = " ".join([command] + list(map(str, options)))
+        command = [command_line]
+    else:
+        command = [command] + list(map(str, options))
+        command_line = " ".join(command)
+
+    log = log or get_stimela_logger()
+
+    if log is None:
+        return xrun_nolog(command, name=command_name, shell=shell)
+
+    # this part is never inside the container
+    import stimela
+
+    log = log or stimela.logger()
+
+    log.info("running " + command_line, extra=dict(stimela_subprocess_output=(command_name, "start")))
+
+    with stimelogging.declare_subcommand(os.path.basename(command_name)):
+
+        start_time = datetime.datetime.now()
+        def elapsed():
+            """Returns string representing elapsed time"""
+            return str(datetime.datetime.now() - start_time).split('.', 1)[0]
+
+        loop = asyncio.get_event_loop()
+
+        proc = loop.run_until_complete(
+            asyncio.create_subprocess_exec(*command,
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE))
+
+        async def stream_reader(stream, stream_name):
+            while not stream.at_eof():
+                line = await stream.readline()
+                line = (line.decode('utf-8') if type(line) is bytes else line).rstrip()
+                if line or not stream.at_eof():
+                    dispatch_to_log(log, line, command_name, stream_name, output_wrangler=output_wrangler)
+
+        async def proc_awaiter(proc, *cancellables):
+            await proc.wait()
+            for task in cancellables:
+                task.cancel()
+
+        reporter = asyncio.Task(stimelogging.run_process_status_update())
+
+        try:
+            job = asyncio.gather(
+                proc_awaiter(proc, reporter),
+                stream_reader(proc.stdout, "stdout"),
+                stream_reader(proc.stderr, "stderr"),
+                reporter
+            )
+            results = loop.run_until_complete(job)
+            status = proc.returncode
+            log.info(f"{command_name} exited with code {status} after {elapsed()}")
+        except SystemExit as exc:
+            loop.run_until_complete(proc.wait())
+        except KeyboardInterrupt:
+            if callable(kill_callback):
+                log.warning(f"Ctrl+C caught after {elapsed()}, shutting down {command_name} process, please give it a few moments")
+                kill_callback()
+                log.info(f"the {command_name} process was shut down successfully",
+                         extra=dict(stimela_subprocess_output=(command_name, "status")))
+                loop.run_until_complete(proc.wait())
+            else:
+                log.warning(f"Ctrl+C caught after {elapsed()}, interrupting {command_name} process {proc.pid}")
+                proc.send_signal(signal.SIGINT)
+
+                async def wait_on_process(proc):
+                    for retry in range(10):
+                        await asyncio.sleep(1)
+                        if proc.returncode is not None:
+                            log.info(f"Process {proc.pid} has exited with return code {proc.returncode}")
+                            break
+                        if retry == 5:
+                            log.warning(f"Process {proc.pid} has not exited after {retry} seconds, will try to terminate it")
+                            proc.terminate()
+                        else:
+                            log.info(f"Process {proc.pid} has not exited after {retry} seconds, waiting a bit longer...")
+                    else:
+                        log.warning(f"Killing process {proc.pid}")
+                        proc.kill()
+
+                loop.run_until_complete(wait_on_process(proc))
+
+            raise StimelaCabRuntimeError(f"{command_name} interrupted with Ctrl+C")
+
+        except Exception as exc:
+            loop.run_until_complete(proc.wait())
+            traceback.print_exc()
+            raise StimelaCabRuntimeError(f"{command_name} threw exception: {exc} after {elapsed()}",
log=log) + + if status and not return_errcode: + raise StimelaCabRuntimeError(f"{command_name} returns error code {status} after {elapsed()}") + + return status + diff --git a/stimela/utils/xrun_poll.py b/stimela/utils/xrun_poll.py index 183c7874..dfa3b537 100644 --- a/stimela/utils/xrun_poll.py +++ b/stimela/utils/xrun_poll.py @@ -135,7 +135,7 @@ def xrun_nolog(command, name=None, shell=True): return 0 -def dispatch_to_log(log, line, command_name, stream_name, output_wrangler): +def dispatch_to_log(log, line, command_name, stream_name, output_wrangler, custom_console_handler=None): # dispatch output to log line = _remove_ctrls(line) extra = dict(stimela_subprocess_output=(command_name, stream_name)) @@ -143,6 +143,8 @@ def dispatch_to_log(log, line, command_name, stream_name, output_wrangler): severity = logging.INFO if stream_name == 'stderr': extra['color'] = 'WHITE' + if custom_console_handler is not None: + extra['custom_console_handler'] = custom_console_handler # feed through wrangler to adjust severity and content if output_wrangler is not None: line, severity = output_wrangler(line, severity)
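
Sketch (illustrative only, not part of the patch above): the new xrun_asyncio wrapper boils down to a standard asyncio pattern — spawn the subprocess, stream stdout and stderr line by line, and gather the readers together with the process itself. A minimal standalone version of that pattern, with hypothetical names (``run_and_stream``, the ``echo`` command) chosen purely for illustration:

```python
import asyncio

async def run_and_stream(*cmd):
    """Minimal sketch: run a command, print its output line by line, return the exit code."""
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)

    async def reader(stream, name):
        # read until EOF, dispatching each decoded line as it arrives
        while not stream.at_eof():
            line = (await stream.readline()).decode().rstrip()
            if line:
                print(f"[{name}] {line}")

    # run both stream readers concurrently with the process itself
    await asyncio.gather(reader(proc.stdout, "stdout"),
                         reader(proc.stderr, "stderr"),
                         proc.wait())
    return proc.returncode

if __name__ == "__main__":
    print("exit code:", asyncio.run(run_and_stream("echo", "1", "2", "3")))
```

The real wrapper differs in that it also cancels a status-reporting task when the process exits and handles Ctrl+C with terminate/kill escalation, but the gather-readers-with-wait structure is the same.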
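
Sketch (illustrative only): the stimelogging changes route log records through a rich console shared with a transient Progress bar, so log lines scroll above the live status line instead of clobbering it. A rough standalone approximation using only public rich APIs — note this attaches the handler to ``progress.console`` rather than to the Progress object itself, and the column/field layout is simplified:

```python
import logging, time
import rich.progress, rich.logging

# a transient progress bar that owns the live display
progress = rich.progress.Progress(
    rich.progress.SpinnerColumn(),
    "[bold]{task.description}[/bold]",
    rich.progress.TimeElapsedColumn(),
    transient=True)

log = logging.getLogger("demo")
log.setLevel(logging.INFO)
# route log records through the progress bar's console so they render above the status line
log.addHandler(rich.logging.RichHandler(console=progress.console, show_path=False))

with progress:
    task = progress.add_task("working", total=None)
    for i in range(3):
        log.info(f"step {i}")
        progress.update(task, description=f"working on step {i}")
        time.sleep(0.5)
```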