Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Env #706

Merged
merged 2 commits into from
Sep 19, 2023
Merged

Env #706

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pydra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
import attr

from . import mark
from .engine import AuditFlag, DockerTask, ShellCommandTask, Submitter, Workflow, specs
from .engine import AuditFlag, ShellCommandTask, Submitter, Workflow, specs

__all__ = (
"Submitter",
"Workflow",
"AuditFlag",
"ShellCommandTask",
"DockerTask",
"specs",
"mark",
)
Expand Down
3 changes: 1 addition & 2 deletions pydra/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
"""The core of the workflow engine."""
from .submitter import Submitter
from .core import Workflow
from .task import AuditFlag, ShellCommandTask, DockerTask
from .task import AuditFlag, ShellCommandTask
from . import specs

__all__ = [
"AuditFlag",
"DockerTask",
"ShellCommandTask",
"Submitter",
"Workflow",
Expand Down
7 changes: 0 additions & 7 deletions pydra/engine/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,13 +693,6 @@ class ContainerSpec(ShellSpec):
)


@attr.s(auto_attribs=True, kw_only=True)
class DockerSpec(ContainerSpec):
"""Particularize container specifications to the Docker engine."""

container: str = attr.ib("docker", metadata={"help_string": "container"})


@attr.s(auto_attribs=True, kw_only=True)
class SingularitySpec(ContainerSpec):
"""Particularize container specifications to Singularity."""
Expand Down
111 changes: 0 additions & 111 deletions pydra/engine/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
ShellSpec,
ShellOutSpec,
ContainerSpec,
DockerSpec,
SingularitySpec,
attr_fields,
)
Expand Down Expand Up @@ -225,33 +224,6 @@ class ShellCommandTask(TaskBase):
input_spec = None
output_spec = None

def __new__(cls, container_info=None, *args, **kwargs):
if not container_info:
return super().__new__(cls)

if len(container_info) == 2:
type_cont, image = container_info
else:
raise Exception(
f"container_info has to have 2 elements, but {container_info} provided"
)

if type_cont == "docker":
# changing base class of spec if user defined
if "input_spec" in kwargs:
kwargs["input_spec"].bases = (DockerSpec,)
return DockerTask(image=image, *args, **kwargs)
elif type_cont == "singularity":
# changing base class of spec if user defined
if "input_spec" in kwargs:
kwargs["input_spec"].bases = (SingularitySpec,)
return SingularityTask(image=image, *args, **kwargs)
else:
raise Exception(
f"first element of container_info has to be "
f"docker or singularity, but {container_info[0]} provided"
)

