Skip to content

Commit

Permalink
Merge 0b0c71b into 0e3d33c
Browse files Browse the repository at this point in the history
  • Loading branch information
effigies authored Nov 15, 2023
2 parents 0e3d33c + 0b0c71b commit f3c1d35
Show file tree
Hide file tree
Showing 14 changed files with 954 additions and 1,497 deletions.
3 changes: 1 addition & 2 deletions pydra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
import attr

from . import mark
from .engine import AuditFlag, DockerTask, ShellCommandTask, Submitter, Workflow, specs
from .engine import AuditFlag, ShellCommandTask, Submitter, Workflow, specs

__all__ = (
"Submitter",
"Workflow",
"AuditFlag",
"ShellCommandTask",
"DockerTask",
"specs",
"mark",
)
Expand Down
3 changes: 1 addition & 2 deletions pydra/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
"""The core of the workflow engine."""
from .submitter import Submitter
from .core import Workflow
from .task import AuditFlag, ShellCommandTask, DockerTask
from .task import AuditFlag, ShellCommandTask
from . import specs

__all__ = [
"AuditFlag",
"DockerTask",
"ShellCommandTask",
"Submitter",
"Workflow",
Expand Down
23 changes: 14 additions & 9 deletions pydra/engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,13 @@ def cont_dim(self, cont_dim):
self._cont_dim = cont_dim

def __call__(
self, submitter=None, plugin=None, plugin_kwargs=None, rerun=False, **kwargs
self,
submitter=None,
plugin=None,
plugin_kwargs=None,
rerun=False,
environment=None,
**kwargs,
):
"""Make tasks callable themselves."""
from .submitter import Submitter
Expand All @@ -449,9 +455,9 @@ def __call__(
if submitter:
with submitter as sub:
self.inputs = attr.evolve(self.inputs, **kwargs)
res = sub(self)
res = sub(self, environment=environment)
else: # tasks without state could be run without a submitter
res = self._run(rerun=rerun, **kwargs)
res = self._run(rerun=rerun, environment=environment, **kwargs)
return res

def _modify_inputs(self):
Expand Down Expand Up @@ -501,7 +507,7 @@ def _populate_filesystem(self, checksum, output_dir):
shutil.rmtree(output_dir)
output_dir.mkdir(parents=False, exist_ok=self.can_resume)

def _run(self, rerun=False, **kwargs):
def _run(self, rerun=False, environment=None, **kwargs):
self.inputs = attr.evolve(self.inputs, **kwargs)
self.inputs.check_fields_input_spec()

Expand All @@ -518,6 +524,7 @@ def _run(self, rerun=False, **kwargs):
return result
cwd = os.getcwd()
self._populate_filesystem(checksum, output_dir)
os.chdir(output_dir)
orig_inputs = self._modify_inputs()
result = Result(output=None, runtime=None, errored=False)
self.hooks.pre_run_task(self)
Expand All @@ -526,7 +533,7 @@ def _run(self, rerun=False, **kwargs):
self.audit.audit_task(task=self)
try:
self.audit.monitor()
self._run_task()
self._run_task(environment=environment)
result.output = self._collect_outputs(output_dir=output_dir)
except Exception:
etype, eval, etr = sys.exc_info()
Expand All @@ -538,7 +545,6 @@ def _run(self, rerun=False, **kwargs):
self.hooks.post_run_task(self, result)
self.audit.finalize_audit(result)
save(output_dir, result=result, task=self)
self.output_ = None
# removing the additional file with the checksum
(self.cache_dir / f"{self.uid}_info.json").unlink()
# # function etc. shouldn't change anyway, so removing
Expand All @@ -551,15 +557,14 @@ def _run(self, rerun=False, **kwargs):
return result

def _collect_outputs(self, output_dir):
run_output = self.output_
output_klass = make_klass(self.output_spec)
output = output_klass(
**{f.name: attr.NOTHING for f in attr.fields(output_klass)}
)
other_output = output.collect_additional_outputs(
self.inputs, output_dir, run_output
self.inputs, output_dir, self.output_
)
return attr.evolve(output, **run_output, **other_output)
return attr.evolve(output, **self.output_, **other_output)

def split(
self,
Expand Down
157 changes: 157 additions & 0 deletions pydra/engine/environments.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from .helpers import execute

from pathlib import Path


class Environment:
    """
    Base class for environments that are used to execute tasks.

    Right now it is assumed that the environment, including container images,
    is available and is not removed at the end.
    TODO: implement setup and teardown (both are currently no-ops)
    """

    def setup(self):
        """Placeholder; currently does nothing."""
        pass

    def execute(self, task):
        """
        Execute the task in the environment.

        Parameters
        ----------
        task : TaskBase
            the task to execute

        Returns
        -------
        output
            Output of the task.

        Raises
        ------
        NotImplementedError
            Always, in this base class; subclasses have to override it.
        """
        # Name the concrete class so a missing override is easy to locate.
        raise NotImplementedError(
            f"{type(self).__name__} does not implement execute()"
        )

    def teardown(self):
        """Placeholder; currently does nothing."""
        pass

Check warning on line 34 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L34

Added line #L34 was not covered by tests


class Native(Environment):
    """
    Native environment, i.e. the tasks are executed in the current python environment.
    """

    def execute(self, task):
        """
        Run the task's command through the ``execute`` helper.

        Parameters
        ----------
        task : TaskBase
            the task to execute; its ``command_args()``, ``strip`` and ``name``
            are read here

        Returns
        -------
        output : dict
            Dictionary with the keys ``return_code``, ``stdout`` and ``stderr``.

        Raises
        ------
        RuntimeError
            If the return code is non-zero; the message includes the command
            and any non-empty stderr/stdout.
        """
        # Build the command once so that the error message reports exactly
        # the arguments that were handed to the helper.
        cmd_args = task.command_args()
        keys = ["return_code", "stdout", "stderr"]
        values = execute(cmd_args, strip=task.strip)
        output = dict(zip(keys, values))
        if output["return_code"]:
            msg = f"Error running '{task.name}' task with {cmd_args}:"
            if output["stderr"]:
                msg += "\n\nstderr:\n" + output["stderr"]
            if output["stdout"]:
                msg += "\n\nstdout:\n" + output["stdout"]
            raise RuntimeError(msg)
        return output


class Container(Environment):
    """
    Common base for the container environments (Docker and Singularity).

    Parameters
    ----------
    image : str
        Name of the container image
    tag : str
        Tag of the container image
    root : str
        Base path for mounting host directories into the container
    xargs : Union[str, List[str]]
        Extra arguments to be passed to the container
    """

    def __init__(self, image, tag="latest", root="/mnt/pydra", xargs=None):
        self.image = image
        self.tag = tag
        self.root = root
        # Normalise xargs: a string is split on whitespace, None becomes an
        # empty list, anything else is stored as given.
        if isinstance(xargs, str):
            self.xargs = xargs.split()
        else:
            self.xargs = [] if xargs is None else xargs

    def bind(self, loc, mode="ro"):
        """
        Build a ``host:container:mode`` bind specification for ``loc``.

        The host side is the absolute form of ``loc``; the container side is
        the same absolute path prefixed with ``self.root``.
        """
        host_path = Path(loc).absolute()
        container_path = f"{self.root}{host_path}"
        return f"{host_path}:{container_path}:{mode}"


class Docker(Container):
    """Docker environment."""

    def execute(self, task):
        """
        Run the task's command with ``docker run``.

        Parameters
        ----------
        task : TaskBase
            the task to execute; its ``cache_dir``, ``output_dir``, ``strip``,
            ``get_bindings(root=...)`` and ``command_args(root=...)`` are used

        Returns
        -------
        output : dict
            Dictionary with the keys ``return_code``, ``stdout`` and ``stderr``.

        Raises
        ------
        RuntimeError
            If the return code is non-zero; the message is stderr when it is
            non-empty, otherwise stdout.
        """
        docker_img = f"{self.image}:{self.tag}"
        # mounting all input locations
        mounts = task.get_bindings(root=self.root)

        docker_args = [
            "docker",
            "run",
            "-v",
            self.bind(task.cache_dir, "rw"),
            *self.xargs,
        ]
        # One "-v <spec>" pair per mount, added as separate argv items.
        # Joining into a single string and re-splitting on whitespace would
        # break any path that contains a space.
        for key, val in mounts.items():
            docker_args.extend(["-v", f"{key}:{val[0]}:{val[1]}"])
        # The working directory is the task output dir as seen in the container.
        docker_args.extend(["-w", f"{self.root}{task.output_dir}"])

        keys = ["return_code", "stdout", "stderr"]
        values = execute(
            docker_args + [docker_img] + task.command_args(root=self.root),
            strip=task.strip,
        )
        output = dict(zip(keys, values))
        if output["return_code"]:
            # Prefer stderr; fall back to stdout when stderr is empty.
            raise RuntimeError(output["stderr"] or output["stdout"])
        return output


class Singularity(Container):
    """Singularity environment."""

    def execute(self, task):
        """
        Run the task's command with ``singularity exec``.

        Parameters
        ----------
        task : TaskBase
            the task to execute; its ``cache_dir``, ``output_dir``, ``strip``,
            ``get_bindings(root=...)`` and ``command_args(root=...)`` are used

        Returns
        -------
        output : dict
            Dictionary with the keys ``return_code``, ``stdout`` and ``stderr``.

        Raises
        ------
        RuntimeError
            If the return code is non-zero; the message is stderr when it is
            non-empty, otherwise stdout.
        """
        singularity_img = f"{self.image}:{self.tag}"
        # mounting all input locations
        mounts = task.get_bindings(root=self.root)

        singularity_args = [
            "singularity",
            "exec",
            "-B",
            self.bind(task.cache_dir, "rw"),
            *self.xargs,
        ]
        # One "-B <spec>" pair per mount, added as separate argv items.
        # Joining into a single string and re-splitting on whitespace would
        # break any path that contains a space.
        for key, val in mounts.items():
            singularity_args.extend(["-B", f"{key}:{val[0]}:{val[1]}"])
        # The working directory is the task output dir as seen in the container.
        singularity_args.extend(["--pwd", f"{self.root}{task.output_dir}"])

        keys = ["return_code", "stdout", "stderr"]
        values = execute(
            singularity_args + [singularity_img] + task.command_args(root=self.root),
            strip=task.strip,
        )
        output = dict(zip(keys, values))
        if output["return_code"]:
            # Prefer stderr; fall back to stdout when stderr is empty.
            raise RuntimeError(output["stderr"] or output["stdout"])
        return output

Check warning on line 157 in pydra/engine/environments.py

View check run for this annotation

Codecov / codecov/patch

pydra/engine/environments.py#L156-L157

Added lines #L156 - L157 were not covered by tests
31 changes: 0 additions & 31 deletions pydra/engine/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,37 +676,6 @@ def _check_requires(self, fld, inputs):
return False


@attr.s(auto_attribs=True, kw_only=True)
class ContainerSpec(ShellSpec):
"""Refine the generic command-line specification to container execution."""

image: ty.Union[File, str] = attr.ib(
metadata={"help_string": "image", "mandatory": True}
)
"""The image to be containerized."""
container: ty.Union[File, str, None] = attr.ib(
metadata={"help_string": "container"}
)
"""The container."""
container_xargs: ty.Optional[ty.List[str]] = attr.ib(
default=None, metadata={"help_string": "todo"}
)


@attr.s(auto_attribs=True, kw_only=True)
class DockerSpec(ContainerSpec):
"""Particularize container specifications to the Docker engine."""

container: str = attr.ib("docker", metadata={"help_string": "container"})


@attr.s(auto_attribs=True, kw_only=True)
class SingularitySpec(ContainerSpec):
"""Particularize container specifications to Singularity."""

container: str = attr.ib("singularity", metadata={"help_string": "container type"})


@attr.s
class LazyInterface:
_task: "core.TaskBase" = attr.ib()
Expand Down
15 changes: 9 additions & 6 deletions pydra/engine/submitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,16 @@ def __init__(self, plugin="cf", **kwargs):
raise NotImplementedError(f"No worker for {self.plugin}")
self.worker.loop = self.loop

def __call__(self, runnable, cache_locations=None, rerun=False):
def __call__(self, runnable, cache_locations=None, rerun=False, environment=None):
"""Submitter run function."""
if cache_locations is not None:
runnable.cache_locations = cache_locations
self.loop.run_until_complete(self.submit_from_call(runnable, rerun))
self.loop.run_until_complete(
self.submit_from_call(runnable, rerun, environment)
)
return runnable.result()

async def submit_from_call(self, runnable, rerun):
async def submit_from_call(self, runnable, rerun, environment):
"""
This coroutine should only be called once per Submitter call,
and serves as the bridge between sync/async lands.
Expand All @@ -56,7 +58,7 @@ async def submit_from_call(self, runnable, rerun):
Once Python 3.10 is the minimum, this should probably be refactored into using
structural pattern matching.
"""
if is_workflow(runnable):
if is_workflow(runnable): # TODO: env to wf
# connect and calculate the checksum of the graph before running
runnable._connect_and_propagate_to_tasks(override_task_caches=True)
# 0
Expand All @@ -74,10 +76,11 @@ async def submit_from_call(self, runnable, rerun):
# 2
if runnable.state is None:
# run_el should always return a coroutine
await self.worker.run_el(runnable, rerun=rerun)
print("in SUBM", environment)
await self.worker.run_el(runnable, rerun=rerun, environment=environment)
# 3
else:
await self.expand_runnable(runnable, wait=True, rerun=rerun)
await self.expand_runnable(runnable, wait=True, rerun=rerun) # TODO
return True

async def expand_runnable(self, runnable, wait=False, rerun=False):
Expand Down
Loading

0 comments on commit f3c1d35

Please sign in to comment.