Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean executor fails #777

Merged
merged 16 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 101 additions & 40 deletions notebooks/workflow_example.ipynb

Large diffs are not rendered by default.

33 changes: 32 additions & 1 deletion pyiron_contrib/workflow/composite.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class Composite(Node, ABC):
By default, `run()` will be called on all owned nodes that have output connections but no
input connections (i.e. the upstream-most nodes), but this can be overridden to
specify particular nodes to use instead.
The `run()` method (and `update()`, and calling the workflow, when these result in
a run) returns a new dot-accessible dictionary of keys and values created from the
composite output IO panel.

Does not specify `input` and `output` as demanded by the parent class; this
requirement is still passed on to children.
Expand Down Expand Up @@ -92,15 +95,33 @@ def __init__(
label: str,
*args,
parent: Optional[Composite] = None,
run_on_updates: bool = True,
strict_naming: bool = True,
**kwargs,
):
super().__init__(*args, label=label, parent=parent, **kwargs)
super().__init__(
*args,
label=label,
parent=parent,
run_on_updates=run_on_updates,
**kwargs
)
self.strict_naming: bool = strict_naming
self.nodes: DotDict[str:Node] = DotDict()
self.add: NodeAdder = NodeAdder(self)
self.starting_nodes: None | list[Node] = None

@property
def executor(self) -> None:
    """Always `None`: composite nodes do not currently support executors."""
    return None

@executor.setter
def executor(self, new_executor):
    """Accept only `None`; any real executor is rejected as unsupported."""
    if new_executor is None:
        return
    raise NotImplementedError(
        "Running composite nodes with an executor is not yet supported"
    )

def to_dict(self):
return {
"label": self.label,
Expand All @@ -115,12 +136,22 @@ def upstream_nodes(self) -> list[Node]:
if node.outputs.connected and not node.inputs.connected
]

@property
def on_run(self):
    """The callable executed by `run()`: for composites, `run_graph`."""
    return self.run_graph

@staticmethod
def run_graph(self):
    """
    Kick off a run from this composite's starting nodes (falling back on the
    upstream-most nodes when none were set explicitly), then collect the
    composite's output as a dot-accessible dictionary.
    """
    if self.starting_nodes is None:
        launch_points = self.upstream_nodes
    else:
        launch_points = self.starting_nodes
    for launch_point in launch_points:
        launch_point.run()
    return DotDict(self.outputs.to_value_dict())

@property
def run_args(self) -> dict:
    """Arguments for `on_run`; `run_graph` is a staticmethod, so the instance
    must be passed in explicitly as `self`."""
    return {"self": self}

def add_node(self, node: Node, label: Optional[str] = None) -> None:
"""
Expand Down
23 changes: 17 additions & 6 deletions pyiron_contrib/workflow/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ class Function(Node):
on call.
This invokes an `update()` call, which can in turn invoke `run()` if
`run_on_updates` is set to `True`.
`run()` returns the output of the executed function, or a futures object if the
node is set to use an executor.
Calling the node or executing an `update()` returns the same thing as running, if
the node is run, or `None` if it is not set to run on updates or not ready to run.

Args:
node_function (callable): The function determining the behaviour of the node.
Expand Down Expand Up @@ -163,10 +167,12 @@ class Function(Node):
{'p1': 2, 'm1': 1}

Input data can be provided to both initialization and on call as ordered args
or keyword kwargs, e.g.:
or keyword kwargs.
When running, updating, or calling the node, the output of the wrapped function
(if it winds up getting run in the conditional cases of updating and calling) is
returned:
>>> plus_minus_1(2, y=3)
>>> plus_minus_1.outputs.to_value_dict()
{'p1': 3, 'm1': 2}
(3, 2)

Finally, we might stop these updates from happening automatically, even when
all the input data is present and available:
Expand All @@ -180,8 +186,7 @@ class Function(Node):

With these flags set, the node requires us to manually call a run:
>>> plus_minus_1.run()
>>> plus_minus_1.outputs.to_value_dict()
{'p1': 1, 'm1': -1}
(-1, 1)

So function nodes have the most basic level of protection that they won't run
if they haven't seen any input data.
Expand Down Expand Up @@ -360,6 +365,7 @@ def __init__(
super().__init__(
label=label if label is not None else node_function.__name__,
parent=parent,
run_on_updates=run_on_updates,
# **kwargs,
)

Expand All @@ -379,7 +385,6 @@ def __init__(
)
self._verify_that_channels_requiring_update_all_exist()

self.run_on_updates = run_on_updates
self._batch_update_input(*args, **kwargs)

if update_on_instantiation:
Expand Down Expand Up @@ -535,6 +540,12 @@ def on_run(self):
def run_args(self) -> dict:
    """
    Build the keyword arguments for the wrapped function from the current
    input channel values, injecting the node instance when the function
    signature asks for `self`.

    Raises:
        NotImplementedError: If the function takes `self` while an executor
            is set -- self-referencing nodes cannot be submitted remotely yet.
    """
    call_kwargs = self.inputs.to_value_dict()
    if "self" not in self._input_args:
        return call_kwargs
    if self.executor is not None:
        raise NotImplementedError(
            f"The node {self.label} cannot be run on an executor because it "
            f"uses the `self` argument and this functionality is not yet "
            f"implemented"
        )
    call_kwargs["self"] = self
    return call_kwargs

Expand Down
30 changes: 21 additions & 9 deletions pyiron_contrib/workflow/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import warnings
from abc import ABC, abstractmethod
from concurrent.futures import Future
from typing import Optional, TYPE_CHECKING
from typing import Any, Optional, TYPE_CHECKING

from pyiron_contrib.executors import CloudpickleProcessPoolExecutor
from pyiron_contrib.workflow.files import DirectoryObject
Expand Down Expand Up @@ -45,6 +45,16 @@ class Node(HasToDict, ABC):
By default, nodes' signals input comes with `run` and `ran` IO ports which force
the `run()` method and which emit after `finish_run()` is completed, respectively.

The `run()` method returns a representation of the node output (possibly a futures
object, if the node is running on an executor), and consequently `update()` also
returns this output if the node is `ready` and has `run_on_updates = True`.

Calling an already instantiated node allows its input channels to be updated using
keyword arguments corresponding to the channel labels, performing a batch-update of
all supplied input and then calling `update()`.
As such, calling the node _also_ returns a representation of the output (or `None`
if the node is not set to run on updates, or is otherwise unready to run).

Nodes have a status, which is currently represented by the `running` and `failed`
boolean flags.
Their value is controlled automatically in the defined `run` and `finish_run`
Expand Down Expand Up @@ -154,7 +164,7 @@ def outputs(self) -> Outputs:

@property
@abstractmethod
def on_run(self) -> callable[..., tuple]:
def on_run(self) -> callable[..., Any | tuple]:
"""
What the node actually does!
"""
Expand All @@ -167,7 +177,7 @@ def run_args(self) -> dict:
"""
return {}

