Skip to content

Commit

Permalink
Merge pull request #21 from caracal-pipeline/parsers
Browse files Browse the repository at this point in the history
Formula parser & alias cleanup & rich-based help
  • Loading branch information
SpheMakh authored Apr 5, 2022
2 parents c49f4d6 + 14e56c4 commit 6ff1aaf
Show file tree
Hide file tree
Showing 11 changed files with 809 additions and 136 deletions.
16 changes: 15 additions & 1 deletion scabha/basetypes.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dataclasses import field
from dataclasses import field, dataclass
from collections import OrderedDict
from typing import List


def EmptyDictDefault():
Expand All @@ -8,3 +9,16 @@ def EmptyDictDefault():

def EmptyListDefault():
return field(default_factory=lambda:[])


@dataclass
class Unresolved(object):
value: str = ""
errors: List[Exception] = EmptyListDefault

def __post_init__(self):
if not self.value:
self.value = "; ".join(map(str, self.errors))

def __str__(self):
return f"Unresolved({self.value})"
178 changes: 107 additions & 71 deletions scabha/cargo.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import os.path, re, stat, itertools, logging, yaml, shlex, importlib
from typing import Any, List, Dict, Optional, Union
from collections import OrderedDict
from enum import Enum
from enum import Enum, IntEnum
from dataclasses import dataclass
from omegaconf import MISSING, ListConfig, DictConfig

import rich.box
import rich.markup
from rich.table import Table
from rich.markdown import Markdown

import scabha
from scabha import exceptions
Expand Down Expand Up @@ -41,7 +45,7 @@ class ParameterPolicies(object):
prefix: Optional[str] = None

# skip this parameter
skip: bool = False
skip: Optional[bool] = None
# if True, implicit parameters will be skipped automatically
skip_implicits: Optional[bool] = None

Expand Down Expand Up @@ -87,7 +91,10 @@ class CabManagement: # defines common cab management behaviours
wranglers: Optional[Dict[str, ListOrString]] = EmptyDictDefault()



# used to classify parameters. Purely for cosmetic and help purposes
ParameterCategory = IntEnum("ParameterCategory",
dict(Required=0, Optional=1, Implicit=2, Obscure=3, Hidden=4),
module=__name__)

@dataclass
class Parameter(object):
Expand All @@ -97,8 +104,8 @@ class Parameter(object):
writable: bool = False
# data type
dtype: str = "str"
# for file-type parameters, specifies that the filename is implicitly set inside the step (i.e. not a free parameter)
implicit: Optional[str] = None
# specifies that the value is implicitly set inside the step (i.e. not a free parameter). Typically used with filenames
implicit: Any = None
# optonal list of arbitrary tags, used to group parameters
tags: List[str] = EmptyListDefault()

Expand Down Expand Up @@ -130,20 +137,20 @@ class Parameter(object):
# policies object, specifying a non-default way to handle this parameter
policies: ParameterPolicies = ParameterPolicies()

# Parameter category, purely cosmetic, used for generating help and debug messages.
# Assigned automatically if None, but a schema may explicitly mark parameters as e.g.
# "obscure" or "hidden"
category: Optional[ParameterCategory] = None

# metavar corresponding to this parameter. Used when constructing command-line interfaces
metavar: Optional[str] = None

# abbreviated option name for this parameter. Used when constructing command-line interfaces
abbreviation: Optional[str] = None

# # inherited from Stimela 1 -- used to handle paremeters inside containers?
# # might need a re-think, but we can leave them in for now
# pattern: Optional[str] = None

# arbitrary metadata associated with parameter
metadata: Dict[str, Any] = EmptyDictDefault()


def __post_init__(self):
def natify(value):
# convert OmegaConf lists and dicts to native types
Expand All @@ -155,6 +162,16 @@ def natify(value):
self.default = natify(self.default)
self.choices = natify(self.choices)

def get_category(self):
"""Returns category of parameter, auto-setting it if not already preset"""
if self.category is None:
if self.required:
self.category = ParameterCategory.Required
elif self.implicit is not None:
self.category = ParameterCategory.Implicit
else:
self.category = ParameterCategory.Optional
return self.category

@dataclass
class Cargo(object):
Expand All @@ -171,11 +188,13 @@ class Cargo(object):

def __post_init__(self):
self.fqname = self.fqname or self.name
self.inputs = OrderedDict((name, Parameter(**schema)) for name, schema in self.inputs.items())
self.outputs = OrderedDict((name, Parameter(**schema)) for name, schema in self.outputs.items())
for name in self.inputs.keys():
if name in self.outputs:
raise DefinitionError(f"{name} appears in both inputs and outputs")
self.params = {}
self._inputs_outputs = None
self._implicit_params = set() # marks implicitly set values
# pausterized name
self.name_ = re.sub(r'\W', '_', self.name or "") # pausterized name
# config and logger objects
Expand All @@ -201,18 +220,6 @@ def inputs_outputs(self):
self._inputs_outputs.update(**self.outputs)
return self._inputs_outputs

