Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 225 #229

Merged
merged 5 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "stimela"
version = "2.0rc9"
version = "2.0rc10"
description = "Framework for system agnostic pipelines for (not just) radio interferometry"
authors = ["Sphesihle Makhathini <[email protected]>", "Oleg Smirnov and RATT <[email protected]>"]
readme = "README.rst"
Expand Down
5 changes: 4 additions & 1 deletion scabha/configuratt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def load(path: str, use_sources: Optional[List[DictConfig]] = [], name: Optional
includes: bool=True, selfrefs: bool=True, include_path: str=None,
use_cache: bool = True,
no_toplevel_cache = False,
include_stack = [],
verbose: bool = False):
"""Loads config file, using a previously loaded config to resolve _use references.

Expand All @@ -30,6 +31,7 @@ def load(path: str, use_sources: Optional[List[DictConfig]] = [], name: Optional
includes (bool, optional): If True (default), "_include" references will be processed
selfrefs (bool, optional): If False, "_use" references will only be looked up in existing config.
If True (default), they'll also be looked up within the loaded config.
include_stack: list of paths which have been included. Used to catch recursive includes.
include_path (str, optional):
if set, path to each config file will be included in the section as element 'include_path'

Expand All @@ -50,7 +52,8 @@ def load(path: str, use_sources: Optional[List[DictConfig]] = [], name: Optional
if use_sources is not None and selfrefs:
use_sources = [subconf] + list(use_sources)
conf, deps = resolve_config_refs(subconf, pathname=path, location=location, name=name,
includes=includes, use_cache=use_cache, use_sources=use_sources, include_path=include_path)
includes=includes, use_cache=use_cache, use_sources=use_sources, include_path=include_path,
include_stack=include_stack + [path])
# update overall dependencies
dependencies.update(deps)

Expand Down
63 changes: 41 additions & 22 deletions scabha/configuratt/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ def _scrub_subsections(conf: DictConfig, scrubs: Union[str, List[str]]):
def resolve_config_refs(conf, pathname: str, location: str, name: str, includes: bool,
use_sources: Optional[List[DictConfig]],
use_cache = True,
include_path: Optional[str]=None):
include_path: Optional[str]=None,
include_stack=[]):
"""Resolves cross-references ("_use" and "_include" statements) in config object