def __init__(
self,
audit_flags: AuditFlag = AuditFlag.NONE,
Expand Down Expand Up @@ -735,89 +707,6 @@ def _prepare_bindings(self):
SUPPORTED_COPY_MODES = FileSet.CopyMode.any - FileSet.CopyMode.symlink


class DockerTask(ContainerTask):
"""Extend shell command task for containerized execution with the Docker Engine."""

init = False

def __init__(
self,
name=None,
audit_flags: AuditFlag = AuditFlag.NONE,
cache_dir=None,
input_spec: ty.Optional[SpecInfo] = None,
messenger_args=None,
messengers=None,
output_cpath="/output_pydra",
output_spec: ty.Optional[SpecInfo] = None,
rerun=False,
strip=False,
**kwargs,
):
"""
Initialize this task.

Parameters
----------
name : :obj:`str`
Name of this task.
audit_flags : :obj:`pydra.utils.messenger.AuditFlag`
Auditing configuration
cache_dir : :obj:`os.pathlike`
Cache directory
input_spec : :obj:`pydra.engine.specs.SpecInfo`
Specification of inputs.
messenger_args :
TODO
messengers :
TODO
output_cpath : :obj:`str`
Output path within the container filesystem.
output_spec : :obj:`pydra.engine.specs.BaseSpec`
Specification of inputs.
strip : :obj:`bool`
TODO

"""
if not self.init:
if input_spec is None:
input_spec = SpecInfo(name="Inputs", fields=[], bases=(DockerSpec,))
super().__init__(
name=name,
input_spec=input_spec,
output_spec=output_spec,
audit_flags=audit_flags,
messengers=messengers,
messenger_args=messenger_args,
cache_dir=cache_dir,
strip=strip,
output_cpath=output_cpath,
rerun=rerun,
**kwargs,
)
self.inputs.container_xargs = ["--rm"]
self.init = True

@property
def container_args(self):
"""Get container-specific CLI arguments, returns a list if the task has a state"""
if is_lazy(self.inputs):
raise Exception("can't return container_args, self.inputs has LazyFields")
self.container_check("docker")
if self.state:
raise NotImplementedError

cargs = ["docker", "run"]
if self.inputs.container_xargs is not None:
cargs.extend(self.inputs.container_xargs)

cargs.extend(self.binds("-v"))
cargs.extend(["-w", str(self.output_cpath)])
cargs.append(self.inputs.image)

return cargs


class SingularityTask(ContainerTask):
"""Extend shell command task for containerized execution with Singularity."""

Expand Down
200 changes: 2 additions & 198 deletions pydra/engine/tests/test_dockertask.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import pytest
import attr

from ..task import DockerTask, ShellCommandTask
from ..task import ShellCommandTask
from ..submitter import Submitter
from ..core import Workflow
from ..specs import ShellOutSpec, SpecInfo, File, DockerSpec, ShellSpec
from ..specs import ShellOutSpec, SpecInfo, File, ShellSpec
from ..environments import Docker
from .utils import no_win, need_docker, result_submitter, result_no_submitter

Expand Down Expand Up @@ -113,202 +113,6 @@ def test_docker_st_1(results_function, plugin):
assert res[0].output.return_code == res[1].output.return_code == 0


# tests with workflows


# TODO: to remove or update
# @no_win
# @need_docker
# @pytest.mark.skip(reason="we probably don't want to support bindings as an input")
# def test_wf_docker_1(plugin, tmp_path):
# """a workflow with two connected task
# the first one read the file that is bounded to the container,
# the second uses echo
# """
# with open(tmp_path / "file_pydra.txt"), "w" as f:
# f.write("hello from pydra")
#
# wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"])
# wf.inputs.cmd1 = ["cat", "/tmp_dir/file_pydra.txt"]
# wf.inputs.cmd2 = ["echo", "message from the previous task:"]
# wf.add(
# DockerTask(
# name="docky_cat",
# image="busybox",
# executable=wf.lzin.cmd1,
# bindings=[(str(tmp_path), "/tmp_dir", "ro")],
# strip=True,
# )
# )
# wf.add(
# DockerTask(
# name="docky_echo",
# image="ubuntu",
# executable=wf.lzin.cmd2,
# args=wf.docky_cat.lzout.stdout,
# strip=True,
# )
# )
# wf.set_output([("out", wf.docky_echo.lzout.stdout)])
#
# with pytest.raises(Exception) as excinfo:
# wf.docky_echo.cmdline
# assert "can't return cmdline" in str(excinfo.value)
#
# with Submitter(plugin=plugin) as sub:
# wf(submitter=sub)
#
# res = wf.result()
# assert res.output.out == "message from the previous task: hello from pydra"
#
#
# @no_win
# @need_docker
# @pytest.mark.skip(reason="we probably don't want to support bindings as an input")
# def test_wf_docker_1_dockerflag(plugin, tmp_path):
# """a workflow with two connected task
# the first one read the file that is bounded to the container,
# the second uses echo
# using ShellComandTask with container_info
# """
# with open(tmp_path / "file_pydra.txt"), "w" as f:
# f.write("hello from pydra")
#
# wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"])
# wf.inputs.cmd1 = ["cat", "/tmp_dir/file_pydra.txt"]
# wf.inputs.cmd2 = ["echo", "message from the previous task:"]
# wf.add(
# ShellCommandTask(
# name="shocky_cat",
# container_info=("docker", "busybox", [(str(tmp_path), "/tmp_dir", "ro")]),
# executable=wf.lzin.cmd1,
# strip=True,
# )
# )
# wf.add(
# ShellCommandTask(
# name="shocky_echo",
# executable=wf.lzin.cmd2,
# args=wf.shocky_cat.lzout.stdout,
# strip=True,
# container_info=("docker", "ubuntu"),
# )
# )
# wf.set_output([("out", wf.shocky_echo.lzout.stdout)])
#
# with Submitter(plugin=plugin) as sub:
# wf(submitter=sub)
#
# res = wf.result()
# assert res.output.out == "message from the previous task: hello from pydra"
#
#
# @no_win
# @need_docker
# @pytest.mark.skip(reason="we probably don't want to support bindings as an input")
# def test_wf_docker_2pre(plugin, tmp_path, data_tests_dir):
# """a workflow with two connected task that run python scripts
# the first one creates a text file and the second one reads the file
# """
#
# cmd1 = ["python", "/scripts/saving.py", "-f", "/outputs/tmp.txt"]
# dt = DockerTask(
# name="save",
# image="python:3.7-alpine",
# executable=cmd1,
# bindings=[(str(tmp_path), "/outputs"), (str(data_tests_dir), "/scripts", "ro")],
# strip=True,
# )
# res = dt(plugin=plugin)
# assert res.output.stdout == "/outputs/tmp.txt"
#
#
# @no_win
# @need_docker
# @pytest.mark.skip(reason="we probably don't want to support bindings as an input")
# def test_wf_docker_2(plugin, tmp_path, data_tests_dir):
# """a workflow with two connected task that run python scripts
# the first one creates a text file and the second one reads the file
# """
#
# wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"])
# wf.inputs.cmd1 = ["python", "/scripts/saving.py", "-f", "/outputs/tmp.txt"]
# wf.inputs.cmd2 = ["python", "/scripts/loading.py", "-f"]
# wf.add(
# DockerTask(
# name="save",
# image="python:3.7-alpine",
# executable=wf.lzin.cmd1,
# bindings=[
# (str(tmp_path), "/outputs"),
# (str(data_tests_dir), "/scripts", "ro"),
# ],
# strip=True,
# )
# )
# wf.add(
# DockerTask(
# name="load",
# image="python:3.7-alpine",
# executable=wf.lzin.cmd2,
# args=wf.save.lzout.stdout,
# bindings=[
# (str(tmp_path), "/outputs"),
# (str(data_tests_dir), "/scripts", "ro"),
# ],
# strip=True,
# )
# )
# wf.set_output([("out", wf.load.lzout.stdout)])
#
# with Submitter(plugin=plugin) as sub:
# wf(submitter=sub)
#
# res = wf.result()
# assert res.output.out == "Hello!"
#
#
# @no_win
# @need_docker
# @pytest.mark.skip(reason="we probably don't want to support bindings as an input")
# def test_wf_docker_3(plugin, tmp_path):
# """a workflow with two connected task
# the first one read the file that contains the name of the image,
# the output is passed to the second task as the image used to run the task
# """
# with open(tmp_path / "image.txt"), "w" as f:
# f.write("ubuntu")
#
# wf = Workflow(name="wf", input_spec=["cmd1", "cmd2"])
# wf.inputs.cmd1 = ["cat", "/tmp_dir/image.txt"]
# wf.inputs.cmd2 = ["echo", "image passed to the second task:"]
# wf.add(
# DockerTask(
# name="docky_cat",
# image="busybox",
# executable=wf.lzin.cmd1,
# bindings=[(str(tmp_path), "/tmp_dir", "ro")],
# strip=True,
# )
# )
# wf.add(
# DockerTask(
# name="docky_echo",
# image=wf.docky_cat.lzout.stdout,
# executable=wf.lzin.cmd2,
# args=wf.docky_cat.lzout.stdout,
# strip=True,
# )
# )
# wf.set_output([("out", wf.docky_echo.lzout.stdout)])
#
# with Submitter(plugin=plugin) as sub:
# wf(submitter=sub)
#
# res = wf.result()
# assert res.output.out == "image passed to the second task: ubuntu"


# tests with customized output_spec


Expand Down
Loading
Loading