@property
def invalid_params(self):
return [name for name, value in self.params.items() if type(value) is exceptions.Error]

@property
def missing_params(self):
return {name: schema for name, schema in self.inputs_outputs.items() if schema.required and name not in self.params}

@property
def unresolved_params(self):
return [name for name, value in self.params.items() if type(value) is Unresolved]

@property
def finalized(self):
return self.config is not None
Expand All @@ -235,33 +242,31 @@ def prevalidate(self, params: Optional[Dict[str, Any]], subst: Optional[Substitu
if self._dyn_schema:
self._inputs_outputs = None
self.inputs, self.outputs = self._dyn_schema(params, self.inputs, self.outputs)
# prevalidate parameters
self.params = validate_parameters(params, self.inputs_outputs, defaults=self.defaults, subst=subst, fqname=self.fqname,
check_unknowns=True, check_required=False, check_exist=False,
create_dirs=False, expand_globs=False, ignore_subst_errors=True)

return self.params

def _add_implicits(self, params: Dict[str, Any], schemas: Dict[str, Parameter]):
# add implicit inputs
for name, schema in schemas.items():
if schema.implicit is not None:
if name in params:
# add implicits, if resolved
for name, schema in self.inputs_outputs.items():
if schema.implicit is not None and type(schema.implicit) is not Unresolved:
if name in params and name not in self._implicit_params:
raise ParameterValidationError(f"implicit parameter {name} was supplied explicitly")
if name in self.defaults:
raise SchemaError(f"implicit parameter {name} also has a default value")
params[name] = schema.implicit
self._implicit_params.add(name)
# assign unset categories
for name, schema in self.inputs_outputs.items():
schema.get_category()

params = validate_parameters(params, self.inputs_outputs, defaults=self.defaults, subst=subst, fqname=self.fqname,
check_unknowns=True, check_required=False, check_exist=False,
create_dirs=False, ignore_subst_errors=True)

return params

def validate_inputs(self, params: Dict[str, Any], subst: Optional[SubstitutionNS]=None, loosely=False):
"""Validates inputs.
If loosely is True, then doesn't check for required parameters, and doesn't check for files to exist etc.
This is used when skipping a step.
"""
assert(self.finalized)
# add implicit inputs
params = params.copy()
self._add_implicits(params, self.inputs)
self._add_implicits(params, self.outputs)

# check inputs
params.update(**validate_parameters(params, self.inputs, defaults=self.defaults, subst=subst, fqname=self.fqname,
Expand All @@ -270,32 +275,54 @@ def validate_inputs(self, params: Dict[str, Any], subst: Optional[SubstitutionNS
# check outputs
params.update(**validate_parameters(params, self.outputs, defaults=self.defaults, subst=subst, fqname=self.fqname,
check_unknowns=False, check_required=False, check_exist=False,
create_dirs=not loosely, expand_globs=False))
self.params.update(**params)
return self.params
create_dirs=not loosely))
return params

def validate_outputs(self, params: Dict[str, Any], subst: Optional[SubstitutionNS]=None, loosely=False):
"""Validates outputs. Parameter substitution is done.
If loosely is True, then doesn't check for required parameters, and doesn't check for files to exist etc.
"""
assert(self.finalized)
# add implicit outputs
#self._add_implicits(params, self.outputs)
self.params.update(**validate_parameters(params, self.outputs, defaults=self.defaults, subst=subst, fqname=self.fqname,
params.update(**validate_parameters(params, self.outputs, defaults=self.defaults, subst=subst, fqname=self.fqname,
check_unknowns=False, check_required=not loosely, check_exist=not loosely))
return self.params


def update_parameter(self, name, value):
assert(self.finalized)
self.params[name] = value
return params

def make_substitition_namespace(self, ns=None):
def make_substitition_namespace(self, params={}):
from .substitutions import SubstitutionNS
ns = {} if ns is None else ns.copy()
ns.update(**{name: str(value) for name, value in self.params.items()})
ns.update(**{name: "MISSING" for name in self.missing_params})
return SubstitutionNS(**ns)
return SubstitutionNS(**params)

def rich_help(self, tree, max_category=ParameterCategory.Optional):
"""Generates help into a rich.tree.Tree object"""
if self.info:
tree.add("Description:").add(Markdown(self.info))
# adds tables for inputs and outputs
for io, title in (self.inputs, "inputs"), (self.outputs, "outputs"):
for cat in ParameterCategory:
schemas = [(name, schema) for name, schema in io.items() if schema.get_category() == cat]
if not schemas:
continue
if cat > max_category:
subtree = tree.add(f"[dim]{cat.name} {title}: omitting {len(schemas)}[/dim]")
continue
subtree = tree.add(f"{cat.name} {title}:")
table = Table.grid("", "", "", padding=(0,2)) # , show_header=False, show_lines=False, box=rich.box.SIMPLE)
subtree.add(table)
for name, schema in schemas:
attrs = []
default = self.defaults.get(name, schema.default)
if schema.implicit:
attrs.append(f"implicit: {schema.implicit}")
if default is not None and not isinstance(default, Unresolved):
attrs.append(f"default: {default}")
if schema.choices:
attrs.append(f"choices: {', '.join(schema.choices)}")
info = []
schema.info and info.append(rich.markup.escape(schema.info))
attrs and info.append(f"[dim]\[{rich.markup.escape(', '.join(attrs))}][/dim]")
table.add_row(f"[bold]{name}[/bold]",
f"[dim]{rich.markup.escape(str(schema.dtype))}[/dim]",
" ".join(info))


ParameterPassingMechanism = Enum("ParameterPassingMechanism", "args yaml", module=__name__)

Expand Down Expand Up @@ -370,17 +397,26 @@ def __post_init__ (self):
self._runtime_status = None


def summary(self, recursive=True):
def summary(self, params=None, recursive=True, ignore_missing=False):
lines = [f"cab {self.name}:"]
for name, value in self.params.items():
# if type(value) is validate.Error:
# lines.append(f" {name} = ERR: {value}")
# else:
lines.append(f" {name} = {value}")

lines += [f" {name} = ???" for name in self.missing_params.keys()]
if params is not None:
for name, value in params.items():
# if type(value) is validate.Error:
# lines.append(f" {name} = ERR: {value}")
# else:
lines.append(f" {name} = {value}")
lines += [f" {name} = ???" for name, schema in self.inputs_outputs.items()
if name not in params and (not ignore_missing or schema.required)]
return lines

def rich_help(self, tree, max_category=ParameterCategory.Optional):
tree.add(f"command: {self.command}")
if self.image:
tree.add(f"image: {self.image}")
if self.virtual_env:
tree.add(f"virtual environment: {self.virtual_env}")
Cargo.rich_help(self, tree, max_category=max_category)

def get_schema_policy(self, schema, policy, default=None):
"""Resolves a policy setting. If the policy is set here, returns it. If None and set in the cab,
returns that. Else returns default value.
Expand All @@ -392,7 +428,7 @@ def get_schema_policy(self, schema, policy, default=None):
else:
return default

def build_command_line(self, subst: Optional[Dict[str, Any]] = None):
def build_command_line(self, params: Dict[str, Any], subst: Optional[Dict[str, Any]] = None):
from .substitutions import substitutions_from

with substitutions_from(subst, raise_errors=True) as context:
Expand Down Expand Up @@ -423,10 +459,10 @@ def build_command_line(self, subst: Optional[Dict[str, Any]] = None):

self.log.debug(f"command is {command}")

return ([command] + args + self.build_argument_list()), venv
return ([command] + args + self.build_argument_list(params)), venv


def build_argument_list(self):
def build_argument_list(self, params):
"""
Converts command, and current dict of parameters, into a list of command-line arguments.
Expand All @@ -444,10 +480,10 @@ def build_argument_list(self):

# collect parameters

value_dict = dict(**self.params)
value_dict = dict(**params)

if self.parameter_passing is ParameterPassingMechanism.yaml:
return [yaml.dump(value_dict)]
return [yaml.safe_dump(value_dict)]

def get_policy(schema: Parameter, policy: str, default=None):
return self.get_schema_policy(schema, policy, default)
Expand Down Expand Up @@ -612,8 +648,8 @@ class Batch:
mem: str = "128gb"
email: Optional[str] = None

def __init_cab__(self, cab: Cab, subst: Optional[Dict[str, Any]], log: Any=None):
def __init_cab__(self, cab: Cab, params: Dict[str, Any], subst: Optional[Dict[str, Any]], log: Any=None):
self.cab = cab
self.log = log
self.args, self.venv = self.cab.build_command_line(subst)
self.args, self.venv = self.cab.build_command_line(params, subst)

7 changes: 6 additions & 1 deletion scabha/configuratt.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def _resolve_config_refs(conf, pathname: str, location: str, name: str, includes
raise ConfigurattError(f"{errloc}: _include: must be a string or a list of strings")

# load includes
accum_incl_conf = OmegaConf.create()
for incl in include_files:
if not incl:
raise ConfigurattError(f"{errloc}: empty _include specifier")
Expand Down Expand Up @@ -203,7 +204,11 @@ def _resolve_config_refs(conf, pathname: str, location: str, name: str, includes
if flatten:
_flatten_subsections(incl_conf, flatten, flatten_sep)

conf = OmegaConf.merge(incl_conf, conf)
# accumulate included config so that later includes override earlier ones
accum_incl_conf = OmegaConf.unsafe_merge(accum_incl_conf, incl_conf)

# merge: our section overrides anything that has been included
conf = OmegaConf.merge(accum_incl_conf, conf)

# handle _use entries
if use_sources is not None:
Expand Down
Loading

0 comments on commit 6ff1aaf

Please sign in to comment.