Parameters
Expand All @@ -143,6 +144,8 @@ def resolve_config_refs(conf, pathname: str, location: str, name: str, includes:
one or more config object(s) in which to look up "_use" references. None to disable _use statements
include_path (str, optional):
if set, path to each config file will be included in the section as element 'include_path'
include_stack (list, optional):
stack of files from which this one was included. Used to catch recursion.

Returns
-------
Expand Down Expand Up @@ -222,29 +225,38 @@ def load_include_files(keyword):
else:
flags = {}

# check for (module)filename.yaml or (module)/filename.yaml style
match = re.match("^\\((.+)\\)/?(.+)$", incl)
# check for (location)filename.yaml or (location)/filename.yaml style
match= re.match("^\\((.+)\\)/?(.+)$", incl)
if match:
modulename, filename = match.groups()
try:
mod = importlib.import_module(modulename)
except ImportError as exc:
if 'optional' in flags:
dependencies.add_fail(FailRecord(incl, pathname, modulename=modulename, fname=filename))
if 'warn' in flags:
print(f"Warning: unable to import module for optional include {incl}")
continue
raise ConfigurattError(f"{errloc}: {keyword} {incl}: can't import {modulename} ({exc})")

filename = os.path.join(os.path.dirname(mod.__file__), filename)
if not os.path.exists(filename):
if 'optional' in flags:
dependencies.add_fail(FailRecord(incl, pathname, modulename=modulename, fname=filename))
if 'warn' in flags:
print(f"Warning: unable to find optional include {incl}")
continue
raise ConfigurattError(f"{errloc}: {keyword} {incl}: {filename} does not exist")

if modulename.startswith("."):
filename = os.path.join(os.path.dirname(pathname), modulename, filename)
if not os.path.exists(filename):
if 'optional' in flags:
dependencies.add_fail(FailRecord(filename, pathname))
if 'warn' in flags:
print(f"Warning: unable to find optional include {incl} ({filename})")
continue
raise ConfigurattError(f"{errloc}: {keyword} {incl} does not exist")
else:
try:
mod = importlib.import_module(modulename)
except ImportError as exc:
if 'optional' in flags:
dependencies.add_fail(FailRecord(incl, pathname, modulename=modulename, fname=filename))
if 'warn' in flags:
print(f"Warning: unable to import module for optional include {incl}")
continue
raise ConfigurattError(f"{errloc}: {keyword} {incl}: can't import {modulename} ({exc})")

filename = os.path.join(os.path.dirname(mod.__file__), filename)
if not os.path.exists(filename):
if 'optional' in flags:
dependencies.add_fail(FailRecord(incl, pathname, modulename=modulename, fname=filename))
if 'warn' in flags:
print(f"Warning: unable to find optional include {incl}")
continue
raise ConfigurattError(f"{errloc}: {keyword} {incl}: {filename} does not exist")
# absolute path -- one candidate
elif os.path.isabs(incl):
if not os.path.exists(incl):
Expand All @@ -270,10 +282,15 @@ def load_include_files(keyword):
continue
raise ConfigurattError(f"{errloc}: {keyword} {incl} not found in {':'.join(paths)}")

# check for recursion
for path in include_stack:
if os.path.samefile(path, filename):
raise ConfigurattError(f"{errloc}: {filename} is included recursively")
# load included file
incl_conf, deps = load(filename, location=location,
name=f"{filename}, included from {name}",
includes=True,
include_stack=include_stack,
use_cache=use_cache,
use_sources=None) # do not expand _use statements in included files, this is done below

Expand Down Expand Up @@ -351,6 +368,7 @@ def load_use_sections(keyword):
value1, deps = resolve_config_refs(value, pathname=pathname, name=name,
location=f"{location}.{key}" if location else key,
includes=includes,
include_stack=include_stack,
use_sources=use_sources, use_cache=use_cache,
include_path=include_path)
dependencies.update(deps)
Expand All @@ -366,6 +384,7 @@ def load_use_sections(keyword):
value1, deps = resolve_config_refs(value, pathname=pathname, name=name,
location=f"{location or ''}[{i}]",
includes=includes,
include_stack=include_stack,
use_sources=use_sources, use_cache=use_cache,
include_path=include_path)
dependencies.update(deps)
Expand Down
5 changes: 5 additions & 0 deletions scabha/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,11 @@ def validate_parameters(params: Dict[str, Any], schemas: Dict[str, Any],
schema = schemas[name]
if schema.choices and value not in schema.choices:
raise ParameterValidationError(f"{mkname(name)}: invalid value '{value}'")
if schema.element_choices:
listval = value if isinstance(value, (list, tuple, ListConfig)) else [value]
for elem in listval:
if elem not in schema.element_choices:
raise ParameterValidationError(f"{mkname(name)}: invalid list element '{elem}'")

# check for mkdir directives
if create_dirs:
Expand Down
8 changes: 7 additions & 1 deletion stimela/backends/kube/infrastructure.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from typing import Optional, Dict, List

from stimela.backends import StimelaBackendOptions
from stimela.stimelogging import log_exception, declare_chapter, update_process_status
from stimela.stimelogging import log_exception, declare_chapter
from stimela.task_stats import update_process_status
from scabha.basetypes import EmptyListDefault

from stimela.exceptions import BackendError
Expand Down Expand Up @@ -55,6 +56,11 @@ def init(backend: StimelaBackendOptions, log: logging.Logger, cleanup: bool = Fa
klog = log.getChild("kube")
kube = backend.kube

if not kube.namespace:
klog.error("kube.namespace not configured, kube backend will not be available")
return False


if cleanup:
klog.info("cleaning up backend")
else:
Expand Down
2 changes: 1 addition & 1 deletion stimela/backends/kube/pod_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import threading
from stimela.exceptions import BackendError
from stimela.utils.xrun_asyncio import dispatch_to_log
from stimela.stimelogging import update_process_status
from stimela.task_stats import update_process_status
from stimela.kitchen.cab import Cab, Parameter

from kubernetes.client import ApiException
Expand Down
2 changes: 1 addition & 1 deletion stimela/backends/kube/run_kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from stimela.utils.xrun_asyncio import dispatch_to_log
from stimela.exceptions import StimelaCabParameterError, StimelaCabRuntimeError, BackendError
from stimela.stimelogging import log_exception
from stimela.stimelogging import declare_subcommand, declare_subtask, update_process_status
from stimela.task_stats import declare_subcommand, declare_subtask, update_process_status
from stimela.backends import StimelaBackendOptions
from stimela.kitchen.cab import Cab

Expand Down
54 changes: 18 additions & 36 deletions stimela/commands/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,51 +9,33 @@
from stimela.kitchen.recipe import Recipe
from stimela.exceptions import RecipeValidationError, BackendError

from .run import load_recipe_file
from .run import load_recipe_files

@cli.command("cleanup",
help="""
Cleans up backend resources associated with recipe(s).
""")
@click.argument("items", nargs=-1, metavar="filename.yml|recipe name|...")
@click.argument("items", nargs=-1, metavar="filename.yml...")
def cleanup(items: List[str] = []):

log = logger()

for item in items:
# a filename -- treat it as a config
if os.path.isfile(item):
log.info(f"loading recipe/config {item}")
# load all recipe/config files
# load config and recipes from all given files
load_recipe_files(items)

# if file contains a recipe entry, treat it as a full config (that can include cabs etc.)
conf, recipe_deps = load_recipe_file(item)

# anything that is not a standard config section will be treated as a recipe
recipes = [name for name in conf if name not in stimela.CONFIG]

if len(recipes) == 1:
default_recipe = recipes[0]

for name in recipes:
try:
# cast section to Recipe and remove from loaded conf
recipe = OmegaConf.structured(Recipe)
recipe = OmegaConf.unsafe_merge(recipe, conf[name])
except Exception as exc:
log.error(f"recipe '{name}': {exc}")
sys.exit(2)
del conf[name]
# add to global namespace
stimela.CONFIG.lib.recipes[name] = recipe

# the rest is safe to merge into config as is
stimela.CONFIG = OmegaConf.unsafe_merge(stimela.CONFIG, conf)

# now cleanup backend
try:
backend = OmegaConf.to_object(stimela.CONFIG.opts.backend)
stimela.backends.cleanup_backends(backend, log)
except BackendError as exc:
log_exception(exc)
return 1
backends_list = stimela.CONFIG.opts.backend.select
if type(backends_list) is str:
backends_list = [backends_list]
if not backends_list:
log.info(f"configuration does not specify any backends, nothing to clean up")
else:
log.info(f"invoking cleanup procedure, selected backends: {', '.join(backends_list)}")
try:
backend = OmegaConf.to_object(stimela.CONFIG.opts.backend)
stimela.backends.cleanup_backends(backend, log)
except BackendError as exc:
log_exception(exc)
return 1

77 changes: 30 additions & 47 deletions stimela/commands/doc.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from stimela.config import ConfigExceptionTypes
from stimela.exceptions import RecipeValidationError

from .run import load_recipe_file
from .run import load_recipe_files

@cli.command("doc",
help="""
Expand Down Expand Up @@ -66,56 +66,39 @@ def load_recipe(name: str, section: Dict):
log_exception(exc)
sys.exit(2)


# load all recipe/config files
files_to_load = []
names_to_document = []
for item in items:
# a filename -- treat it as a config
if os.path.isfile(item):
log.info(f"loading recipe/config {item}")

# if file contains a recipe entry, treat it as a full config (that can include cabs etc.)
conf, recipe_deps = load_recipe_file(item)

# anything that is not a standard config section will be treated as a recipe
recipes = [name for name in conf if name not in stimela.CONFIG]

if len(recipes) == 1:
default_recipe = recipes[0]

for name in recipes:
try:
# cast section to Recipe and remove from loaded conf
recipe = OmegaConf.structured(Recipe)
recipe = OmegaConf.unsafe_merge(recipe, conf[name])
except Exception as exc:
log.error(f"recipe '{name}': {exc}")
sys.exit(2)
del conf[name]
# add to global namespace
stimela.CONFIG.lib.recipes[name] = recipe

# the rest is safe to merge into config as is
stimela.CONFIG = OmegaConf.unsafe_merge(stimela.CONFIG, conf)

# else treat as a wildcard for recipe names or cab names
if os.path.isfile(item) and os.path.splitext(item)[1].lower() in (".yml", ".yaml"):
files_to_load.append(item)
log.info(f"will load recipe/config file '{item}'")
else:
recipe_names = fnmatch.filter(stimela.CONFIG.lib.recipes.keys(), item)
cab_names = fnmatch.filter(stimela.CONFIG.cabs.keys(), item)
if not recipe_names and not cab_names:
log.error(f"'{item}' does not match any files, recipes or cab names. Try -l/--list")
sys.exit(2)
names_to_document.append(item)

# load config and recipes from all given files
if files_to_load:
load_recipe_files(files_to_load)

for item in names_to_document:
recipe_names = fnmatch.filter(stimela.CONFIG.lib.recipes.keys(), item)
cab_names = fnmatch.filter(stimela.CONFIG.cabs.keys(), item)
if not recipe_names and not cab_names:
log.error(f"'{item}' does not match any files, recipes or cab names. Try -l/--list")
sys.exit(2)

for name in recipe_names:
recipe = load_recipe(name, stimela.CONFIG.lib.recipes[name])
tree = top_tree.add(f"Recipe: [bold]{name}[/bold]")
recipe.rich_help(tree, max_category=max_category)
for name in cab_names:
cab = Cab(**stimela.CONFIG.cabs[name])
cab.finalize(config=stimela.CONFIG)
tree = top_tree.add(f"Cab: [bold]{name}[/bold]")
cab.rich_help(tree, max_category=max_category)
for name in recipe_names:
recipe = load_recipe(name, stimela.CONFIG.lib.recipes[name])
tree = top_tree.add(f"Recipe: [bold]{name}[/bold]")
recipe.rich_help(tree, max_category=max_category)

for name in cab_names:
cab = Cab(**stimela.CONFIG.cabs[name])
cab.finalize(config=stimela.CONFIG)
tree = top_tree.add(f"Cab: [bold]{name}[/bold]")
cab.rich_help(tree, max_category=max_category)

found_something = True
found_something = True

if do_list or (not found_something and not default_recipe):

Expand Down
Loading
Loading