def process_run_result(self, run_output: tuple) -> None:
def process_run_result(self, run_output: Any | tuple) -> None:
"""
What to _do_ with the results of `on_run` once you have them.

Expand All @@ -176,7 +186,7 @@ def process_run_result(self, run_output: tuple) -> None:
"""
pass

def run(self) -> None:
def run(self) -> Any | tuple | Future:
"""
Executes the functionality of the node defined in `on_run`.
Handles the status of the node, and communicating with any remote
Expand All @@ -195,18 +205,19 @@ def run(self) -> None:
self.running = False
self.failed = True
raise e
self.finish_run(run_output)
return self.finish_run(run_output)
elif isinstance(self.executor, CloudpickleProcessPoolExecutor):
self.future = self.executor.submit(self.on_run, **self.run_args)
self.future.add_done_callback(self.finish_run)
return self.future
else:
raise NotImplementedError(
"We currently only support executing the node functionality right on "
"the main python process or with a "
"pyiron_contrib.workflow.util.CloudpickleProcessPoolExecutor."
)

def finish_run(self, run_output: tuple | Future):
def finish_run(self, run_output: tuple | Future) -> Any | tuple:
"""
Switch the node status, process the run result, then fire the ran signal.

Expand All @@ -224,6 +235,7 @@ def finish_run(self, run_output: tuple | Future):
try:
self.process_run_result(run_output)
self.signals.output.ran()
return run_output
except Exception as e:
self.failed = True
raise e
Expand All @@ -234,9 +246,9 @@ def _build_signal_channels(self) -> Signals:
signals.output.ran = OutputSignal("ran", self)
return signals

def update(self) -> None:
def update(self) -> Any | tuple | Future | None:
if self.run_on_updates and self.ready:
self.run()
return self.run()

@property
def working_directory(self):
Expand Down Expand Up @@ -300,4 +312,4 @@ def _batch_update_input(self, **kwargs):

def __call__(self, **kwargs) -> Any | tuple | Future | None:
    """
    Batch-update the input channels named by the keyword arguments, then
    `update()`.

    Args:
        **kwargs: New input channel values, keyed by channel label.

    Returns:
        Whatever `update()` returns -- the run output (possibly a `Future`
        when running on an executor), or `None` when the node did not run.
        Note: the previous `-> None` annotation was wrong once the body
        started returning `self.update()`.
    """
    self._batch_update_input(**kwargs)
    return self.update()
24 changes: 20 additions & 4 deletions pyiron_contrib/workflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,13 @@ class Workflow(Composite):
These input keys can be used when calling the workflow to update the input. In
our example, the nodes update automatically when their input gets updated, so
all we need to do to see updated workflow output is update the input:
>>> wf(first_x=10)
>>> wf.outputs.second_y.value
>>> out = wf(first_x=10)
>>> out
{'second_y': 12}

