diff --git a/pydra/__init__.py b/pydra/__init__.py index 34e1b2578..1422e46fd 100644 --- a/pydra/__init__.py +++ b/pydra/__init__.py @@ -15,14 +15,13 @@ import attr from . import mark -from .engine import AuditFlag, DockerTask, ShellCommandTask, Submitter, Workflow, specs +from .engine import AuditFlag, ShellCommandTask, Submitter, Workflow, specs __all__ = ( "Submitter", "Workflow", "AuditFlag", "ShellCommandTask", - "DockerTask", "specs", "mark", ) diff --git a/pydra/engine/__init__.py b/pydra/engine/__init__.py index 411af8a1d..9f46b7ea3 100644 --- a/pydra/engine/__init__.py +++ b/pydra/engine/__init__.py @@ -1,12 +1,11 @@ """The core of the workflow engine.""" from .submitter import Submitter from .core import Workflow -from .task import AuditFlag, ShellCommandTask, DockerTask +from .task import AuditFlag, ShellCommandTask from . import specs __all__ = [ "AuditFlag", - "DockerTask", "ShellCommandTask", "Submitter", "Workflow", diff --git a/pydra/engine/core.py b/pydra/engine/core.py index 26e122bbd..67b662741 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -429,7 +429,13 @@ def cont_dim(self, cont_dim): self._cont_dim = cont_dim def __call__( - self, submitter=None, plugin=None, plugin_kwargs=None, rerun=False, **kwargs + self, + submitter=None, + plugin=None, + plugin_kwargs=None, + rerun=False, + environment=None, + **kwargs, ): """Make tasks callable themselves.""" from .submitter import Submitter @@ -449,9 +455,9 @@ def __call__( if submitter: with submitter as sub: self.inputs = attr.evolve(self.inputs, **kwargs) - res = sub(self) + res = sub(self, environment=environment) else: # tasks without state could be run without a submitter - res = self._run(rerun=rerun, **kwargs) + res = self._run(rerun=rerun, environment=environment, **kwargs) return res def _modify_inputs(self): @@ -501,7 +507,7 @@ def _populate_filesystem(self, checksum, output_dir): shutil.rmtree(output_dir) output_dir.mkdir(parents=False, exist_ok=self.can_resume) - def 
_run(self, rerun=False, **kwargs): + def _run(self, rerun=False, environment=None, **kwargs): self.inputs = attr.evolve(self.inputs, **kwargs) self.inputs.check_fields_input_spec() @@ -518,6 +524,7 @@ def _run(self, rerun=False, **kwargs): return result cwd = os.getcwd() self._populate_filesystem(checksum, output_dir) + os.chdir(output_dir) orig_inputs = self._modify_inputs() result = Result(output=None, runtime=None, errored=False) self.hooks.pre_run_task(self) @@ -526,7 +533,7 @@ def _run(self, rerun=False, **kwargs): self.audit.audit_task(task=self) try: self.audit.monitor() - self._run_task() + self._run_task(environment=environment) result.output = self._collect_outputs(output_dir=output_dir) except Exception: etype, eval, etr = sys.exc_info() @@ -538,7 +545,6 @@ def _run(self, rerun=False, **kwargs): self.hooks.post_run_task(self, result) self.audit.finalize_audit(result) save(output_dir, result=result, task=self) - self.output_ = None # removing the additional file with the chcksum (self.cache_dir / f"{self.uid}_info.json").unlink() # # function etc. shouldn't change anyway, so removing @@ -551,15 +557,14 @@ def _run(self, rerun=False, **kwargs): return result def _collect_outputs(self, output_dir): - run_output = self.output_ output_klass = make_klass(self.output_spec) output = output_klass( **{f.name: attr.NOTHING for f in attr.fields(output_klass)} ) other_output = output.collect_additional_outputs( - self.inputs, output_dir, run_output + self.inputs, output_dir, self.output_ ) - return attr.evolve(output, **run_output, **other_output) + return attr.evolve(output, **self.output_, **other_output) def split( self, diff --git a/pydra/engine/environments.py b/pydra/engine/environments.py new file mode 100644 index 000000000..3c60927bc --- /dev/null +++ b/pydra/engine/environments.py @@ -0,0 +1,157 @@ +from .helpers import execute + +from pathlib import Path + + +class Environment: + """ + Base class for environments that are used to execute tasks. 
+ Right now it is assumed that the environment, including container images, + are available and are not removed at the end + TODO: add setup and teardown methods + """ + + def setup(self): + pass + + def execute(self, task): + """ + Execute the task in the environment. + + Parameters + ---------- + task : TaskBase + the task to execute + + Returns + ------- + output + Output of the task. + """ + raise NotImplementedError + + def teardown(self): + pass + + +class Native(Environment): + """ + Native environment, i.e. the tasks are executed in the current python environment. + """ + + def execute(self, task): + keys = ["return_code", "stdout", "stderr"] + values = execute(task.command_args(), strip=task.strip) + output = dict(zip(keys, values)) + if output["return_code"]: + msg = f"Error running '{task.name}' task with {task.command_args()}:" + if output["stderr"]: + msg += "\n\nstderr:\n" + output["stderr"] + if output["stdout"]: + msg += "\n\nstdout:\n" + output["stdout"] + raise RuntimeError(msg) + return output + + +class Container(Environment): + """ + Base class for container environments used by Docker and Singularity. 
+ + Parameters + ---------- + image : str + Name of the container image + tag : str + Tag of the container image + root : str + Base path for mounting host directories into the container + xargs : Union[str, List[str]] + Extra arguments to be passed to the container + """ + + def __init__(self, image, tag="latest", root="/mnt/pydra", xargs=None): + self.image = image + self.tag = tag + if xargs is None: + xargs = [] + elif isinstance(xargs, str): + xargs = xargs.split() + self.xargs = xargs + self.root = root + + def bind(self, loc, mode="ro"): + loc_abs = Path(loc).absolute() + return f"{loc_abs}:{self.root}{loc_abs}:{mode}" + + +class Docker(Container): + """Docker environment.""" + + def execute(self, task): + docker_img = f"{self.image}:{self.tag}" + # mounting all input locations + mounts = task.get_bindings(root=self.root) + + docker_args = [ + "docker", + "run", + "-v", + self.bind(task.cache_dir, "rw"), + *self.xargs, + ] + docker_args.extend( + " ".join( + [f"-v {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()] + ).split() + ) + docker_args.extend(["-w", f"{self.root}{task.output_dir}"]) + keys = ["return_code", "stdout", "stderr"] + + values = execute( + docker_args + [docker_img] + task.command_args(root=self.root), + strip=task.strip, + ) + output = dict(zip(keys, values)) + if output["return_code"]: + if output["stderr"]: + raise RuntimeError(output["stderr"]) + else: + raise RuntimeError(output["stdout"]) + return output + + +class Singularity(Container): + """Singularity environment.""" + + def execute(self, task): + singularity_img = f"{self.image}:{self.tag}" + # mounting all input locations + mounts = task.get_bindings(root=self.root) + + # todo adding xargsy etc + singularity_args = [ + "singularity", + "exec", + "-B", + self.bind(task.cache_dir, "rw"), + *self.xargs, + ] + singularity_args.extend( + " ".join( + [f"-B {key}:{val[0]}:{val[1]}" for (key, val) in mounts.items()] + ).split() + ) + singularity_args.extend(["--pwd", 
f"{self.root}{task.output_dir}"]) + keys = ["return_code", "stdout", "stderr"] + + values = execute( + singularity_args + [singularity_img] + task.command_args(root=self.root), + strip=task.strip, + ) + output = dict(zip(keys, values)) + if output["return_code"]: + if output["stderr"]: + raise RuntimeError(output["stderr"]) + else: + raise RuntimeError(output["stdout"]) + return output diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py index c31705be7..a8a4a69b7 100644 --- a/pydra/engine/specs.py +++ b/pydra/engine/specs.py @@ -676,37 +676,6 @@ def _check_requires(self, fld, inputs): return False -@attr.s(auto_attribs=True, kw_only=True) -class ContainerSpec(ShellSpec): - """Refine the generic command-line specification to container execution.""" - - image: ty.Union[File, str] = attr.ib( - metadata={"help_string": "image", "mandatory": True} - ) - """The image to be containerized.""" - container: ty.Union[File, str, None] = attr.ib( - metadata={"help_string": "container"} - ) - """The container.""" - container_xargs: ty.Optional[ty.List[str]] = attr.ib( - default=None, metadata={"help_string": "todo"} - ) - - -@attr.s(auto_attribs=True, kw_only=True) -class DockerSpec(ContainerSpec): - """Particularize container specifications to the Docker engine.""" - - container: str = attr.ib("docker", metadata={"help_string": "container"}) - - -@attr.s(auto_attribs=True, kw_only=True) -class SingularitySpec(ContainerSpec): - """Particularize container specifications to Singularity.""" - - container: str = attr.ib("singularity", metadata={"help_string": "container type"}) - - @attr.s class LazyInterface: _task: "core.TaskBase" = attr.ib() diff --git a/pydra/engine/submitter.py b/pydra/engine/submitter.py index 9327e0604..e4a2a0102 100644 --- a/pydra/engine/submitter.py +++ b/pydra/engine/submitter.py @@ -35,14 +35,16 @@ def __init__(self, plugin="cf", **kwargs): raise NotImplementedError(f"No worker for {self.plugin}") self.worker.loop = self.loop - def __call__(self, 
runnable, cache_locations=None, rerun=False): + def __call__(self, runnable, cache_locations=None, rerun=False, environment=None): """Submitter run function.""" if cache_locations is not None: runnable.cache_locations = cache_locations - self.loop.run_until_complete(self.submit_from_call(runnable, rerun)) + self.loop.run_until_complete( + self.submit_from_call(runnable, rerun, environment) + ) return runnable.result() - async def submit_from_call(self, runnable, rerun): + async def submit_from_call(self, runnable, rerun, environment): """ This coroutine should only be called once per Submitter call, and serves as the bridge between sync/async lands. @@ -56,7 +58,7 @@ async def submit_from_call(self, runnable, rerun): Once Python 3.10 is the minimum, this should probably be refactored into using structural pattern matching. """ - if is_workflow(runnable): + if is_workflow(runnable): # TODO: env to wf # connect and calculate the checksum of the graph before running runnable._connect_and_propagate_to_tasks(override_task_caches=True) # 0 @@ -74,10 +76,11 @@ async def submit_from_call(self, runnable, rerun): # 2 if runnable.state is None: # run_el should always return a coroutine - await self.worker.run_el(runnable, rerun=rerun) + print("in SUBM", environment) + await self.worker.run_el(runnable, rerun=rerun, environment=environment) # 3 else: - await self.expand_runnable(runnable, wait=True, rerun=rerun) + await self.expand_runnable(runnable, wait=True, rerun=rerun) # TODO return True async def expand_runnable(self, runnable, wait=False, rerun=False): diff --git a/pydra/engine/task.py b/pydra/engine/task.py index 1607e616b..f977a3240 100644 --- a/pydra/engine/task.py +++ b/pydra/engine/task.py @@ -38,6 +38,8 @@ `__ """ +from __future__ import annotations + import platform import re import attr @@ -55,21 +57,18 @@ SpecInfo, ShellSpec, ShellOutSpec, - ContainerSpec, - DockerSpec, - SingularitySpec, attr_fields, ) from .helpers import ( ensure_list, - execute, 
position_sort, argstr_formatting, output_from_inputfields, parse_copyfile, ) -from .helpers_file import template_update, is_local_file +from .helpers_file import template_update from ..utils.typing import TypeParser +from .environments import Native class FunctionTask(TaskBase): @@ -195,7 +194,7 @@ def __init__( self.output_spec = output_spec - def _run_task(self): + def _run_task(self, environment=None): inputs = attr.asdict(self.inputs, recurse=False) del inputs["_func"] self.output_ = None @@ -223,33 +222,6 @@ class ShellCommandTask(TaskBase): input_spec = None output_spec = None - def __new__(cls, container_info=None, *args, **kwargs): - if not container_info: - return super().__new__(cls) - - if len(container_info) == 2: - type_cont, image = container_info - else: - raise Exception( - f"container_info has to have 2 elements, but {container_info} provided" - ) - - if type_cont == "docker": - # changing base class of spec if user defined - if "input_spec" in kwargs: - kwargs["input_spec"].bases = (DockerSpec,) - return DockerTask(image=image, *args, **kwargs) - elif type_cont == "singularity": - # changing base class of spec if user defined - if "input_spec" in kwargs: - kwargs["input_spec"].bases = (SingularitySpec,) - return SingularityTask(image=image, *args, **kwargs) - else: - raise Exception( - f"first element of container_info has to be " - f"docker or singularity, but {container_info[0]} provided" - ) - def __init__( self, audit_flags: AuditFlag = AuditFlag.NONE, @@ -262,6 +234,7 @@ def __init__( output_spec: ty.Optional[SpecInfo] = None, rerun=False, strip=False, + environment=Native(), **kwargs, ): """ @@ -329,9 +302,33 @@ def __init__( rerun=rerun, ) self.strip = strip + self.environment = environment + self.bindings = {} + self.inputs_mod_root = {} - @property - def command_args(self): + def get_bindings(self, root: str | None = None) -> dict[str, tuple[str, str]]: + """Return bindings necessary to run task in an alternative root. 
+ + This is primarily intended for contexts when a task is going + to be run in a container with mounted volumes. + + Arguments + --------- + root: str + + Returns + ------- + bindings: dict + Mapping from paths in the host environment to the target environment + """ + + if root is None: + return {} + else: + self._prepare_bindings(root=root) + return self.bindings + + def command_args(self, root=None): """Get command line arguments""" if is_lazy(self.inputs): raise Exception("can't return cmdline, self.inputs has LazyFields") @@ -344,10 +341,7 @@ def command_args(self): pos_args = [] # list for (position, command arg) self._positions_provided = [] - for field in attr_fields( - self.inputs, - exclude_names=("container", "image", "container_xargs"), - ): + for field in attr_fields(self.inputs): name, meta = field.name, field.metadata if ( getattr(self.inputs, name) is attr.NOTHING @@ -362,7 +356,10 @@ def command_args(self): if pos_val: pos_args.append(pos_val) else: - pos_val = self._command_pos_args(field) + if name in modified_inputs: + pos_val = self._command_pos_args(field, root=root) + else: + pos_val = self._command_pos_args(field) if pos_val: pos_args.append(pos_val) @@ -399,7 +396,7 @@ def _command_shelltask_args(self, field): else: return pos, ensure_list(value, tuple2list=True) - def _command_pos_args(self, field): + def _command_pos_args(self, field, root=None): """ Checking all additional input fields, setting pos to None, if position not set. 
Creating a list with additional parts of the command that comes from @@ -428,6 +425,13 @@ def _command_pos_args(self, field): pos += 1 if pos >= 0 else -1 value = self._field_value(field, check_file=True) + + if value: + if field.name in self.inputs_mod_root: + value = self.inputs_mod_root[field.name] + elif root: # values from templates + value = value.replace(str(self.output_dir), f"{root}{self.output_dir}") + if field.metadata.get("readonly", False) and value is not None: raise Exception(f"{field.name} is read only, the value can't be provided") elif ( @@ -519,13 +523,9 @@ def cmdline(self): self.inputs.check_fields_input_spec() if self.state: raise NotImplementedError - if isinstance(self, ContainerTask): - command_args = self.container_args + self.command_args - else: - command_args = self.command_args - # Skip the executable, which can be a multipart command, e.g. 'docker run'. - cmdline = command_args[0] - for arg in command_args[1:]: + # Skip the executable, which can be a multi-part command, e.g. 'docker run'. + cmdline = self.command_args()[0] + for arg in self.command_args()[1:]: # If there are spaces in the arg, and it is not enclosed by matching # quotes, add quotes to escape the space. 
Not sure if this should # be expanded to include other special characters apart from spaces @@ -535,318 +535,34 @@ def cmdline(self): cmdline += " " + arg return cmdline - def _run_task(self): - self.output_ = None - if isinstance(self, ContainerTask): - args = self.container_args + self.command_args - else: - args = self.command_args - if args: - # removing empty strings - args = [str(el) for el in args if el not in ["", " "]] - keys = ["return_code", "stdout", "stderr"] - values = execute(args, strip=self.strip) - self.output_ = dict(zip(keys, values)) - if self.output_["return_code"]: - msg = f"Error running '{self.name}' task with {args}:" - if self.output_["stderr"]: - msg += "\n\nstderr:\n" + self.output_["stderr"] - if self.output_["stdout"]: - msg += "\n\nstdout:\n" + self.output_["stdout"] - raise RuntimeError(msg) - - DEFAULT_COPY_COLLATION = FileSet.CopyCollation.adjacent - - -class ContainerTask(ShellCommandTask): - """Extend shell command task for containerized execution.""" - - def __init__( - self, - name, - audit_flags: AuditFlag = AuditFlag.NONE, - cache_dir=None, - input_spec: ty.Optional[SpecInfo] = None, - messenger_args=None, - messengers=None, - output_cpath="/output_pydra", - output_spec: ty.Optional[SpecInfo] = None, - rerun=False, - strip=False, - **kwargs, - ): - """ - Initialize this task. - - Parameters - ---------- - name : :obj:`str` - Name of this task. - audit_flags : :obj:`pydra.utils.messenger.AuditFlag` - Auditing configuration - cache_dir : :obj:`os.pathlike` - Cache directory - input_spec : :obj:`pydra.engine.specs.SpecInfo` - Specification of inputs. - messenger_args : - TODO - messengers : - TODO - output_cpath : :obj:`str` - Output path within the container filesystem. - output_spec : :obj:`pydra.engine.specs.BaseSpec` - Specification of inputs. 
- strip : :obj:`bool` - TODO - - """ - if input_spec is None: - input_spec = SpecInfo(name="Inputs", fields=[], bases=(ContainerSpec,)) - self.output_cpath = Path(output_cpath) - self.bindings = {} - super().__init__( - name=name, - input_spec=input_spec, - output_spec=output_spec, - audit_flags=audit_flags, - messengers=messengers, - messenger_args=messenger_args, - cache_dir=cache_dir, - strip=strip, - rerun=rerun, - **kwargs, - ) - - def _field_value(self, field, check_file=False): - """ - Checking value of the specific field, if value is not set, None is returned. - If check_file is True, checking if field is a local file - and settings bindings if needed. - """ - value = super()._field_value(field) - if value and check_file and is_local_file(field): - # changing path to the cpath (the directory should be mounted) - lpath = Path(str(value)) - cdir = self.bind_paths()[lpath.parent][0] - cpath = cdir.joinpath(lpath.name) - value = str(cpath) - return value + def _run_task(self, environment=None): + if environment is None: + environment = self.environment + self.output_ = environment.execute(self) - def container_check(self, container_type): - """Get container-specific CLI arguments.""" - if self.inputs.container is None: - raise AttributeError("Container software is not specified") - elif self.inputs.container != container_type: - raise AttributeError( - f"Container type should be {container_type}, but {self.inputs.container} given" - ) - if self.inputs.image is attr.NOTHING: - raise AttributeError("Container image is not specified") + def _prepare_bindings(self, root: str): + """Prepare input files to be passed to the task - def bind_paths(self): - """Get bound mount points - - Returns - ------- - mount points: dict - mapping from local path to tuple of container path + mode + This updates the ``bindings`` attribute of the current task to make files available + in an ``Environment``-defined ``root``. 
""" - self._check_inputs() - return {**self.bindings, **{self.output_dir: (self.output_cpath, "rw")}} - - def binds(self, opt): - """ - Specify mounts to bind from local filesystems to container and working directory. - - Uses py:meth:`bind_paths` - - """ - bargs = [] - for lpath, (cpath, mode) in self.bind_paths().items(): - bargs.extend([opt, f"{lpath}:{cpath}:{mode}"]) - return bargs - - def _check_inputs(self): - fields = attr_fields(self.inputs) - for fld in fields: + for fld in attr_fields(self.inputs): if TypeParser.contains_type(FileSet, fld.type): - assert not fld.metadata.get( - "container_path" - ) # <-- Is container_path necessary, container paths should just be typed PurePath - if fld.name == "image": # <-- What is the image about? - continue fileset = getattr(self.inputs, fld.name) - copy_mode, _ = parse_copyfile(fld) - container_path = Path(f"/pydra_inp_{fld.name}") - self.bindings[fileset.parent] = ( - container_path, - "rw" if copy_mode == FileSet.CopyMode.copy else "ro", - ) - - SUPPORTED_COPY_MODES = FileSet.CopyMode.any - FileSet.CopyMode.symlink - + copy = parse_copyfile(fld)[0] == FileSet.CopyMode.copy -class DockerTask(ContainerTask): - """Extend shell command task for containerized execution with the Docker Engine.""" + host_path, env_path = fileset.parent, Path(f"{root}{fileset.parent}") - init = False + # Default to mounting paths as read-only, but respect existing modes + old_mode = self.bindings.get(host_path, ("", "ro"))[1] + self.bindings[host_path] = (env_path, "rw" if copy else old_mode) - def __init__( - self, - name=None, - audit_flags: AuditFlag = AuditFlag.NONE, - cache_dir=None, - input_spec: ty.Optional[SpecInfo] = None, - messenger_args=None, - messengers=None, - output_cpath="/output_pydra", - output_spec: ty.Optional[SpecInfo] = None, - rerun=False, - strip=False, - **kwargs, - ): - """ - Initialize this task. - - Parameters - ---------- - name : :obj:`str` - Name of this task. 
- audit_flags : :obj:`pydra.utils.messenger.AuditFlag` - Auditing configuration - cache_dir : :obj:`os.pathlike` - Cache directory - input_spec : :obj:`pydra.engine.specs.SpecInfo` - Specification of inputs. - messenger_args : - TODO - messengers : - TODO - output_cpath : :obj:`str` - Output path within the container filesystem. - output_spec : :obj:`pydra.engine.specs.BaseSpec` - Specification of inputs. - strip : :obj:`bool` - TODO - - """ - if not self.init: - if input_spec is None: - input_spec = SpecInfo(name="Inputs", fields=[], bases=(DockerSpec,)) - super().__init__( - name=name, - input_spec=input_spec, - output_spec=output_spec, - audit_flags=audit_flags, - messengers=messengers, - messenger_args=messenger_args, - cache_dir=cache_dir, - strip=strip, - output_cpath=output_cpath, - rerun=rerun, - **kwargs, - ) - self.inputs.container_xargs = ["--rm"] - self.init = True - - @property - def container_args(self): - """Get container-specific CLI arguments, returns a list if the task has a state""" - if is_lazy(self.inputs): - raise Exception("can't return container_args, self.inputs has LazyFields") - self.container_check("docker") - if self.state: - raise NotImplementedError - - cargs = ["docker", "run"] - if self.inputs.container_xargs is not None: - cargs.extend(self.inputs.container_xargs) - - cargs.extend(self.binds("-v")) - cargs.extend(["-w", str(self.output_cpath)]) - cargs.append(self.inputs.image) - - return cargs - - -class SingularityTask(ContainerTask): - """Extend shell command task for containerized execution with Singularity.""" - - init = False - - def __init__( - self, - name=None, - audit_flags: AuditFlag = AuditFlag.NONE, - cache_dir=None, - input_spec: ty.Optional[SpecInfo] = None, - messenger_args=None, - messengers=None, - output_spec: ty.Optional[SpecInfo] = None, - rerun=False, - strip=False, - **kwargs, - ): - """ - Initialize this task. - - Parameters - ---------- - name : :obj:`str` - Name of this task. 
- audit_flags : :obj:`pydra.utils.messenger.AuditFlag` - Auditing configuration - cache_dir : :obj:`os.pathlike` - Cache directory - input_spec : :obj:`pydra.engine.specs.SpecInfo` - Specification of inputs. - messenger_args : - TODO - messengers : - TODO - output_spec : :obj:`pydra.engine.specs.BaseSpec` - Specification of inputs. - strip : :obj:`bool` - TODO - - """ - if not self.init: - if input_spec is None: - input_spec = SpecInfo( - name="Inputs", fields=[], bases=(SingularitySpec,) + # Provide in-container paths without type-checking + self.inputs_mod_root[fld.name] = tuple( + env_path / rel for rel in fileset.relative_fspaths ) - super().__init__( - name=name, - input_spec=input_spec, - output_spec=output_spec, - audit_flags=audit_flags, - messengers=messengers, - messenger_args=messenger_args, - cache_dir=cache_dir, - strip=strip, - rerun=rerun, - **kwargs, - ) - self.init = True - @property - def container_args(self): - """Get container-specific CLI arguments.""" - if is_lazy(self.inputs): - raise Exception("can't return container_args, self.inputs has LazyFields") - self.container_check("singularity") - if self.state: - raise NotImplementedError - - cargs = ["singularity", "exec"] - - if self.inputs.container_xargs is not None: - cargs.extend(self.inputs.container_xargs) - - cargs.extend(self.binds("-B")) - cargs.extend(["--pwd", str(self.output_cpath)]) - cargs.append(self.inputs.image) - return cargs + DEFAULT_COPY_COLLATION = FileSet.CopyCollation.adjacent def split_cmd(cmd: str): diff --git a/pydra/engine/tests/test_dockertask.py b/pydra/engine/tests/test_dockertask.py index 479b09556..5ccf37e29 100644 --- a/pydra/engine/tests/test_dockertask.py +++ b/pydra/engine/tests/test_dockertask.py @@ -2,11 +2,12 @@ import pytest import attr -from ..task import DockerTask, ShellCommandTask +from ..task import ShellCommandTask from ..submitter import Submitter from ..core import Workflow -from ..specs import ShellOutSpec, SpecInfo, File, DockerSpec, ShellSpec 
-from .utils import no_win, need_docker +from ..specs import ShellOutSpec, SpecInfo, File, ShellSpec +from ..environments import Docker +from .utils import no_win, need_docker, result_submitter, result_no_submitter @no_win @@ -16,13 +17,13 @@ def test_docker_1_nosubm(): no submitter """ cmd = "whoami" - docky = DockerTask(name="docky", executable=cmd, image="busybox") - assert docky.inputs.image == "busybox" - assert docky.inputs.container == "docker" - assert ( - docky.cmdline - == f"docker run --rm -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {cmd}" + docky = ShellCommandTask( + name="docky", executable=cmd, environment=Docker(image="busybox") ) + assert docky.environment.image == "busybox" + assert docky.environment.tag == "latest" + assert isinstance(docky.environment, Docker) + assert docky.cmdline == cmd res = docky() assert res.output.stdout == "root\n" @@ -36,7 +37,9 @@ def test_docker_1(plugin): using submitter """ cmd = "whoami" - docky = DockerTask(name="docky", executable=cmd, image="busybox") + docky = ShellCommandTask( + name="docky", executable=cmd, environment=Docker(image="busybox") + ) with Submitter(plugin=plugin) as sub: docky(submitter=sub) @@ -48,568 +51,68 @@ def test_docker_1(plugin): @no_win @need_docker -def test_docker_1_dockerflag(plugin): - """simple command in a container, a default bindings and working directory is added - using ShellComandTask with container_info=("docker", image) - """ - cmd = "whoami" - shocky = ShellCommandTask( - name="shocky", executable=cmd, container_info=("docker", "busybox") - ) - - with Submitter(plugin=plugin) as sub: - shocky(submitter=sub) - - res = shocky.result() - assert res.output.stdout == "root\n" - assert res.output.return_code == 0 - - -@no_win -@need_docker -def test_docker_1_dockerflag_exception(plugin): - """using ShellComandTask with container_info=("docker"), no image provided""" - cmd = "whoami" - with pytest.raises(Exception) as excinfo: - 
ShellCommandTask(name="shocky", executable=cmd, container_info=("docker")) - assert "container_info has to have 2 elements" in str(excinfo.value) - - -@no_win -@need_docker -def test_docker_2_nosubm(): - """a command with arguments, cmd and args given as executable - no submitter - """ - cmd = ["echo", "hail", "pydra"] - docky = DockerTask(name="docky", executable=cmd, image="busybox") - assert ( - docky.cmdline - == f"docker run --rm -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {' '.join(cmd)}" - ) - - res = docky() - assert res.output.stdout.strip() == " ".join(cmd[1:]) - assert res.output.return_code == 0 - - -@no_win -@need_docker -def test_docker_2(plugin): - """a command with arguments, cmd and args given as executable - using submitter - """ - cmd = ["echo", "hail", "pydra"] - docky = DockerTask(name="docky", executable=cmd, image="busybox") - assert ( - docky.cmdline - == f"docker run --rm -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {' '.join(cmd)}" - ) - - with Submitter(plugin=plugin) as sub: - docky(submitter=sub) - res = docky.result() - assert res.output.stdout.strip() == " ".join(cmd[1:]) - assert res.output.return_code == 0 - - -@no_win -@need_docker -def test_docker_2_dockerflag(plugin): +@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +def test_docker_2(results_function, plugin): """a command with arguments, cmd and args given as executable - using ShellComandTask with container_info=("docker", image) + with and without submitter """ cmd = ["echo", "hail", "pydra"] - shocky = ShellCommandTask( - name="shocky", executable=cmd, container_info=("docker", "busybox") - ) - assert ( - shocky.cmdline - == f"docker run --rm -v {shocky.output_dir}:/output_pydra:rw -w /output_pydra {shocky.inputs.image} {' '.join(cmd)}" + docky = ShellCommandTask( + name="docky", executable=cmd, environment=Docker(image="busybox") ) - - with Submitter(plugin=plugin) as sub: 
- shocky(submitter=sub) - res = shocky.result() + # cmdline doesn't know anything about docker + assert docky.cmdline == " ".join(cmd) + res = results_function(docky, plugin) assert res.output.stdout.strip() == " ".join(cmd[1:]) assert res.output.return_code == 0 @no_win @need_docker -def test_docker_2a_nosubm(): - """a command with arguments, using executable and args - no submitter - """ - cmd_exec = "echo" - cmd_args = ["hail", "pydra"] - # separate command into exec + args - docky = DockerTask( - name="docky", executable=cmd_exec, args=cmd_args, image="busybox" - ) - assert docky.inputs.executable == "echo" - assert ( - docky.cmdline - == f"docker run --rm -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {cmd_exec} {' '.join(cmd_args)}" - ) - - res = docky() - assert res.output.stdout.strip() == " ".join(cmd_args) - assert res.output.return_code == 0 - - -@no_win -@need_docker -def test_docker_2a(plugin): +@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +def test_docker_2a(results_function, plugin): """a command with arguments, using executable and args using submitter """ cmd_exec = "echo" cmd_args = ["hail", "pydra"] # separate command into exec + args - docky = DockerTask( - name="docky", executable=cmd_exec, args=cmd_args, image="busybox" + docky = ShellCommandTask( + name="docky", + executable=cmd_exec, + args=cmd_args, + environment=Docker(image="busybox"), ) assert docky.inputs.executable == "echo" - assert ( - docky.cmdline - == f"docker run --rm -v {docky.output_dir}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {cmd_exec} {' '.join(cmd_args)}" - ) + assert docky.cmdline == f"{cmd_exec} {' '.join(cmd_args)}" - with Submitter(plugin=plugin) as sub: - docky(submitter=sub) - res = docky.result() + res = results_function(docky, plugin) assert res.output.stdout.strip() == " ".join(cmd_args) assert res.output.return_code == 0 -@no_win -@need_docker -@pytest.mark.skip(reason="we 
probably don't want to support bindings as an input") -def test_docker_3(plugin, tmp_path): - """a simple command in container with bindings, - creating directory in tmp dir and checking if it is in the container - """ - # creating a new directory - tmp_path.mkdir("new_dir") - cmd = ["ls", "/tmp_dir"] - docky = DockerTask(name="docky", executable=cmd, image="busybox") - # binding tmp directory to the container - docky.inputs.bindings = [(str(tmp_path), "/tmp_dir", "ro")] - - with Submitter(plugin=plugin) as sub: - docky(submitter=sub) - - res = docky.result() - assert res.output.stdout == "new_dir\n" - assert res.output.return_code == 0 - - -@no_win -@need_docker -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_docker_3_dockerflag(plugin, tmp_path): - """a simple command in container with bindings, - creating directory in tmp dir and checking if it is in the container - using ShellComandTask with container_info=("docker", image) - """ - # creating a new directory - tmp_path.mkdir("new_dir") - cmd = ["ls", "/tmp_dir"] - shocky = ShellCommandTask( - name="shocky", container_info=("docker", "busybox"), executable=cmd - ) - # binding tmp directory to the container - shocky.inputs.bindings = [(str(tmp_path), "/tmp_dir", "ro")] - - with Submitter(plugin=plugin) as sub: - shocky(submitter=sub) - - res = shocky.result() - assert res.output.stdout == "new_dir\n" - assert res.output.return_code == 0 - - -@no_win -@need_docker -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_docker_3_dockerflagbind(plugin, tmp_path): - """a simple command in container with bindings, - creating directory in tmp dir and checking if it is in the container - using ShellComandTask with container_info=("docker", image) - """ - # creating a new directory - tmp_path.mkdir("new_dir") - cmd = ["ls", "/tmp_dir"] - shocky = ShellCommandTask( - name="shocky", - container_info=("docker", "busybox", [(str(tmp_path), 
"/tmp_dir", "ro")]), - executable=cmd, - ) - - with Submitter(plugin=plugin) as sub: - shocky(submitter=sub) - - res = shocky.result() - assert res.output.stdout == "new_dir\n" - assert res.output.return_code == 0 - - -@no_win -@need_docker -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_docker_4(plugin, tmp_path): - """task reads the file that is bounded to the container - specifying bindings, - """ - with open(tmp_path / "file_pydra.txt"), "w" as f: - f.write("hello from pydra") - - cmd = ["cat", "/tmp_dir/file_pydra.txt"] - docky = DockerTask( - name="docky_cat", - image="busybox", - executable=cmd, - bindings=[(str(tmp_path), "/tmp_dir", "ro")], - strip=True, - ) - - with Submitter(plugin=plugin) as sub: - docky(submitter=sub) - - res = docky.result() - assert res.output.stdout == "hello from pydra" - assert res.output.return_code == 0 - - -@no_win -@need_docker -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_docker_4_dockerflag(plugin, tmp_path): - """task reads the file that is bounded to the container - specifying bindings, - using ShellComandTask with container_info=("docker", image, bindings) - """ - with open(tmp_path / "file_pydra.txt"), "w" as f: - f.write("hello from pydra") - - cmd = ["cat", "/tmp_dir/file_pydra.txt"] - shocky = ShellCommandTask( - name="shocky", - container_info=("docker", "busybox", [(str(tmp_path), "/tmp_dir", "ro")]), - executable=cmd, - strip=True, - ) - - with Submitter(plugin=plugin) as sub: - shocky(submitter=sub) - - res = shocky.result() - assert res.output.stdout == "hello from pydra" - assert res.output.return_code == 0 - - # tests with State @no_win @need_docker -def test_docker_st_1(plugin): +@pytest.mark.parametrize("results_function", [result_no_submitter, result_submitter]) +def test_docker_st_1(results_function, plugin): """commands without arguments in container splitter = executable """ cmd = ["pwd", "whoami"] - docky = 
DockerTask(name="docky", image="busybox").split( + docky = ShellCommandTask(name="docky", environment=Docker(image="busybox")).split( "executable", executable=cmd ) assert docky.state.splitter == "docky.executable" - # for ii, el in enumerate(docky.cmdline): - # assert ( - # el - # == f"docker run --rm -v {docky.output_dir[ii]}:/output_pydra:rw -w /output_pydra {docky.inputs.image} {cmd[ii]}" - # ) - - res = docky(plugin=plugin) - assert res[0].output.stdout == "/output_pydra\n" + res = results_function(docky, plugin) + assert res[0].output.stdout == f"/mnt/pydra{docky.output_dir[0]}\n" assert res[1].output.stdout == "root\n" assert res[0].output.return_code == res[1].output.return_code == 0 -@no_win -@need_docker -def test_docker_st_2(plugin): - """command with arguments in docker, checking the distribution - splitter = image - """ - cmd = ["cat", "/etc/issue"] - docky = DockerTask(name="docky", executable=cmd).split( - "image", image=["debian", "ubuntu"] - ) - assert docky.state.splitter == "docky.image" - - # for ii, el in enumerate(docky.cmdline): - # assert ( - # el - # == f"docker run --rm -v {docky.output_dir[ii]}:/output_pydra:rw -w /output_pydra {docky.inputs.image[ii]} {' '.join(cmd)}" - # ) - - res = docky(plugin=plugin) - assert "Debian" in res[0].output.stdout - assert "Ubuntu" in res[1].output.stdout - assert res[0].output.return_code == res[1].output.return_code == 0 - - -@no_win -@need_docker -def test_docker_st_3(plugin): - """outer splitter image and executable""" - cmd = ["whoami", ["cat", "/etc/issue"]] - docky = DockerTask(name="docky").split( - ["image", "executable"], executable=cmd, image=["debian", "ubuntu"] - ) - assert docky.state.splitter == ["docky.image", "docky.executable"] - res = docky(plugin=plugin) - - assert res[0].output.stdout == "root\n" - assert "Debian" in res[1].output.stdout - assert res[2].output.stdout == "root\n" - assert "Ubuntu" in res[3].output.stdout - - -@no_win -@need_docker -def test_docker_st_4(plugin): - 
"""outer splitter image and executable, combining with images""" - cmd = ["whoami", ["cat", "/etc/issue"]] - docky = ( - DockerTask(name="docky") - .split(["image", "executable"], executable=cmd, image=["debian", "ubuntu"]) - .combine("image") - ) - assert docky.state.splitter == ["docky.image", "docky.executable"] - assert docky.state.combiner == ["docky.image"] - assert docky.state.splitter_final == "docky.executable" - - # for ii, el in enumerate(docky.cmdline): - # i, j = ii // 2, ii % 2 - # if j == 0: - # cmd_str = "whoami" - # else: - # cmd_str = " ".join(["cat", "/etc/issue"]) - # assert ( - # el - # == f"docker run --rm -v {docky.output_dir[ii]}:/output_pydra:rw -w /output_pydra {docky.inputs.image[i]} {cmd_str}" - # ) - - res = docky(plugin=plugin) - - # checking the first command - res_cmd1 = res[0] - assert res_cmd1[0].output.stdout == "root\n" - assert res_cmd1[1].output.stdout == "root\n" - - # checking the second command - res_cmd2 = res[1] - assert "Debian" in res_cmd2[0].output.stdout - assert "Ubuntu" in res_cmd2[1].output.stdout - - -# tests with workflows - - -@no_win -@need_docker -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_wf_docker_1(plugin, tmp_path): - """a workflow with two connected task - the first one read the file that is bounded to the container, - the second uses echo - """ - with open(tmp_path / "file_pydra.txt"), "w" as f: - f.write("hello from pydra") - - wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"]) - wf.inputs.cmd1 = ["cat", "/tmp_dir/file_pydra.txt"] - wf.inputs.cmd2 = ["echo", "message from the previous task:"] - wf.add( - DockerTask( - name="docky_cat", - image="busybox", - executable=wf.lzin.cmd1, - bindings=[(str(tmp_path), "/tmp_dir", "ro")], - strip=True, - ) - ) - wf.add( - DockerTask( - name="docky_echo", - image="ubuntu", - executable=wf.lzin.cmd2, - args=wf.docky_cat.lzout.stdout, - strip=True, - ) - ) - wf.set_output([("out", wf.docky_echo.lzout.stdout)]) - - 
with pytest.raises(Exception) as excinfo: - wf.docky_echo.cmdline - assert "can't return cmdline" in str(excinfo.value) - - with Submitter(plugin=plugin) as sub: - wf(submitter=sub) - - res = wf.result() - assert res.output.out == "message from the previous task: hello from pydra" - - -@no_win -@need_docker -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_wf_docker_1_dockerflag(plugin, tmp_path): - """a workflow with two connected task - the first one read the file that is bounded to the container, - the second uses echo - using ShellComandTask with container_info - """ - with open(tmp_path / "file_pydra.txt"), "w" as f: - f.write("hello from pydra") - - wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"]) - wf.inputs.cmd1 = ["cat", "/tmp_dir/file_pydra.txt"] - wf.inputs.cmd2 = ["echo", "message from the previous task:"] - wf.add( - ShellCommandTask( - name="shocky_cat", - container_info=("docker", "busybox", [(str(tmp_path), "/tmp_dir", "ro")]), - executable=wf.lzin.cmd1, - strip=True, - ) - ) - wf.add( - ShellCommandTask( - name="shocky_echo", - executable=wf.lzin.cmd2, - args=wf.shocky_cat.lzout.stdout, - strip=True, - container_info=("docker", "ubuntu"), - ) - ) - wf.set_output([("out", wf.shocky_echo.lzout.stdout)]) - - with Submitter(plugin=plugin) as sub: - wf(submitter=sub) - - res = wf.result() - assert res.output.out == "message from the previous task: hello from pydra" - - -@no_win -@need_docker -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_wf_docker_2pre(plugin, tmp_path, data_tests_dir): - """a workflow with two connected task that run python scripts - the first one creates a text file and the second one reads the file - """ - - cmd1 = ["python", "/scripts/saving.py", "-f", "/outputs/tmp.txt"] - dt = DockerTask( - name="save", - image="python:3.7-alpine", - executable=cmd1, - bindings=[(str(tmp_path), "/outputs"), (str(data_tests_dir), "/scripts", "ro")], - 
strip=True, - ) - res = dt(plugin=plugin) - assert res.output.stdout == "/outputs/tmp.txt" - - -@no_win -@need_docker -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_wf_docker_2(plugin, tmp_path, data_tests_dir): - """a workflow with two connected task that run python scripts - the first one creates a text file and the second one reads the file - """ - - wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"]) - wf.inputs.cmd1 = ["python", "/scripts/saving.py", "-f", "/outputs/tmp.txt"] - wf.inputs.cmd2 = ["python", "/scripts/loading.py", "-f"] - wf.add( - DockerTask( - name="save", - image="python:3.7-alpine", - executable=wf.lzin.cmd1, - bindings=[ - (str(tmp_path), "/outputs"), - (str(data_tests_dir), "/scripts", "ro"), - ], - strip=True, - ) - ) - wf.add( - DockerTask( - name="load", - image="python:3.7-alpine", - executable=wf.lzin.cmd2, - args=wf.save.lzout.stdout, - bindings=[ - (str(tmp_path), "/outputs"), - (str(data_tests_dir), "/scripts", "ro"), - ], - strip=True, - ) - ) - wf.set_output([("out", wf.load.lzout.stdout)]) - - with Submitter(plugin=plugin) as sub: - wf(submitter=sub) - - res = wf.result() - assert res.output.out == "Hello!" 
- - -@no_win -@need_docker -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_wf_docker_3(plugin, tmp_path): - """a workflow with two connected task - the first one read the file that contains the name of the image, - the output is passed to the second task as the image used to run the task - """ - with open(tmp_path / "image.txt"), "w" as f: - f.write("ubuntu") - - wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"]) - wf.inputs.cmd1 = ["cat", "/tmp_dir/image.txt"] - wf.inputs.cmd2 = ["echo", "image passed to the second task:"] - wf.add( - DockerTask( - name="docky_cat", - image="busybox", - executable=wf.lzin.cmd1, - bindings=[(str(tmp_path), "/tmp_dir", "ro")], - strip=True, - ) - ) - wf.add( - DockerTask( - name="docky_echo", - image=wf.docky_cat.lzout.stdout, - executable=wf.lzin.cmd2, - args=wf.docky_cat.lzout.stdout, - strip=True, - ) - ) - wf.set_output([("out", wf.docky_echo.lzout.stdout)]) - - with Submitter(plugin=plugin) as sub: - wf(submitter=sub) - - res = wf.result() - assert res.output.out == "image passed to the second task: ubuntu" - - # tests with customized output_spec @@ -626,8 +129,11 @@ def test_docker_outputspec_1(plugin, tmp_path): fields=[("newfile", File, "newfile_tmp.txt")], bases=(ShellOutSpec,), ) - docky = DockerTask( - name="docky", image="ubuntu", executable=cmd, output_spec=my_output_spec + docky = ShellCommandTask( + name="docky", + environment=Docker(image="ubuntu"), + executable=cmd, + output_spec=my_output_spec, ) with Submitter(plugin=plugin) as sub: @@ -666,12 +172,12 @@ def test_docker_inputspec_1(tmp_path): ), ) ], - bases=(DockerSpec,), + bases=(ShellSpec,), ) - docky = DockerTask( + docky = ShellCommandTask( name="docky", - image="busybox", + environment=Docker(image="busybox"), executable=cmd, file=filename, input_spec=my_input_spec, @@ -706,107 +212,14 @@ def test_docker_inputspec_1a(tmp_path): ), ) ], - bases=(DockerSpec,), - ) - - docky = DockerTask( - name="docky", - 
image="busybox", - executable=cmd, - input_spec=my_input_spec, - strip=True, - ) - - res = docky() - assert res.output.stdout == "hello from pydra" - - -@no_win -@need_docker -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_docker_inputspec_1b(tmp_path): - """a simple customized input spec for docker task - instead of using automatic binding I provide the bindings - and name of the file inside the container - """ - filename = str(tmp_path / "file_pydra.txt") - with open(filename, "w") as f: - f.write("hello from pydra") - - cmd = "cat" - - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "file", - attr.ib( - type=File, - metadata={ - "mandatory": True, - "position": 1, - "argstr": "", - "help_string": "input file", - "container_path": True, - }, - ), - ) - ], - bases=(DockerSpec,), - ) - - docky = DockerTask( - name="docky", - image="busybox", - executable=cmd, - # container_path is set to True, so providing the filename inside the container - file="/in_container/file_pydra.txt", - bindings=[(str(tmp_path), "/in_container")], - input_spec=my_input_spec, - strip=True, - ) - - res = docky() - assert res.output.stdout == "hello from pydra" - - -@no_win -@need_docker -def test_docker_inputspec_1_dockerflag(tmp_path): - """a simple customized input spec for docker task - using ShellTask with container_info - """ - filename = str(tmp_path / "file_pydra.txt") - with open(filename, "w") as f: - f.write("hello from pydra") - - cmd = "cat" - - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "file", - attr.ib( - type=File, - metadata={ - "mandatory": True, - "position": 1, - "argstr": "", - "help_string": "input file", - }, - ), - ) - ], bases=(ShellSpec,), ) docky = ShellCommandTask( name="docky", + environment=Docker(image="busybox"), executable=cmd, - file=filename, input_spec=my_input_spec, - container_info=("docker", "busybox"), strip=True, ) @@ -855,12 +268,12 @@ def test_docker_inputspec_2(plugin, 
tmp_path): ), ), ], - bases=(DockerSpec,), + bases=(ShellSpec,), ) - docky = DockerTask( + docky = ShellCommandTask( name="docky", - image="busybox", + environment=Docker(image="busybox"), executable=cmd, file1=filename_1, input_spec=my_input_spec, @@ -914,12 +327,12 @@ def test_docker_inputspec_2a_except(plugin, tmp_path): ), ), ], - bases=(DockerSpec,), + bases=(ShellSpec,), ) - docky = DockerTask( + docky = ShellCommandTask( name="docky", - image="busybox", + environment=Docker(image="busybox"), executable=cmd, file2=filename_2, input_spec=my_input_spec, @@ -975,12 +388,12 @@ def test_docker_inputspec_2a(plugin, tmp_path): ), ), ], - bases=(DockerSpec,), + bases=(ShellSpec,), ) - docky = DockerTask( + docky = ShellCommandTask( name="docky", - image="busybox", + environment=Docker(image="busybox"), executable=cmd, file2=filename_2, input_spec=my_input_spec, @@ -1018,12 +431,12 @@ def test_docker_inputspec_3(plugin, tmp_path): ), ) ], - bases=(DockerSpec,), + bases=(ShellSpec,), ) - docky = DockerTask( + docky = ShellCommandTask( name="docky", - image="busybox", + environment=Docker(image="busybox"), executable=cmd, file=filename, input_spec=my_input_spec, @@ -1036,51 +449,6 @@ def test_docker_inputspec_3(plugin, tmp_path): assert cmdline == docky.cmdline -@no_win -@need_docker -@pytest.mark.skip(reason="we probably don't want to support container_path") -def test_docker_inputspec_3a(plugin, tmp_path): - """input file does not exist in the local file system, - but metadata["container_path"] is not used, - so exception is raised - """ - filename = "/_proc/1/cgroup" - - cmd = "cat" - - my_input_spec = SpecInfo( - name="Input", - fields=[ - ( - "file", - attr.ib( - type=File, - metadata={ - "mandatory": True, - "position": 1, - "argstr": "", - "help_string": "input file", - }, - ), - ) - ], - bases=(DockerSpec,), - ) - - docky = DockerTask( - name="docky", - image="busybox", - executable=cmd, - file=filename, - input_spec=my_input_spec, - strip=True, - ) - - with 
pytest.raises(Exception) as excinfo: - docky() - assert "use field.metadata['container_path']=True" in str(excinfo.value) - - @no_win @need_docker def test_docker_cmd_inputspec_copyfile_1(plugin, tmp_path): @@ -1121,12 +489,12 @@ def test_docker_cmd_inputspec_copyfile_1(plugin, tmp_path): ), ), ], - bases=(DockerSpec,), + bases=(ShellSpec,), ) - docky = DockerTask( + docky = ShellCommandTask( name="docky", - image="busybox", + environment=Docker(image="busybox"), executable=cmd, input_spec=my_input_spec, orig_file=str(file), @@ -1176,12 +544,12 @@ def test_docker_inputspec_state_1(plugin, tmp_path): ), ) ], - bases=(DockerSpec,), + bases=(ShellSpec,), ) - docky = DockerTask( + docky = ShellCommandTask( name="docky", - image="busybox", + environment=Docker(image="busybox"), executable=cmd, input_spec=my_input_spec, strip=True, @@ -1225,12 +593,12 @@ def test_docker_inputspec_state_1b(plugin, tmp_path): ), ) ], - bases=(DockerSpec,), + bases=(ShellSpec,), ) - docky = DockerTask( + docky = ShellCommandTask( name="docky", - image="busybox", + environment=Docker(image="busybox"), executable=cmd, input_spec=my_input_spec, strip=True, @@ -1267,16 +635,16 @@ def test_docker_wf_inputspec_1(plugin, tmp_path): ), ) ], - bases=(DockerSpec,), + bases=(ShellSpec,), ) wf = Workflow(name="wf", input_spec=["cmd", "file"]) wf.inputs.cmd = cmd wf.inputs.file = filename - docky = DockerTask( + docky = ShellCommandTask( name="docky", - image="busybox", + environment=Docker(image="busybox"), executable=wf.lzin.cmd, file=wf.lzin.file, input_spec=my_input_spec, @@ -1322,16 +690,16 @@ def test_docker_wf_state_inputspec_1(plugin, tmp_path): ), ) ], - bases=(DockerSpec,), + bases=(ShellSpec,), ) wf = Workflow(name="wf", input_spec=["cmd", "file"]) wf.split(file=[str(file_1), str(file_2)]) wf.inputs.cmd = cmd - docky = DockerTask( + docky = ShellCommandTask( name="docky", - image="busybox", + environment=Docker(image="busybox"), executable=wf.lzin.cmd, file=wf.lzin.file, 
input_spec=my_input_spec, @@ -1378,15 +746,15 @@ def test_docker_wf_ndst_inputspec_1(plugin, tmp_path): ), ) ], - bases=(DockerSpec,), + bases=(ShellSpec,), ) wf = Workflow(name="wf", input_spec=["cmd", "file"]) wf.inputs.cmd = cmd - docky = DockerTask( + docky = ShellCommandTask( name="docky", - image="busybox", + environment=Docker(image="busybox"), executable=wf.lzin.cmd, file=wf.lzin.file, input_spec=my_input_spec, diff --git a/pydra/engine/tests/test_environments.py b/pydra/engine/tests/test_environments.py new file mode 100644 index 000000000..bd05d9dae --- /dev/null +++ b/pydra/engine/tests/test_environments.py @@ -0,0 +1,539 @@ +from pathlib import Path + +from ..environments import Native, Docker, Singularity +from ..task import ShellCommandTask +from ..submitter import Submitter +from ..specs import ( + ShellSpec, + SpecInfo, + File, +) +from .utils import no_win, need_docker, need_singularity + +import attr +import pytest + + +def makedir(path, name): + newdir = path / name + newdir.mkdir() + return newdir + + +def test_native_1(tmp_path): + """simple command, no arguments""" + newcache = lambda x: makedir(tmp_path, x) + + cmd = ["whoami"] + shelly = ShellCommandTask( + name="shelly", executable=cmd, cache_dir=newcache("shelly") + ) + assert shelly.cmdline == " ".join(cmd) + + env_res = Native().execute(shelly) + shelly() + assert env_res == shelly.output_ + + shelly_call = ShellCommandTask( + name="shelly_call", executable=cmd, cache_dir=newcache("shelly_call") + ) + shelly_call(environment=Native()) + assert env_res == shelly_call.output_ + + shelly_subm = ShellCommandTask( + name="shelly_subm", executable=cmd, cache_dir=newcache("shelly_subm") + ) + with Submitter(plugin="cf") as sub: + shelly_subm(submitter=sub, environment=Native()) + assert env_res == shelly_subm.result().output.__dict__ + + +@no_win +@need_docker +def test_docker_1(tmp_path): + """docker env: simple command, no arguments""" + newcache = lambda x: makedir(tmp_path, x) + + cmd = 
["whoami"] + docker = Docker(image="busybox") + shelly = ShellCommandTask( + name="shelly", executable=cmd, cache_dir=newcache("shelly") + ) + assert shelly.cmdline == " ".join(cmd) + env_res = docker.execute(shelly) + + shelly_env = ShellCommandTask( + name="shelly", + executable=cmd, + cache_dir=newcache("shelly_env"), + environment=docker, + ) + shelly_env() + assert env_res == shelly_env.output_ == shelly_env.result().output.__dict__ + + shelly_call = ShellCommandTask( + name="shelly", executable=cmd, cache_dir=newcache("shelly_call") + ) + shelly_call(environment=docker) + assert env_res == shelly_call.output_ == shelly_call.result().output.__dict__ + + +@no_win +@need_docker +@pytest.mark.parametrize( + "docker", + [ + Docker(image="busybox"), + Docker(image="busybox", tag="latest", xargs="--rm"), + Docker(image="busybox", xargs=["--rm"]), + ], +) +def test_docker_1_subm(tmp_path, docker): + """docker env with submitter: simple command, no arguments""" + newcache = lambda x: makedir(tmp_path, x) + + cmd = ["whoami"] + docker = Docker(image="busybox") + shelly = ShellCommandTask( + name="shelly", executable=cmd, cache_dir=newcache("shelly") + ) + assert shelly.cmdline == " ".join(cmd) + env_res = docker.execute(shelly) + + shelly_env = ShellCommandTask( + name="shelly", + executable=cmd, + cache_dir=newcache("shelly_env"), + environment=docker, + ) + with Submitter(plugin="cf") as sub: + shelly_env(submitter=sub) + assert env_res == shelly_env.result().output.__dict__ + + shelly_call = ShellCommandTask( + name="shelly", executable=cmd, cache_dir=newcache("shelly_call") + ) + with Submitter(plugin="cf") as sub: + shelly_call(submitter=sub, environment=docker) + assert env_res == shelly_call.result().output.__dict__ + + +@no_win +@need_singularity +def test_singularity_1(tmp_path): + """singularity env: simple command, no arguments""" + newcache = lambda x: makedir(tmp_path, x) + + cmd = ["whoami"] + sing = Singularity(image="docker://alpine") + shelly = 
ShellCommandTask( + name="shelly", executable=cmd, cache_dir=newcache("shelly") + ) + assert shelly.cmdline == " ".join(cmd) + env_res = sing.execute(shelly) + + shelly_env = ShellCommandTask( + name="shelly", + executable=cmd, + cache_dir=newcache("shelly_env"), + environment=sing, + ) + shelly_env() + assert env_res == shelly_env.output_ == shelly_env.result().output.__dict__ + + shelly_call = ShellCommandTask( + name="shelly", executable=cmd, cache_dir=newcache("shelly_call") + ) + shelly_call(environment=sing) + assert env_res == shelly_call.output_ == shelly_call.result().output.__dict__ + + +@no_win +@need_singularity +def test_singularity_1_subm(tmp_path, plugin): + """docker env with submitter: simple command, no arguments""" + newcache = lambda x: makedir(tmp_path, x) + + cmd = ["whoami"] + sing = Singularity(image="docker://alpine") + shelly = ShellCommandTask( + name="shelly", executable=cmd, cache_dir=newcache("shelly") + ) + assert shelly.cmdline == " ".join(cmd) + env_res = sing.execute(shelly) + + shelly_env = ShellCommandTask( + name="shelly", + executable=cmd, + cache_dir=newcache("shelly_env"), + environment=sing, + ) + with Submitter(plugin=plugin) as sub: + shelly_env(submitter=sub) + assert env_res == shelly_env.result().output.__dict__ + + shelly_call = ShellCommandTask( + name="shelly", executable=cmd, cache_dir=newcache("shelly_call") + ) + with Submitter(plugin=plugin) as sub: + shelly_call(submitter=sub, environment=sing) + for key in [ + "stdout", + "return_code", + ]: # singularity gives info about cashed image in stderr + assert env_res[key] == shelly_call.result().output.__dict__[key] + + +def create_shelly_inputfile(tempdir, filename, name, executable): + """creating a task with a simple input_spec""" + my_input_spec = SpecInfo( + name="Input", + fields=[ + ( + "file", + attr.ib( + type=File, + metadata={ + "position": 1, + "help_string": "files", + "mandatory": True, + "argstr": "", + }, + ), + ) + ], + bases=(ShellSpec,), + ) + + 
kwargs = {} if filename is None else {"file": filename} + shelly = ShellCommandTask( + name=name, + executable=executable, + cache_dir=makedir(tempdir, name), + input_spec=my_input_spec, + **kwargs, + ) + return shelly + + +def test_shell_fileinp(tmp_path): + """task with a file in the command/input""" + input_dir = makedir(tmp_path, "inputs") + filename = input_dir / "file.txt" + with open(filename, "w") as f: + f.write("hello ") + + shelly = create_shelly_inputfile( + tempdir=tmp_path, filename=filename, name="shelly", executable=["cat"] + ) + env_res = Native().execute(shelly) + + shelly_env = create_shelly_inputfile( + tempdir=tmp_path, filename=filename, name="shelly_env", executable=["cat"] + ) + shelly_env.environment = Native() + shelly_env() + assert env_res == shelly_env.output_ == shelly_env.result().output.__dict__ + + shelly_call = create_shelly_inputfile( + tempdir=tmp_path, filename=filename, name="shelly_call", executable=["cat"] + ) + shelly_call(environment=Native()) + assert env_res == shelly_call.output_ == shelly_call.result().output.__dict__ + + +def test_shell_fileinp_st(tmp_path): + """task (with a splitter) with a file in the command/input""" + input_dir = makedir(tmp_path, "inputs") + filename_1 = input_dir / "file_1.txt" + with open(filename_1, "w") as f: + f.write("hello ") + + filename_2 = input_dir / "file_2.txt" + with open(filename_2, "w") as f: + f.write("hi ") + + filename = [filename_1, filename_2] + + shelly_env = create_shelly_inputfile( + tempdir=tmp_path, filename=None, name="shelly_env", executable=["cat"] + ) + shelly_env.environment = Native() + shelly_env.split(file=filename) + shelly_env() + assert shelly_env.result()[0].output.stdout.strip() == "hello" + assert shelly_env.result()[1].output.stdout.strip() == "hi" + + shelly_call = create_shelly_inputfile( + tempdir=tmp_path, filename=None, name="shelly_call", executable=["cat"] + ) + shelly_call.split(file=filename) + shelly_call(environment=Native()) + assert 
shelly_call.result()[0].output.stdout.strip() == "hello" + assert shelly_call.result()[1].output.stdout.strip() == "hi" + + +@no_win +@need_docker +def test_docker_fileinp(tmp_path): + """docker env: task with a file in the command/input""" + docker = Docker(image="busybox") + + input_dir = makedir(tmp_path, "inputs") + filename = input_dir / "file.txt" + with open(filename, "w") as f: + f.write("hello ") + + shelly = create_shelly_inputfile( + tempdir=tmp_path, filename=filename, name="shelly", executable=["cat"] + ) + env_res = docker.execute(shelly) + + shelly_env = create_shelly_inputfile( + tempdir=tmp_path, filename=filename, name="shelly_env", executable=["cat"] + ) + shelly_env.environment = docker + shelly_env() + + assert env_res == shelly_env.output_ == shelly_env.result().output.__dict__ + + shelly_call = create_shelly_inputfile( + tempdir=tmp_path, filename=filename, name="shelly_call", executable=["cat"] + ) + shelly_call(environment=docker) + assert env_res == shelly_call.output_ == shelly_call.result().output.__dict__ + + +@no_win +@need_docker +def test_docker_fileinp_subm(tmp_path, plugin): + """docker env with a submitter: task with a file in the command/input""" + docker = Docker(image="busybox") + + input_dir = makedir(tmp_path, "inputs") + filename = input_dir / "file.txt" + with open(filename, "w") as f: + f.write("hello ") + + shelly = create_shelly_inputfile( + tempdir=tmp_path, filename=filename, name="shelly", executable=["cat"] + ) + env_res = docker.execute(shelly) + + shelly_env = create_shelly_inputfile( + tempdir=tmp_path, filename=filename, name="shelly_env", executable=["cat"] + ) + shelly_env.environment = docker + with Submitter(plugin=plugin) as sub: + shelly_env(submitter=sub) + assert env_res == shelly_env.result().output.__dict__ + + shelly_call = create_shelly_inputfile( + tempdir=tmp_path, filename=filename, name="shelly_call", executable=["cat"] + ) + with Submitter(plugin=plugin) as sub: + shelly_call(submitter=sub, 
environment=docker) + assert env_res == shelly_call.result().output.__dict__ + + +@no_win +@need_docker +def test_docker_fileinp_st(tmp_path): + """docker env: task (with a splitter) with a file in the command/input""" + docker = Docker(image="busybox") + + input_dir = makedir(tmp_path, "inputs") + filename_1 = input_dir / "file_1.txt" + with open(filename_1, "w") as f: + f.write("hello ") + + filename_2 = input_dir / "file_2.txt" + with open(filename_2, "w") as f: + f.write("hi ") + + filename = [filename_1, filename_2] + + shelly_env = create_shelly_inputfile( + tempdir=tmp_path, filename=None, name="shelly_env", executable=["cat"] + ) + shelly_env.environment = docker + shelly_env.split(file=filename) + shelly_env() + assert shelly_env.result()[0].output.stdout.strip() == "hello" + assert shelly_env.result()[1].output.stdout.strip() == "hi" + + shelly_call = create_shelly_inputfile( + tempdir=tmp_path, filename=None, name="shelly_call", executable=["cat"] + ) + shelly_call.split(file=filename) + shelly_call(environment=docker) + assert shelly_call.result()[0].output.stdout.strip() == "hello" + assert shelly_call.result()[1].output.stdout.strip() == "hi" + + +def create_shelly_outputfile(tempdir, filename, name, executable="cp"): + """creating a task with an input_spec that contains a template""" + my_input_spec = SpecInfo( + name="Input", + fields=[ + ( + "file_orig", + attr.ib( + type=File, + metadata={"position": 2, "help_string": "new file", "argstr": ""}, + ), + ), + ( + "file_copy", + attr.ib( + type=str, + metadata={ + "output_file_template": "{file_orig}_copy", + "help_string": "output file", + "argstr": "", + }, + ), + ), + ], + bases=(ShellSpec,), + ) + + kwargs = {} if filename is None else {"file_orig": filename} + shelly = ShellCommandTask( + name=name, + executable=executable, + cache_dir=makedir(tempdir, name), + input_spec=my_input_spec, + **kwargs, + ) + return shelly + + +def test_shell_fileout(tmp_path): + """task with a file in the output""" + 
input_dir = makedir(tmp_path, "inputs") + filename = input_dir / "file.txt" + with open(filename, "w") as f: + f.write("hello ") + + # execute does not create the cashedir, so this part will fail, + # but I guess we don't want to use it this way anyway + # shelly = create_shelly_outputfile(tempdir=tmp_path, filename=filename, name="shelly") + # env_res = Native().execute(shelly) + + shelly_env = create_shelly_outputfile( + tempdir=tmp_path, filename=filename, name="shelly_env" + ) + shelly_env.environment = Native() + shelly_env() + assert ( + Path(shelly_env.result().output.file_copy) + == shelly_env.output_dir / "file_copy.txt" + ) + + shelly_call = create_shelly_outputfile( + tempdir=tmp_path, filename=filename, name="shelly_call" + ) + shelly_call(environment=Native()) + assert ( + Path(shelly_call.result().output.file_copy) + == shelly_call.output_dir / "file_copy.txt" + ) + + +def test_shell_fileout_st(tmp_path): + """task (with a splitter) with a file in the output""" + input_dir = makedir(tmp_path, "inputs") + filename_1 = input_dir / "file_1.txt" + with open(filename_1, "w") as f: + f.write("hello ") + + filename_2 = input_dir / "file_2.txt" + with open(filename_2, "w") as f: + f.write("hi ") + + filename = [filename_1, filename_2] + + shelly_env = create_shelly_outputfile( + tempdir=tmp_path, filename=None, name="shelly_env" + ) + shelly_env.environment = Native() + shelly_env.split(file_orig=filename) + shelly_env() + assert ( + Path(shelly_env.result()[0].output.file_copy) + == shelly_env.output_dir[0] / "file_1_copy.txt" + ) + assert ( + Path(shelly_env.result()[1].output.file_copy) + == shelly_env.output_dir[1] / "file_2_copy.txt" + ) + + shelly_call = create_shelly_outputfile( + tempdir=tmp_path, filename=None, name="shelly_call" + ) + shelly_call.split(file_orig=filename) + shelly_call(environment=Native()) + assert ( + Path(shelly_call.result()[0].output.file_copy) + == shelly_call.output_dir[0] / "file_1_copy.txt" + ) + assert ( + 
Path(shelly_call.result()[1].output.file_copy) + == shelly_call.output_dir[1] / "file_2_copy.txt" + ) + + +@no_win +@need_docker +def test_docker_fileout(tmp_path): + """docker env: task with a file in the output""" + docker_env = Docker(image="busybox") + + input_dir = makedir(tmp_path, "inputs") + filename = input_dir / "file.txt" + with open(filename, "w") as f: + f.write("hello ") + + shelly_env = create_shelly_outputfile( + tempdir=tmp_path, filename=filename, name="shelly_env" + ) + shelly_env.environment = docker_env + shelly_env() + assert ( + Path(shelly_env.result().output.file_copy) + == shelly_env.output_dir / "file_copy.txt" + ) + + +@no_win +@need_docker +def test_docker_fileout_st(tmp_path): + """docker env: task (with a splitter) with a file in the output""" + docker_env = Docker(image="busybox") + + input_dir = makedir(tmp_path, "inputs") + filename_1 = input_dir / "file_1.txt" + with open(filename_1, "w") as f: + f.write("hello ") + + filename_2 = input_dir / "file_2.txt" + with open(filename_2, "w") as f: + f.write("hi ") + + filename = [filename_1, filename_2] + + shelly_env = create_shelly_outputfile( + tempdir=tmp_path, filename=None, name="shelly_env" + ) + shelly_env.environment = docker_env + shelly_env.split(file_orig=filename) + shelly_env() + assert ( + Path(shelly_env.result()[0].output.file_copy) + == shelly_env.output_dir[0] / "file_1_copy.txt" + ) + assert ( + Path(shelly_env.result()[1].output.file_copy) + == shelly_env.output_dir[1] / "file_2_copy.txt" + ) diff --git a/pydra/engine/tests/test_singularity.py b/pydra/engine/tests/test_singularity.py index 2dc239da7..791575adc 100644 --- a/pydra/engine/tests/test_singularity.py +++ b/pydra/engine/tests/test_singularity.py @@ -3,10 +3,11 @@ import pytest import attr -from ..task import SingularityTask, DockerTask, ShellCommandTask +from ..task import ShellCommandTask from ..submitter import Submitter from ..core import Workflow -from ..specs import ShellOutSpec, SpecInfo, File, 
SingularitySpec +from ..specs import ShellOutSpec, SpecInfo, File, ShellSpec +from ..environments import Singularity need_docker = pytest.mark.skipif( @@ -29,18 +30,18 @@ def test_singularity_1_nosubm(tmp_path): """ cmd = "pwd" image = "docker://alpine" - singu = SingularityTask( - name="singu", executable=cmd, image=image, cache_dir=tmp_path - ) - assert singu.inputs.image == "docker://alpine" - assert singu.inputs.container == "singularity" - assert ( - singu.cmdline - == f"singularity exec -B {singu.output_dir}:/output_pydra:rw --pwd /output_pydra {image} {cmd}" + singu = ShellCommandTask( + name="singu", + executable=cmd, + environment=Singularity(image=image), + cache_dir=tmp_path, ) + assert singu.environment.image == "docker://alpine" + assert isinstance(singu.environment, Singularity) + assert singu.cmdline == cmd res = singu() - assert "output_pydra" in res.output.stdout + assert "/mnt/pydra" in res.output.stdout assert res.output.return_code == 0 @@ -51,13 +52,13 @@ def test_singularity_2_nosubm(tmp_path): """ cmd = ["echo", "hail", "pydra"] image = "docker://alpine" - singu = SingularityTask( - name="singu", executable=cmd, image=image, cache_dir=tmp_path - ) - assert ( - singu.cmdline - == f"singularity exec -B {singu.output_dir}:/output_pydra:rw --pwd /output_pydra {image} {' '.join(cmd)}" + singu = ShellCommandTask( + name="singu", + executable=cmd, + environment=Singularity(image=image), + cache_dir=tmp_path, ) + assert singu.cmdline == " ".join(cmd) res = singu() assert res.output.stdout.strip() == " ".join(cmd[1:]) @@ -71,42 +72,18 @@ def test_singularity_2(plugin, tmp_path): """ cmd = ["echo", "hail", "pydra"] image = "docker://alpine" - singu = SingularityTask( - name="singu", executable=cmd, image=image, cache_dir=tmp_path - ) - assert ( - singu.cmdline - == f"singularity exec -B {singu.output_dir}:/output_pydra:rw --pwd /output_pydra {image} {' '.join(cmd)}" - ) - - with Submitter(plugin=plugin) as sub: - singu(submitter=sub) - res = 
singu.result() - assert res.output.stdout.strip() == " ".join(cmd[1:]) - assert res.output.return_code == 0 - -@need_singularity -def test_singularity_2_singuflag(plugin, tmp_path): - """a command with arguments, cmd and args given as executable - using ShellComandTask with container_info=("singularity", image) - """ - cmd = ["echo", "hail", "pydra"] - image = "docker://alpine" - shingu = ShellCommandTask( - name="shingu", + singu = ShellCommandTask( + name="singu", executable=cmd, - container_info=("singularity", image), + environment=Singularity(image=image), cache_dir=tmp_path, ) - assert ( - shingu.cmdline - == f"singularity exec -B {shingu.output_dir}:/output_pydra:rw --pwd /output_pydra {image} {' '.join(cmd)}" - ) + assert singu.cmdline == " ".join(cmd) with Submitter(plugin=plugin) as sub: - shingu(submitter=sub) - res = shingu.result() + singu(submitter=sub) + res = singu.result() assert res.output.stdout.strip() == " ".join(cmd[1:]) assert res.output.return_code == 0 @@ -120,17 +97,14 @@ def test_singularity_2a(plugin, tmp_path): cmd_args = ["hail", "pydra"] # separate command into exec + args image = "docker://alpine" - singu = SingularityTask( + singu = ShellCommandTask( name="singu", executable=cmd_exec, args=cmd_args, - image=image, + environment=Singularity(image=image), cache_dir=tmp_path, ) - assert ( - singu.cmdline - == f"singularity exec -B {singu.output_dir}:/output_pydra:rw --pwd /output_pydra {image} {cmd_exec} {' '.join(cmd_args)}" - ) + assert singu.cmdline == f"{cmd_exec} {' '.join(cmd_args)}" with Submitter(plugin=plugin) as sub: singu(submitter=sub) @@ -139,84 +113,6 @@ def test_singularity_2a(plugin, tmp_path): assert res.output.return_code == 0 -@need_singularity -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_singularity_3(plugin, tmp_path): - """a simple command in container with bindings, - creating directory in tmp dir and checking if it is in the container - """ - # creating a new 
directory - (tmp_path / "new_dir").mkdir() - cmd = ["ls", "/tmp_dir"] - image = "docker://alpine" - singu = SingularityTask( - name="singu", executable=cmd, image=image, cache_dir=tmp_path - ) - # binding tmp directory to the container - singu.inputs.bindings = [(str(tmp_path), "/tmp_dir", "ro")] - - with Submitter(plugin=plugin) as sub: - singu(submitter=sub) - - res = singu.result() - assert "new_dir\n" in res.output.stdout - assert res.output.return_code == 0 - - -@need_singularity -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_singularity_3_singuflag(plugin, tmp_path): - """a simple command in container with bindings, - creating directory in tmp dir and checking if it is in the container - using ShellComandTask with container_info=("singularity", image) - """ - # creating a new directory - (tmp_path / "new_dir").mkdir() - cmd = ["ls", "/tmp_dir"] - image = "docker://alpine" - shingu = SingularityTask( - name="singu", - executable=cmd, - container_info=("singularity", image), - cache_dir=tmp_path, - ) - # binding tmp directory to the container - shingu.inputs.bindings = [(str(tmp_path), "/tmp_dir", "ro")] - - with Submitter(plugin=plugin) as sub: - shingu(submitter=sub) - - res = shingu.result() - assert "new_dir\n" in res.output.stdout - assert res.output.return_code == 0 - - -@need_singularity -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_singularity_3_singuflagbind(plugin, tmp_path): - """a simple command in container with bindings, - creating directory in tmp dir and checking if it is in the container - using ShellComandTask with container_info=("singularity", image, bindings) - """ - # creating a new directory - (tmp_path / "new_dir").mkdir() - cmd = ["ls", "/tmp_dir"] - image = "docker://alpine" - shingu = SingularityTask( - name="singu", - executable=cmd, - container_info=("singularity", image, [(str(tmp_path), "/tmp_dir", "ro")]), - cache_dir=tmp_path, - ) - - 
with Submitter(plugin=plugin) as sub: - shingu(submitter=sub) - - res = shingu.result() - assert "new_dir\n" in res.output.stdout - assert res.output.return_code == 0 - - # tests with State @@ -227,64 +123,33 @@ def test_singularity_st_1(plugin, tmp_path): """ cmd = ["pwd", "ls"] image = "docker://alpine" - singu = SingularityTask(name="singu", image=image, cache_dir=tmp_path).split( - "executable", executable=cmd - ) + singu = ShellCommandTask( + name="singu", environment=Singularity(image=image), cache_dir=tmp_path + ).split("executable", executable=cmd) assert singu.state.splitter == "singu.executable" res = singu(plugin=plugin) - assert "/output_pydra" in res[0].output.stdout + assert "/mnt/pydra" in res[0].output.stdout assert res[1].output.stdout == "" assert res[0].output.return_code == res[1].output.return_code == 0 -@need_singularity -def test_singularity_st_2(plugin, tmp_path): - """command with arguments in docker, checking the distribution - splitter = image - """ - cmd = ["cat", "/etc/issue"] - image = ["docker://alpine", "docker://ubuntu"] - singu = SingularityTask(name="singu", executable=cmd, cache_dir=tmp_path).split( - "image", image=image - ) - assert singu.state.splitter == "singu.image" - - res = singu(plugin=plugin) - assert "Alpine" in res[0].output.stdout - assert "Ubuntu" in res[1].output.stdout - assert res[0].output.return_code == res[1].output.return_code == 0 - - -@need_singularity -def test_singularity_st_3(plugin, tmp_path): - """outer splitter image and executable""" - cmd = ["pwd", ["cat", "/etc/issue"]] - image = ["docker://alpine", "docker://ubuntu"] - singu = SingularityTask(name="singu", cache_dir=tmp_path).split( - ["image", "executable"], executable=cmd, image=image - ) - assert singu.state.splitter == ["singu.image", "singu.executable"] - res = singu(plugin=plugin) - - assert "/output_pydra" in res[0].output.stdout - assert "Alpine" in res[1].output.stdout - assert "/output_pydra" in res[2].output.stdout - assert "Ubuntu" in 
res[3].output.stdout - - @need_singularity @need_slurm +@pytest.mark.skip(reason="TODO, xfail incorrect") @pytest.mark.xfail( reason="slurm can complain if the number of submitted jobs exceeds the limit" ) @pytest.mark.parametrize("n", [10, 50, 100]) -def test_singularity_st_4(tmp_path, n): +def test_singularity_st_2(tmp_path, n): """splitter over args (checking bigger splitters if slurm available)""" args_n = list(range(n)) image = "docker://alpine" - singu = SingularityTask( - name="singu", executable="echo", image=image, cache_dir=tmp_path + singu = ShellCommandTask( + name="singu", + executable="echo", + environment=Singularity(image=image), + cache_dir=tmp_path, ).split("args", args=args_n) assert singu.state.splitter == "singu.args" res = singu(plugin="slurm") @@ -293,90 +158,6 @@ def test_singularity_st_4(tmp_path, n): assert res[0].output.return_code == res[1].output.return_code == 0 -@need_singularity -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_wf_singularity_1(plugin, tmp_path): - """a workflow with two connected task - the first one read the file that is bounded to the container, - the second uses echo - """ - with open((tmp_path / "file_pydra.txt"), "w") as f: - f.write("hello from pydra") - - image = "docker://alpine" - wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"], cache_dir=tmp_path) - wf.inputs.cmd1 = ["cat", "/tmp_dir/file_pydra.txt"] - wf.inputs.cmd2 = ["echo", "message from the previous task:"] - wf.add( - SingularityTask( - name="singu_cat", - image=image, - executable=wf.lzin.cmd1, - bindings=[(str(tmp_path), "/tmp_dir", "ro")], - strip=True, - ) - ) - wf.add( - SingularityTask( - name="singu_echo", - image=image, - executable=wf.lzin.cmd2, - args=wf.singu_cat.lzout.stdout, - strip=True, - ) - ) - wf.set_output([("out", wf.singu_echo.lzout.stdout)]) - - with Submitter(plugin=plugin) as sub: - wf(submitter=sub) - - res = wf.result() - assert res.output.out == "message from the previous 
task: hello from pydra" - - -@need_docker -@need_singularity -@pytest.mark.skip(reason="we probably don't want to support bindings as an input") -def test_wf_singularity_1a(plugin, tmp_path): - """a workflow with two connected task - using both containers: Docker and Singul. - the first one read the file that is bounded to the container, - the second uses echo - """ - with open((tmp_path / "file_pydra.txt"), "w") as f: - f.write("hello from pydra") - - image_sing = "docker://alpine" - image_doc = "ubuntu" - wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"], cache_dir=tmp_path) - wf.inputs.cmd1 = ["cat", "/tmp_dir/file_pydra.txt"] - wf.inputs.cmd2 = ["echo", "message from the previous task:"] - wf.add( - SingularityTask( - name="singu_cat", - image=image_sing, - executable=wf.lzin.cmd1, - bindings=[(str(tmp_path), "/tmp_dir", "ro")], - strip=True, - ) - ) - wf.add( - DockerTask( - name="singu_echo", - image=image_doc, - executable=wf.lzin.cmd2, - args=wf.singu_cat.lzout.stdout, - strip=True, - ) - ) - wf.set_output([("out", wf.singu_echo.lzout.stdout)]) - - with Submitter(plugin=plugin) as sub: - wf(submitter=sub) - - res = wf.result() - assert res.output.out == "message from the previous task: hello from pydra" - - # tests with customized output_spec @@ -394,9 +175,9 @@ def test_singularity_outputspec_1(plugin, tmp_path): fields=[("newfile", File, "newfile_tmp.txt")], bases=(ShellOutSpec,), ) - singu = SingularityTask( + singu = ShellCommandTask( name="singu", - image=image, + environment=Singularity(image=image), executable=cmd, output_spec=my_output_spec, cache_dir=tmp_path, @@ -439,12 +220,12 @@ def test_singularity_inputspec_1(plugin, tmp_path): ), ) ], - bases=(SingularitySpec,), + bases=(ShellSpec,), ) - singu = SingularityTask( + singu = ShellCommandTask( name="singu", - image=image, + environment=Singularity(image=image), executable=cmd, file=filename, input_spec=my_input_spec, @@ -480,12 +261,12 @@ def test_singularity_inputspec_1a(plugin, tmp_path): ), 
) ], - bases=(SingularitySpec,), + bases=(ShellSpec,), ) - singu = SingularityTask( + singu = ShellCommandTask( name="singu", - image=image, + environment=Singularity(image=image), executable=cmd, input_spec=my_input_spec, strip=True, @@ -537,12 +318,12 @@ def test_singularity_inputspec_2(plugin, tmp_path): ), ), ], - bases=(SingularitySpec,), + bases=(ShellSpec,), ) - singu = SingularityTask( + singu = ShellCommandTask( name="singu", - image=image, + environment=Singularity(image=image), executable=cmd, file1=filename_1, input_spec=my_input_spec, @@ -597,12 +378,12 @@ def test_singularity_inputspec_2a_except(plugin, tmp_path): ), ), ], - bases=(SingularitySpec,), + bases=(ShellSpec,), ) - singu = SingularityTask( + singu = ShellCommandTask( name="singu", - image=image, + environment=Singularity(image=image), executable=cmd, file2=filename_2, input_spec=my_input_spec, @@ -657,12 +438,12 @@ def test_singularity_inputspec_2a(plugin, tmp_path): ), ), ], - bases=(SingularitySpec,), + bases=(ShellSpec,), ) - singu = SingularityTask( + singu = ShellCommandTask( name="singu", - image=image, + environment=Singularity(image=image), executable=cmd, file2=filename_2, input_spec=my_input_spec, @@ -714,12 +495,12 @@ def test_singularity_cmd_inputspec_copyfile_1(plugin, tmp_path): ), ), ], - bases=(SingularitySpec,), + bases=(ShellSpec,), ) - singu = SingularityTask( + singu = ShellCommandTask( name="singu", - image=image, + environment=Singularity(image=image), executable=cmd, input_spec=my_input_spec, orig_file=str(file), @@ -739,7 +520,7 @@ def test_singularity_cmd_inputspec_copyfile_1(plugin, tmp_path): @need_singularity -def test_singularity_inputspec_state_1(plugin, tmp_path): +def test_singularity_inputspec_state_1(tmp_path): """a customised input spec for a singularity file with a splitter, splitter is on files """ @@ -770,12 +551,12 @@ def test_singularity_inputspec_state_1(plugin, tmp_path): ), ) ], - bases=(SingularitySpec,), + bases=(ShellSpec,), ) - singu = 
SingularityTask( + singu = ShellCommandTask( name="singu", - image=image, + environment=Singularity(image=image), executable=cmd, input_spec=my_input_spec, strip=True, @@ -820,12 +601,12 @@ def test_singularity_inputspec_state_1b(plugin, tmp_path): ), ) ], - bases=(SingularitySpec,), + bases=(ShellSpec,), ) - singu = SingularityTask( + singu = ShellCommandTask( name="singu", - image=image, + environment=Singularity(image=image), executable=cmd, input_spec=my_input_spec, strip=True, @@ -863,16 +644,16 @@ def test_singularity_wf_inputspec_1(plugin, tmp_path): ), ) ], - bases=(SingularitySpec,), + bases=(ShellSpec,), ) wf = Workflow(name="wf", input_spec=["cmd", "file"], cache_dir=tmp_path) wf.inputs.cmd = cmd wf.inputs.file = filename - singu = SingularityTask( + singu = ShellCommandTask( name="singu", - image=image, + environment=Singularity(image=image), executable=wf.lzin.cmd, file=wf.lzin.file, input_spec=my_input_spec, @@ -919,15 +700,15 @@ def test_singularity_wf_state_inputspec_1(plugin, tmp_path): ), ) ], - bases=(SingularitySpec,), + bases=(ShellSpec,), ) wf = Workflow(name="wf", input_spec=["cmd", "file"], cache_dir=tmp_path) wf.inputs.cmd = cmd - singu = SingularityTask( + singu = ShellCommandTask( name="singu", - image=image, + environment=Singularity(image=image), executable=wf.lzin.cmd, file=wf.lzin.file, input_spec=my_input_spec, @@ -976,16 +757,16 @@ def test_singularity_wf_ndst_inputspec_1(plugin, tmp_path): ), ) ], - bases=(SingularitySpec,), + bases=(ShellSpec,), ) wf = Workflow(name="wf", input_spec=["cmd", "file"], cache_dir=tmp_path) wf.inputs.cmd = cmd wf.inputs.file = filename - singu = SingularityTask( + singu = ShellCommandTask( name="singu", - image=image, + environment=Singularity(image=image), executable=wf.lzin.cmd, input_spec=my_input_spec, strip=True, diff --git a/pydra/engine/tests/test_specs.py b/pydra/engine/tests/test_specs.py index cf4f01751..0370d92a5 100644 --- a/pydra/engine/tests/test_specs.py +++ 
b/pydra/engine/tests/test_specs.py @@ -11,9 +11,7 @@ Runtime, Result, ShellSpec, - ContainerSpec, - DockerSpec, - SingularitySpec, + # ContainerSpec, LazyIn, LazyOut, LazyField, @@ -53,35 +51,6 @@ def test_shellspec(): assert hasattr(spec, "args") -container_attrs = ["image", "container", "container_xargs"] - - -def test_container(): - with pytest.raises(TypeError): - spec = ContainerSpec() - spec = ContainerSpec( - executable="ls", image="busybox", container="docker" - ) # (execute, args, image, cont) - assert all([hasattr(spec, attr) for attr in container_attrs]) - assert hasattr(spec, "executable") - - -def test_docker(): - with pytest.raises(TypeError): - spec = DockerSpec(executable="ls") - spec = DockerSpec(executable="ls", image="busybox") - assert all(hasattr(spec, attr) for attr in container_attrs) - assert getattr(spec, "container") == "docker" - - -def test_singularity(): - with pytest.raises(TypeError): - spec = SingularitySpec() - spec = SingularitySpec(executable="ls", image="busybox") - assert all(hasattr(spec, attr) for attr in container_attrs) - assert getattr(spec, "container") == "singularity" - - class NodeTesting: @attrs.define() class Input: diff --git a/pydra/engine/tests/test_task.py b/pydra/engine/tests/test_task.py index 070d550d5..8ff3ce6d4 100644 --- a/pydra/engine/tests/test_task.py +++ b/pydra/engine/tests/test_task.py @@ -5,10 +5,9 @@ import cloudpickle as cp from pathlib import Path import json -import glob as glob from ... 
import mark from ..core import Workflow -from ..task import AuditFlag, ShellCommandTask, DockerTask, SingularityTask +from ..task import AuditFlag, ShellCommandTask from ...utils.messenger import FileMessenger, PrintMessenger, collect_messages from .utils import gen_basic_wf from ..specs import ( @@ -1325,58 +1324,6 @@ def test_shell_cmd(tmpdir): assert res.output.stdout == " ".join(cmd[1:]) + "\n" -def test_container_cmds(tmpdir): - containy = DockerTask(name="containy", executable="pwd") - with pytest.raises(AttributeError) as excinfo: - containy.cmdline - assert "mandatory" in str(excinfo.value) - containy.inputs.image = "busybox" - assert containy.cmdline - - -@no_win -def test_docker_cmd(tmpdir): - docky = DockerTask(name="docky", executable="pwd", image="busybox") - assert ( - docky.cmdline - == f"docker run --rm -v {docky.output_dir}:/output_pydra:rw -w /output_pydra busybox pwd" - ) - docky.inputs.container_xargs = ["--rm", "-it"] - assert ( - docky.cmdline - == f"docker run --rm -it -v {docky.output_dir}:/output_pydra:rw -w /output_pydra busybox pwd" - ) - # TODO: we probably don't want to support container_path - # docky.inputs.bindings = [ - # ("/local/path", "/container/path", "ro"), - # ("/local2", "/container2", None), - # ] - # assert docky.cmdline == ( - # "docker run --rm -it -v /local/path:/container/path:ro" - # f" -v /local2:/container2:rw -v {docky.output_dir}:/output_pydra:rw -w /output_pydra busybox pwd" - # ) - - -@no_win -def test_singularity_cmd(tmpdir): - # todo how this should be done? 
- image = "library://sylabsed/linux/alpine" - singu = SingularityTask(name="singi", executable="pwd", image=image) - assert ( - singu.cmdline - == f"singularity exec -B {singu.output_dir}:/output_pydra:rw --pwd /output_pydra {image} pwd" - ) - # TODO: we probably don't want to support container_path - # singu.inputs.bindings = [ - # ("/local/path", "/container/path", "ro"), - # ("/local2", "/container2", None), - # ] - # assert singu.cmdline == ( - # "singularity exec -B /local/path:/container/path:ro" - # f" -B /local2:/container2:rw -B {singu.output_dir}:/output_pydra:rw --pwd /output_pydra {image} pwd" - # ) - - def test_functask_callable(tmpdir): # no submitter or plugin foo = funaddtwo(a=1) diff --git a/pydra/engine/tests/utils.py b/pydra/engine/tests/utils.py index a219a397b..5b0858866 100644 --- a/pydra/engine/tests/utils.py +++ b/pydra/engine/tests/utils.py @@ -18,6 +18,9 @@ shutil.which("docker") is None or sp.call(["docker", "info"]), reason="no docker within the container", ) +need_singularity = pytest.mark.skipif( + shutil.which("singularity") is None, reason="no singularity available" +) no_win = pytest.mark.skipif( sys.platform.startswith("win"), reason="docker command not adjusted for windows docker", diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py index f56e3a3d1..97be3579a 100644 --- a/pydra/engine/workers.py +++ b/pydra/engine/workers.py @@ -131,19 +131,19 @@ def __init__(self, **kwargs): """Initialize worker.""" logger.debug("Initialize SerialWorker") - def run_el(self, interface, rerun=False, **kwargs): + def run_el(self, interface, rerun=False, environment=None, **kwargs): """Run a task.""" - return self.exec_serial(interface, rerun=rerun) + return self.exec_serial(interface, rerun=rerun, environment=environment) def close(self): """Return whether the task is finished.""" - async def exec_serial(self, runnable, rerun=False): + async def exec_serial(self, runnable, rerun=False, environment=None): if isinstance(runnable, TaskBase): 
- return runnable._run(rerun) + return runnable._run(rerun, environment=environment) else: # it could be tuple that includes pickle files with tasks and inputs ind, task_main_pkl, _ = runnable - return load_and_run(task_main_pkl, ind, rerun) + return load_and_run(task_main_pkl, ind, rerun, environment=environment) async def fetch_finished(self, futures): await asyncio.gather(*futures) @@ -165,19 +165,21 @@ def __init__(self, n_procs=None): # self.loop = asyncio.get_event_loop() logger.debug("Initialize ConcurrentFuture") - def run_el(self, runnable, rerun=False, **kwargs): + def run_el(self, runnable, rerun=False, environment=None, **kwargs): """Run a task.""" assert self.loop, "No event loop available to submit tasks" - return self.exec_as_coro(runnable, rerun=rerun) + return self.exec_as_coro(runnable, rerun=rerun, environment=environment) - async def exec_as_coro(self, runnable, rerun=False): + async def exec_as_coro(self, runnable, rerun=False, environment=None): """Run a task (coroutine wrapper).""" if isinstance(runnable, TaskBase): - res = await self.loop.run_in_executor(self.pool, runnable._run, rerun) + res = await self.loop.run_in_executor( + self.pool, runnable._run, rerun, environment + ) else: # it could be tuple that includes pickle files with tasks and inputs ind, task_main_pkl, task_orig = runnable res = await self.loop.run_in_executor( - self.pool, load_and_run, task_main_pkl, ind, rerun + self.pool, load_and_run, task_main_pkl, ind, rerun, environment ) return res @@ -215,7 +217,7 @@ def __init__(self, loop=None, max_jobs=None, poll_delay=1, sbatch_args=None): self.sbatch_args = sbatch_args or "" self.error = {} - def run_el(self, runnable, rerun=False): + def run_el(self, runnable, rerun=False, environment=None): """Worker submission API.""" script_dir, batch_script = self._prepare_runscripts(runnable, rerun=rerun) if (script_dir / script_dir.parts[1]) == gettempdir(): @@ -443,7 +445,7 @@ def __init__( self.default_qsub_args = default_qsub_args 
self.max_mem_free = max_mem_free - def run_el(self, runnable, rerun=False): + def run_el(self, runnable, rerun=False): # TODO: add env """Worker submission API.""" ( script_dir,