diff --git a/pydra/engine/core.py b/pydra/engine/core.py index df969448cd..62f08013b0 100644 --- a/pydra/engine/core.py +++ b/pydra/engine/core.py @@ -438,7 +438,7 @@ def _run(self, rerun=False, **kwargs): with SoftFileLock(lockfile): if not (rerun or self.task_rerun): result = self.result() - if result is not None: + if result is not None and not result.errored: return result # adding info file with the checksum in case the task was cancelled # and the lockfile has to be removed @@ -1017,7 +1017,7 @@ async def _run(self, submitter=None, rerun=False, **kwargs): # retrieve cached results if not (rerun or self.task_rerun): result = self.result() - if result is not None: + if result is not None and not result.errored: return result # adding info file with the checksum in case the task was cancelled # and the lockfile has to be removed diff --git a/pydra/engine/tests/test_task.py b/pydra/engine/tests/test_task.py index 1a29847664..463974d379 100644 --- a/pydra/engine/tests/test_task.py +++ b/pydra/engine/tests/test_task.py @@ -1361,3 +1361,41 @@ def fun_error(x): # the error traceback should be a list and should point to a specific line in the function assert isinstance(error_tb, list) assert "in fun_error" in error_tb[-2] + + +def test_rerun_errored(tmpdir, capfd): + """Test rerunning a task containing errors. + Only the errored tasks should be rerun""" + + @mark.task + def pass_odds(x): + if x % 2 == 0: + print(f"x%2 = {x % 2} (error)\n") + raise Exception("even error") + else: + print(f"x%2 = {x % 2}\n") + return x + + task = pass_odds(name="pass_odds", x=[1, 2, 3, 4, 5], cache_dir=tmpdir).split("x") + + with pytest.raises(Exception, match="even error"): + task() + with pytest.raises(Exception, match="even error"): + task() + + out, err = capfd.readouterr() + stdout_lines = out.splitlines() + + tasks_run = 0 + errors_found = 0 + + for line in stdout_lines: + if "x%2" in line: + tasks_run += 1 + if "(error)" in line: + errors_found += 1 + + # There should have been 5 messages of the form "x%2 = XXX" after calling task() the first time + # and another 2 messagers after calling the second time + assert tasks_run == 7 + assert errors_found == 4 diff --git a/pydra/engine/tests/test_workflow.py b/pydra/engine/tests/test_workflow.py index ac95a0d3f9..c6745d011f 100644 --- a/pydra/engine/tests/test_workflow.py +++ b/pydra/engine/tests/test_workflow.py @@ -3,6 +3,7 @@ import time import attr from pathlib import Path +import logging from .utils import ( add2, @@ -4687,3 +4688,43 @@ def one_arg_inner(start_number): res = test_outer.result() assert res[0].output.res2 == 23 and res[1].output.res2 == 23 + + +def test_rerun_errored(tmpdir, capfd): + """Test rerunning a workflow containing errors. + Only the errored tasks and workflow should be rerun""" + + @mark.task + def pass_odds(x): + if x % 2 == 0: + print(f"x%2 = {x % 2} (error)\n") + raise Exception("even error") + else: + print(f"x%2 = {x % 2}\n") + return x + + wf = Workflow(name="wf", input_spec=["x"], cache_dir=tmpdir) + wf.add(pass_odds(name="pass_odds", x=[1, 2, 3, 4, 5]).split("x")) + wf.set_output([("out", wf.pass_odds.lzout.out)]) + + with pytest.raises(Exception): + wf() + with pytest.raises(Exception): + wf() + + out, err = capfd.readouterr() + stdout_lines = out.splitlines() + + tasks_run = 0 + errors_found = 0 + + for line in stdout_lines: + if "x%2" in line: + tasks_run += 1 + if "(error)" in line: + errors_found += 1 + + # There should have been 5 messages of the form "x%2 = XXX" after calling task() the first time + # and another 2 messagers after calling the second time + assert tasks_run == 7 + assert errors_found == 4