Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixes some substitution bugs #19

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion stimela/backends/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,14 @@ def run_callable(modulename: str, funcname: str, cab: Cab, log, subst: Optional[
"""

# import module and get function object
path0 = sys.path.copy()
sys.path.append('.')
try:
mod = importlib.import_module(modulename)
except ImportError as exc:
raise StimelaCabRuntimeError(f"can't import {modulename}: {exc}", log=log)
finally:
sys.path = path0

func = getattr(mod, funcname, None)

Expand All @@ -90,7 +94,7 @@ def run_callable(modulename: str, funcname: str, cab: Cab, log, subst: Optional[
for key, schema in cab.inputs_outputs.items():
if not schema.policies.skip:
if key in cab.params:
args[key] = cab.params
args[key] = cab.params[key]
elif cab.get_schema_policy(schema, 'pass_missing_as_none'):
args[key] = None

Expand Down
5 changes: 5 additions & 0 deletions stimela/cargo/cab/wsclean.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ outputs:
dtype: List[File]
implicit: "{current.prefix}-[0-9][0-9][0-9][0-9]-image.fits"
must_exist: false
restored_timeint:
info: Restored images per time interval
dtype: List[File]
implicit: "{current.prefix}-t[0-9][0-9][0-9][0-9]-image.fits"
must_exist: false
residual:
dtype: List[File]
implicit: "{current.prefix}-[0-9][0-9][0-9][0-9]-residual.fits"
Expand Down
48 changes: 27 additions & 21 deletions stimela/kitchen/recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,9 @@ def run(self, params=None, subst=None, batch=None):

self.log.debug(f"validating outputs")
validated = False
# insert output values into params for re-substitution and re-validation
output_params = {name: value for name, value in params.items() if name in self.cargo.outputs}

try:
params = self.cargo.validate_outputs(output_params, loosely=self.skip, subst=subst)
params = self.cargo.validate_outputs(params, loosely=self.skip, subst=subst)
validated = True
except ScabhaBaseException as exc:
level = logging.WARNING if self.skip else logging.ERROR
Expand All @@ -277,6 +275,9 @@ def run(self, params=None, subst=None, batch=None):
self.log_summary(level, "failed outputs", color="WARNING")
raise

if subst is not None:
subst.current._merge_(params)

if validated:
self.log_summary(logging.DEBUG, "validated outputs")

Expand Down Expand Up @@ -545,13 +546,12 @@ def _add_alias(self, alias_name: str, alias_target: Union[str, Tuple]):
if schema.required and not have_step_param:
existing_schema.required = True

### OMS 16/02/2022: see https://github.com/caracal-pipeline/stimela2/issues/16
### this was misguided. If an aliased parameter is set in one of the steps, it should not become implicit, i.e. the user
### should still be able to override it at recipe level.

## alias becomes implicit if any step parameter it refers to is defined (and it doesn't have its own default)
## if have_step_param and alias_name not in self.defaults:
## existing_schema.implicit = f"{step_label}.{step_param_name}"
## if our alias doesn't have its own default set, and the step has something set, then we'll propagate the value
## *from* the step to the recipe (for the first such step)
if have_step_param and alias_name not in self.defaults:
if alias_name not in self._alias_propagated_from_step:
self._alias_propagated_from_step[alias_name] = (step, step_param_name)
# all other steps will have the aliased value propagated *to* them

self._alias_map[step_label, step_param_name] = alias_name
self._alias_list.setdefault(alias_name, []).append((step, step_param_name))
Expand Down Expand Up @@ -593,6 +593,7 @@ def finalize(self, config=None, log=None, logopts=None, fqname=None, nesting=0):
# collect aliases
self._alias_map = OrderedDict()
self._alias_list = OrderedDict()
self._alias_propagated_from_step = OrderedDict()

# collect from inputs and outputs
for io in self.inputs, self.outputs:
Expand Down Expand Up @@ -684,7 +685,7 @@ def prevalidate(self, params: Optional[Dict[str, Any]], subst: Optional[Substitu
# merge again
subst.recipe._merge_(self.params)

# propagate aliases up to substeps
# propagate aliases up to/down from substeps
for name, value in self.params.items():
self._propagate_parameter(name, value)

Expand Down Expand Up @@ -739,9 +740,13 @@ def validate_inputs(self, params: Dict[str, Any], subst: Optional[SubstitutionNS
info = SubstitutionNS(fqname=self.fqname)
subst._add_('info', info, nosubst=True)
subst._add_('config', self.config, nosubst=True)
subst._add_('recipe', self.make_substitition_namespace(ns=self.assign))

# subst._add_('recipe', self.make_substitition_namespace(ns=self.assign))
# subst.recipe._merge_(params)
for name, (from_step, from_param) in self._alias_propagated_from_step.items():
if name not in params:
params[name] = from_step.cargo.params[from_param]

params = Cargo.validate_inputs(self, params, subst=subst, loosely=loosely)

Expand Down Expand Up @@ -770,13 +775,15 @@ def _link_steps(self):
step.previous_step = steps[i-2]

def _propagate_parameter(self, name, value):
### OMS: not sure why I had this, why not propagae unresolveds?
## if type(value) is not validate.Unresolved:
# check if aliased parameter is to be propagated down from a step
from_step, from_step_param_name = self._alias_propagated_from_step.get(name, (None, None))
if from_step is not None:
if from_step_param_name in from_step.cargo.params:
self.params[name] = step.cargo.params[from_step_param_name]

# propagate up to steps
for step, step_param_name in self._alias_list.get(name, []):
if self.inputs_outputs[name].implicit:
if step_param_name in step.cargo.params:
self.params[name] = step.cargo.params[name]
else:
if step is not from_step:
step.update_parameter(step_param_name, value)

def update_parameter(self, name: str, value: Any):
Expand Down Expand Up @@ -906,6 +913,7 @@ def loop_worker(inst, step, label, subst, count, iter_var):
info.fqname = step.fqname
stimelogging.update_file_logger(step.log, step.logopts, nesting=step.nesting, subst=subst, location=[step.fqname])

inst.log.info(f"{'skipping' if step.skip else 'running'} step '{label}'")
try:
#step_params = step.run(subst=subst.copy(), batch=batch) # make a copy of the subst dict since recipe might modify
step_params = step.run(subst=subst.copy()) # make a copy of the subst dict since recipe might modify
Expand All @@ -924,10 +932,8 @@ def loop_worker(inst, step, label, subst, count, iter_var):
for step1, step_param_name in inst._alias_list.get(name, []):
if step1 is step and step_param_name in step_params:
inst.params[name] = step_params[step_param_name]
# clear implicit setting
inst.outputs[name].implicit = None

inst.log.info(f"{'skipping' if step.skip else 'running'} step '{label}'")
# # clear implicit setting
# inst.outputs[name].implicit = None

loop_futures = []

Expand Down