Note: this _looks_ like a dictionary, but has some extra convenience that we
can dot-access data:
>>> out.second_y
12

Workflows also give access to packages of pre-built nodes under different
Expand Down Expand Up @@ -125,8 +130,19 @@ class Workflow(Composite):
integrity of workflows when they're used somewhere else?
"""

def __init__(self, label: str, *nodes: Node, strict_naming=True):
super().__init__(label=label, parent=None, strict_naming=strict_naming)
def __init__(
    self,
    label: str,
    *nodes: Node,
    run_on_updates: bool = True,
    strict_naming=True
):
    """
    Args:
        label: The workflow's label.
        *nodes: Nodes to add to the workflow on instantiation.
        run_on_updates: Forwarded to the `Composite` parent class; controls
            whether updates trigger runs. Default is True.
        strict_naming: Forwarded to the `Composite` parent class; presumably
            controls enforcement of node-name rules -- TODO confirm against
            `Composite`. Default is True.
    """
    # Workflows are always the graph root, hence parent=None.
    super().__init__(
        label=label,
        parent=None,
        run_on_updates=run_on_updates,
        strict_naming=strict_naming,
    )

    for node in nodes:
        self.add_node(node)
Expand Down
80 changes: 79 additions & 1 deletion tests/unit/workflow/test_function.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import unittest
from concurrent.futures import Future
from sys import version_info
from typing import Optional, Union
import unittest
import warnings

from pyiron_contrib.executors import CloudpickleProcessPoolExecutor
from pyiron_contrib.workflow.channels import NotData
from pyiron_contrib.workflow.files import DirectoryObject
from pyiron_contrib.workflow.function import (
Expand Down Expand Up @@ -279,6 +281,13 @@ def with_self(self, x: float) -> float:
msg="Function functions should be able to modify attributes on the node object."
)

node.executor = CloudpickleProcessPoolExecutor
with self.assertRaises(NotImplementedError):
# Submitting node_functions that use self is still raising
# TypeError: cannot pickle '_thread.lock' object
# For now we just fail cleanly
node.run()

def with_messed_self(x: float, self) -> float:
return x + 0.1

Expand Down Expand Up @@ -334,6 +343,75 @@ def test_call(self):
# there should just be a warning that the data didn't get updated
node(some_randome_kwaaaaarg="foo")

def test_return_value(self):
    # Verifies that run/update/call surface the wrapped function's output:
    # the raw result on the main process, a Future under an executor, and
    # None whenever the node is not set to run on updates.
    node = Function(plus_one)

    with self.subTest("Run on main process"):
        return_on_call = node(1)
        self.assertEqual(
            return_on_call,
            plus_one(1),
            msg="Run output should be returned on call"
        )

        return_on_update = node.update()
        self.assertEqual(
            return_on_update,
            plus_one(1),
            msg="Run output should be returned on update"
        )

        node.run_on_updates = False
        return_on_update_without_run = node.update()
        self.assertIsNone(
            return_on_update_without_run,
            msg="When not running on updates, the update should not return anything"
        )
        return_on_call_without_run = node(2)
        self.assertIsNone(
            return_on_call_without_run,
            msg="When not running on updates, the call should not return anything"
        )
        # The call above still updated the input to 2, so an explicit run
        # should operate on that most recent value.
        return_on_explicit_run = node.run()
        self.assertEqual(
            return_on_explicit_run,
            plus_one(2),
            msg="On explicit run, the most recent input data should be used and the "
            "result should be returned"
        )

    with self.subTest("Run on executor"):
        node.executor = CloudpickleProcessPoolExecutor()
        node.run_on_updates = False

        return_on_update_without_run = node.update()
        self.assertIsNone(
            return_on_update_without_run,
            msg="When not running on updates, the update should not return "
            "anything whether there is an executor or not"
        )
        return_on_explicit_run = node.run()
        self.assertIsInstance(
            return_on_explicit_run,
            Future,
            msg="Running with an executor should return the future"
        )
        with self.assertRaises(RuntimeError):
            # The executor run should take a second
            # So we can double check that attempting to run while already running
            # raises an error
            node.run()
        node.future.result()  # Wait for the remote execution to finish

        node.run_on_updates = True
        return_on_update_with_run = node.update()
        self.assertIsInstance(
            return_on_update_with_run,
            Future,
            msg="Updating should return the same as run when we get a run from the "
            "update, obviously..."
        )
        node.future.result()  # Wait for the remote execution to finish

@unittest.skipUnless(version_info[0] == 3 and version_info[1] >= 10, "Only supported for 3.10+")
class TestSlow(unittest.TestCase):
Expand Down
Loading