From aaa399f362f3020994d60ef64ce0f49d56e43e86 Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Sun, 17 Sep 2023 22:30:49 -0400 Subject: [PATCH 1/2] adding environment to run_el for slurm worker --- pydra/engine/workers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pydra/engine/workers.py b/pydra/engine/workers.py index 022b89e9e..a39956f0f 100644 --- a/pydra/engine/workers.py +++ b/pydra/engine/workers.py @@ -217,7 +217,7 @@ def __init__(self, loop=None, max_jobs=None, poll_delay=1, sbatch_args=None): self.sbatch_args = sbatch_args or "" self.error = {} - def run_el(self, runnable, rerun=False): # TODO: add env + def run_el(self, runnable, rerun=False, environment=None): """Worker submission API.""" script_dir, batch_script = self._prepare_runscripts(runnable, rerun=rerun) if (script_dir / script_dir.parts[1]) == gettempdir(): From 7d39b4d4deec482970e98e3e74dcb585673a5789 Mon Sep 17 00:00:00 2001 From: Dorota Jarecka Date: Mon, 18 Sep 2023 21:37:33 -0400 Subject: [PATCH 2/2] cleaning DockerTask remains --- pydra/__init__.py | 3 +- pydra/engine/__init__.py | 3 +- pydra/engine/specs.py | 7 - pydra/engine/task.py | 111 -------------- pydra/engine/tests/test_dockertask.py | 200 +------------------------ pydra/engine/tests/test_singularity.py | 2 +- pydra/engine/tests/test_specs.py | 9 -- pydra/engine/tests/test_task.py | 55 +------ 8 files changed, 6 insertions(+), 384 deletions(-) diff --git a/pydra/__init__.py b/pydra/__init__.py index 34e1b2578..1422e46fd 100644 --- a/pydra/__init__.py +++ b/pydra/__init__.py @@ -15,14 +15,13 @@ import attr from . import mark -from .engine import AuditFlag, DockerTask, ShellCommandTask, Submitter, Workflow, specs +from .engine import AuditFlag, ShellCommandTask, Submitter, Workflow, specs __all__ = ( "Submitter", "Workflow", "AuditFlag", "ShellCommandTask", - "DockerTask", "specs", "mark", ) diff --git a/pydra/engine/__init__.py b/pydra/engine/__init__.py index 411af8a1d..9f46b7ea3 100644 --- a/pydra/engine/__init__.py +++ b/pydra/engine/__init__.py @@ -1,12 +1,11 @@ """The core of the workflow engine.""" from .submitter import Submitter from .core import Workflow -from .task import AuditFlag, ShellCommandTask, DockerTask +from .task import AuditFlag, ShellCommandTask from . import specs __all__ = [ "AuditFlag", - "DockerTask", "ShellCommandTask", "Submitter", "Workflow", diff --git a/pydra/engine/specs.py b/pydra/engine/specs.py index c31705be7..9de048d85 100644 --- a/pydra/engine/specs.py +++ b/pydra/engine/specs.py @@ -693,13 +693,6 @@ class ContainerSpec(ShellSpec): ) -@attr.s(auto_attribs=True, kw_only=True) -class DockerSpec(ContainerSpec): - """Particularize container specifications to the Docker engine.""" - - container: str = attr.ib("docker", metadata={"help_string": "container"}) - - @attr.s(auto_attribs=True, kw_only=True) class SingularitySpec(ContainerSpec): """Particularize container specifications to Singularity.""" diff --git a/pydra/engine/task.py b/pydra/engine/task.py index bcf5e3ef3..6cd476f12 100644 --- a/pydra/engine/task.py +++ b/pydra/engine/task.py @@ -58,7 +58,6 @@ ShellSpec, ShellOutSpec, ContainerSpec, - DockerSpec, SingularitySpec, attr_fields, ) @@ -225,33 +224,6 @@ class ShellCommandTask(TaskBase): input_spec = None output_spec = None - def __new__(cls, container_info=None, *args, **kwargs): - if not container_info: - return super().__new__(cls) - - if len(container_info) == 2: - type_cont, image = container_info - else: - raise Exception( - f"container_info has to have 2 elements, but {container_info} provided" - ) - - if type_cont == "docker": - # changing base class of spec if user defined - if "input_spec" in kwargs: - kwargs["input_spec"].bases = (DockerSpec,) - return DockerTask(image=image, *args, **kwargs) - elif type_cont == "singularity": - # changing base class of spec if user defined - if "input_spec" in kwargs: - kwargs["input_spec"].bases = (SingularitySpec,) - return SingularityTask(image=image, *args, **kwargs) - else: - raise Exception( - f"first element of container_info has to be " - f"docker or singularity, but {container_info[0]} provided" - ) - def __init__( self, audit_flags: AuditFlag = AuditFlag.NONE, @@ -735,89 +707,6 @@ def _prepare_bindings(self): SUPPORTED_COPY_MODES = FileSet.CopyMode.any - FileSet.CopyMode.symlink -class DockerTask(ContainerTask): - """Extend shell command task for containerized execution with the Docker Engine.""" - - init = False - - def __init__( - self, - name=None, - audit_flags: AuditFlag = AuditFlag.NONE, - cache_dir=None, - input_spec: ty.Optional[SpecInfo] = None, - messenger_args=None, - messengers=None, - output_cpath="/output_pydra", - output_spec: ty.Optional[SpecInfo] = None, - rerun=False, - strip=False, - **kwargs, - ): - """ - Initialize this task. - - Parameters - ---------- - name : :obj:`str` - Name of this task. - audit_flags : :obj:`pydra.utils.messenger.AuditFlag` - Auditing configuration - cache_dir : :obj:`os.pathlike` - Cache directory - input_spec : :obj:`pydra.engine.specs.SpecInfo` - Specification of inputs. - messenger_args : - TODO - messengers : - TODO - output_cpath : :obj:`str` - Output path within the container filesystem. - output_spec : :obj:`pydra.engine.specs.BaseSpec` - Specification of inputs. - strip : :obj:`bool` - TODO - - """ - if not self.init: - if input_spec is None: - input_spec = SpecInfo(name="Inputs", fields=[], bases=(DockerSpec,)) - super().__init__( - name=name, - input_spec=input_spec, - output_spec=output_spec, - audit_flags=audit_flags, - messengers=messengers, - messenger_args=messenger_args, - cache_dir=cache_dir, - strip=strip, - output_cpath=output_cpath, - rerun=rerun, - **kwargs, - ) - self.inputs.container_xargs = ["--rm"] - self.init = True - - @property - def container_args(self): - """Get container-specific CLI arguments, returns a list if the task has a state""" - if is_lazy(self.inputs): - raise Exception("can't return container_args, self.inputs has LazyFields") - self.container_check("docker") - if self.state: - raise NotImplementedError - - cargs = ["docker", "run"] - if self.inputs.container_xargs is not None: - cargs.extend(self.inputs.container_xargs) - - cargs.extend(self.binds("-v")) - cargs.extend(["-w", str(self.output_cpath)]) - cargs.append(self.inputs.image) - - return cargs - - class SingularityTask(ContainerTask): """Extend shell command task for containerized execution with Singularity.""" diff --git a/pydra/engine/tests/test_dockertask.py b/pydra/engine/tests/test_dockertask.py index cd50f7a3c..5ccf37e29 100644 --- a/pydra/engine/tests/test_dockertask.py +++ b/pydra/engine/tests/test_dockertask.py @@ -2,10 +2,10 @@ import pytest import attr -from ..task import DockerTask, ShellCommandTask +from ..task import ShellCommandTask from ..submitter import Submitter from ..core import Workflow -from ..specs import ShellOutSpec, SpecInfo, File, DockerSpec, ShellSpec +from ..specs import ShellOutSpec, SpecInfo, File, ShellSpec from ..environments import Docker from .utils import no_win, need_docker, result_submitter, result_no_submitter @@ -113,202 +113,6 @@ def test_docker_st_1(results_function, plugin): assert res[0].output.return_code == res[1].output.return_code == 0 -# tests with workflows - - -# TODO: to remove or update -# @no_win -# @need_docker -# @pytest.mark.skip(reason="we probably don't want to support bindings as an input") -# def test_wf_docker_1(plugin, tmp_path): -# """a workflow with two connected task -# the first one read the file that is bounded to the container, -# the second uses echo -# """ -# with open(tmp_path / "file_pydra.txt"), "w" as f: -# f.write("hello from pydra") -# -# wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"]) -# wf.inputs.cmd1 = ["cat", "/tmp_dir/file_pydra.txt"] -# wf.inputs.cmd2 = ["echo", "message from the previous task:"] -# wf.add( -# DockerTask( -# name="docky_cat", -# image="busybox", -# executable=wf.lzin.cmd1, -# bindings=[(str(tmp_path), "/tmp_dir", "ro")], -# strip=True, -# ) -# ) -# wf.add( -# DockerTask( -# name="docky_echo", -# image="ubuntu", -# executable=wf.lzin.cmd2, -# args=wf.docky_cat.lzout.stdout, -# strip=True, -# ) -# ) -# wf.set_output([("out", wf.docky_echo.lzout.stdout)]) -# -# with pytest.raises(Exception) as excinfo: -# wf.docky_echo.cmdline -# assert "can't return cmdline" in str(excinfo.value) -# -# with Submitter(plugin=plugin) as sub: -# wf(submitter=sub) -# -# res = wf.result() -# assert res.output.out == "message from the previous task: hello from pydra" -# -# -# @no_win -# @need_docker -# @pytest.mark.skip(reason="we probably don't want to support bindings as an input") -# def test_wf_docker_1_dockerflag(plugin, tmp_path): -# """a workflow with two connected task -# the first one read the file that is bounded to the container, -# the second uses echo -# using ShellComandTask with container_info -# """ -# with open(tmp_path / "file_pydra.txt"), "w" as f: -# f.write("hello from pydra") -# -# wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"]) -# wf.inputs.cmd1 = ["cat", "/tmp_dir/file_pydra.txt"] -# wf.inputs.cmd2 = ["echo", "message from the previous task:"] -# wf.add( -# ShellCommandTask( -# name="shocky_cat", -# container_info=("docker", "busybox", [(str(tmp_path), "/tmp_dir", "ro")]), -# executable=wf.lzin.cmd1, -# strip=True, -# ) -# ) -# wf.add( -# ShellCommandTask( -# name="shocky_echo", -# executable=wf.lzin.cmd2, -# args=wf.shocky_cat.lzout.stdout, -# strip=True, -# container_info=("docker", "ubuntu"), -# ) -# ) -# wf.set_output([("out", wf.shocky_echo.lzout.stdout)]) -# -# with Submitter(plugin=plugin) as sub: -# wf(submitter=sub) -# -# res = wf.result() -# assert res.output.out == "message from the previous task: hello from pydra" -# -# -# @no_win -# @need_docker -# @pytest.mark.skip(reason="we probably don't want to support bindings as an input") -# def test_wf_docker_2pre(plugin, tmp_path, data_tests_dir): -# """a workflow with two connected task that run python scripts -# the first one creates a text file and the second one reads the file -# """ -# -# cmd1 = ["python", "/scripts/saving.py", "-f", "/outputs/tmp.txt"] -# dt = DockerTask( -# name="save", -# image="python:3.7-alpine", -# executable=cmd1, -# bindings=[(str(tmp_path), "/outputs"), (str(data_tests_dir), "/scripts", "ro")], -# strip=True, -# ) -# res = dt(plugin=plugin) -# assert res.output.stdout == "/outputs/tmp.txt" -# -# -# @no_win -# @need_docker -# @pytest.mark.skip(reason="we probably don't want to support bindings as an input") -# def test_wf_docker_2(plugin, tmp_path, data_tests_dir): -# """a workflow with two connected task that run python scripts -# the first one creates a text file and the second one reads the file -# """ -# -# wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"]) -# wf.inputs.cmd1 = ["python", "/scripts/saving.py", "-f", "/outputs/tmp.txt"] -# wf.inputs.cmd2 = ["python", "/scripts/loading.py", "-f"] -# wf.add( -# DockerTask( -# name="save", -# image="python:3.7-alpine", -# executable=wf.lzin.cmd1, -# bindings=[ -# (str(tmp_path), "/outputs"), -# (str(data_tests_dir), "/scripts", "ro"), -# ], -# strip=True, -# ) -# ) -# wf.add( -# DockerTask( -# name="load", -# image="python:3.7-alpine", -# executable=wf.lzin.cmd2, -# args=wf.save.lzout.stdout, -# bindings=[ -# (str(tmp_path), "/outputs"), -# (str(data_tests_dir), "/scripts", "ro"), -# ], -# strip=True, -# ) -# ) -# wf.set_output([("out", wf.load.lzout.stdout)]) -# -# with Submitter(plugin=plugin) as sub: -# wf(submitter=sub) -# -# res = wf.result() -# assert res.output.out == "Hello!" -# -# -# @no_win -# @need_docker -# @pytest.mark.skip(reason="we probably don't want to support bindings as an input") -# def test_wf_docker_3(plugin, tmp_path): -# """a workflow with two connected task -# the first one read the file that contains the name of the image, -# the output is passed to the second task as the image used to run the task -# """ -# with open(tmp_path / "image.txt"), "w" as f: -# f.write("ubuntu") -# -# wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"]) -# wf.inputs.cmd1 = ["cat", "/tmp_dir/image.txt"] -# wf.inputs.cmd2 = ["echo", "image passed to the second task:"] -# wf.add( -# DockerTask( -# name="docky_cat", -# image="busybox", -# executable=wf.lzin.cmd1, -# bindings=[(str(tmp_path), "/tmp_dir", "ro")], -# strip=True, -# ) -# ) -# wf.add( -# DockerTask( -# name="docky_echo", -# image=wf.docky_cat.lzout.stdout, -# executable=wf.lzin.cmd2, -# args=wf.docky_cat.lzout.stdout, -# strip=True, -# ) -# ) -# wf.set_output([("out", wf.docky_echo.lzout.stdout)]) -# -# with Submitter(plugin=plugin) as sub: -# wf(submitter=sub) -# -# res = wf.result() -# assert res.output.out == "image passed to the second task: ubuntu" - - # tests with customized output_spec diff --git a/pydra/engine/tests/test_singularity.py b/pydra/engine/tests/test_singularity.py index 2dc239da7..bb6506aa9 100644 --- a/pydra/engine/tests/test_singularity.py +++ b/pydra/engine/tests/test_singularity.py @@ -3,7 +3,7 @@ import pytest import attr -from ..task import SingularityTask, DockerTask, ShellCommandTask +from ..task import SingularityTask, ShellCommandTask from ..submitter import Submitter from ..core import Workflow from ..specs import ShellOutSpec, SpecInfo, File, SingularitySpec diff --git a/pydra/engine/tests/test_specs.py b/pydra/engine/tests/test_specs.py index cf4f01751..51af95a75 100644 --- a/pydra/engine/tests/test_specs.py +++ b/pydra/engine/tests/test_specs.py @@ -12,7 +12,6 @@ Result, ShellSpec, ContainerSpec, - DockerSpec, SingularitySpec, LazyIn, LazyOut, @@ -66,14 +65,6 @@ def test_container(): assert hasattr(spec, "executable") -def test_docker(): - with pytest.raises(TypeError): - spec = DockerSpec(executable="ls") - spec = DockerSpec(executable="ls", image="busybox") - assert all(hasattr(spec, attr) for attr in container_attrs) - assert getattr(spec, "container") == "docker" - - def test_singularity(): with pytest.raises(TypeError): spec = SingularitySpec() diff --git a/pydra/engine/tests/test_task.py b/pydra/engine/tests/test_task.py index 070d550d5..8ff3ce6d4 100644 --- a/pydra/engine/tests/test_task.py +++ b/pydra/engine/tests/test_task.py @@ -5,10 +5,9 @@ import cloudpickle as cp from pathlib import Path import json -import glob as glob from ... import mark from ..core import Workflow -from ..task import AuditFlag, ShellCommandTask, DockerTask, SingularityTask +from ..task import AuditFlag, ShellCommandTask from ...utils.messenger import FileMessenger, PrintMessenger, collect_messages from .utils import gen_basic_wf from ..specs import ( @@ -1325,58 +1324,6 @@ def test_shell_cmd(tmpdir): assert res.output.stdout == " ".join(cmd[1:]) + "\n" -def test_container_cmds(tmpdir): - containy = DockerTask(name="containy", executable="pwd") - with pytest.raises(AttributeError) as excinfo: - containy.cmdline - assert "mandatory" in str(excinfo.value) - containy.inputs.image = "busybox" - assert containy.cmdline - - -@no_win -def test_docker_cmd(tmpdir): - docky = DockerTask(name="docky", executable="pwd", image="busybox") - assert ( - docky.cmdline - == f"docker run --rm -v {docky.output_dir}:/output_pydra:rw -w /output_pydra busybox pwd" - ) - docky.inputs.container_xargs = ["--rm", "-it"] - assert ( - docky.cmdline - == f"docker run --rm -it -v {docky.output_dir}:/output_pydra:rw -w /output_pydra busybox pwd" - ) - # TODO: we probably don't want to support container_path - # docky.inputs.bindings = [ - # ("/local/path", "/container/path", "ro"), - # ("/local2", "/container2", None), - # ] - # assert docky.cmdline == ( - # "docker run --rm -it -v /local/path:/container/path:ro" - # f" -v /local2:/container2:rw -v {docky.output_dir}:/output_pydra:rw -w /output_pydra busybox pwd" - # ) - - -@no_win -def test_singularity_cmd(tmpdir): - # todo how this should be done? - image = "library://sylabsed/linux/alpine" - singu = SingularityTask(name="singi", executable="pwd", image=image) - assert ( - singu.cmdline - == f"singularity exec -B {singu.output_dir}:/output_pydra:rw --pwd /output_pydra {image} pwd" - ) - # TODO: we probably don't want to support container_path - # singu.inputs.bindings = [ - # ("/local/path", "/container/path", "ro"), - # ("/local2", "/container2", None), - # ] - # assert singu.cmdline == ( - # "singularity exec -B /local/path:/container/path:ro" - # f" -B /local2:/container2:rw -B {singu.output_dir}:/output_pydra:rw --pwd /output_pydra {image} pwd" - # ) - - def test_functask_callable(tmpdir): # no submitter or plugin foo = funaddtwo(a=1)