Commit
added more descriptive debug message to pydra submitter
tclose committed Sep 20, 2022
1 parent 76c933e commit 7fd0848
Showing 2 changed files with 38 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pydra/engine/helpers.py
@@ -112,7 +112,7 @@ def load_result(checksum, cache_locations):
     if not cache_locations:
         return None
     # TODO: if there are issues with loading, we might need to
-    # TODO: sleep and repeat loads (after checkin that there are no lock files!)
+    # TODO: sleep and repeat loads (after checking that there are no lock files!)
     for location in cache_locations:
         if (location / checksum).exists():
             result_file = location / checksum / "_result.pklz"
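As an aside, the TODO above describes a retry strategy that this commit does not implement. Below is a minimal sketch of the idea, assuming an in-progress write is signalled by a ".lock" file next to "_result.pklz"; the lock-file name, the helper name, and the use of plain pickle are all assumptions for illustration, not part of pydra's API.

import pickle
import time
from pathlib import Path


def load_result_with_retry(checksum, cache_locations, retries=3, delay=1.0):
    # Hypothetical sketch of the TODO: retry the load a few times, skipping
    # results that still have a lock file next to them (the ".lock" suffix is
    # assumed here, not something this commit defines).
    for _ in range(retries):
        for location in cache_locations:
            result_file = Path(location) / checksum / "_result.pklz"
            lock_file = result_file.with_name(result_file.name + ".lock")
            if result_file.exists() and not lock_file.exists():
                with open(result_file, "rb") as f:
                    return pickle.load(f)  # plain pickle used for the sketch
        time.sleep(delay)  # give a concurrent writer time to finish
    return None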
38 changes: 37 additions & 1 deletion pydra/engine/submitter.py
@@ -1,5 +1,6 @@
"""Handle execution backends."""
import asyncio
import pickle
from uuid import uuid4
from .workers import WORKERS
from .core import is_workflow
@@ -167,9 +168,14 @@ async def expand_workflow(self, wf, rerun=False):
                     # don't block the event loop!
                     await asyncio.sleep(1)
                     if ii > 60:
+                        blocked = _list_blocked_tasks(graph_copy)
+                        get_runnable_tasks(graph_copy)
                         raise Exception(
                             "graph is not empty, but not able to get more tasks "
-                            "- something is wrong (e.g. with the filesystem)"
+                            "- something may have gone wrong when retrieving the results "
+                            "of predecessor tasks caused by a file-system error or a bug "
+                            "in the internal workflow logic.\n\nBlocked tasks\n-------------\n"
+                            + "\n".join(blocked)
                         )
             for task in tasks:
                 # grab inputs if needed
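The hunk above follows a common asyncio polling pattern: yield to the event loop between checks, and after a fixed number of attempts fail loudly with as much diagnostic detail as possible. A small self-contained sketch of that pattern is shown below; the helper name and the callback parameters are made up for illustration and are not part of pydra.

import asyncio


async def wait_for_runnable(poll, describe_blockers, attempts=60):
    # Hypothetical helper mirroring the loop above: check once a second
    # without blocking the event loop, then raise with a report of what
    # appears to be stuck.
    for _ in range(attempts):
        if poll():
            return
        await asyncio.sleep(1)  # don't block the event loop!
    raise Exception(
        "no tasks became runnable\n\nBlocked tasks\n-------------\n"
        + "\n".join(describe_blockers())
    )


# e.g. asyncio.run(wait_for_runnable(lambda: True, lambda: []))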
@@ -281,3 +287,33 @@ async def prepare_runnable_with_state(runnable):
     runnable.state.prepare_inputs()
     logger.debug(f"Expanding {runnable} into {len(runnable.state.states_val)} states")
     return runnable.pickle_task()
+
+
+def _list_blocked_tasks(graph):
+    """Generates a list of tasks that can't be run and the predecessors that are
+    blocking them, to help debug broken workflows."""
+    blocked = []
+    for tsk in graph.sorted_nodes:
+        blocking = []
+        for pred in graph.predecessors[tsk.name]:
+            if not pred.done:
+                matching_name = []
+                for cache_loc in tsk.cache_locations:
+                    for tsk_work_dir in cache_loc.iterdir():
+                        if (tsk_work_dir / "_task.pklz").exists():
+                            with open(tsk_work_dir / "_task.pklz", "rb") as f:
+                                saved_tsk = pickle.load(f)
+                            if saved_tsk.name == pred.name:
+                                matching_name.append(
+                                    f"{saved_tsk.name} ({tsk_work_dir.name})"
+                                )
+                blocking.append((pred, ", ".join(matching_name)))
+        if blocking:
+            blocked.append(
+                f"\n{tsk.name} ({tsk.checksum}) is blocked by "
+                + "; ".join(
+                    f"{pred.name} ({pred.checksum}), which matches names of [{matching}]"
+                    for pred, matching in blocking
+                )
+            )
+    return blocked
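To see what the new diagnostic reads like, here is a rough usage sketch. It feeds _list_blocked_tasks stand-in objects that expose only the attributes the function reads (sorted_nodes, predecessors, name, done, checksum, cache_locations); the stand-ins are illustrative rather than real pydra task or graph classes, and the import assumes a pydra checkout that contains this commit.

import tempfile
from pathlib import Path
from types import SimpleNamespace

from pydra.engine.submitter import _list_blocked_tasks  # assumes this commit is installed

# Minimal stand-ins with just the attributes _list_blocked_tasks touches.
cache_dir = Path(tempfile.mkdtemp())  # empty cache, so no saved task pickles will match
pred = SimpleNamespace(name="preprocess", done=False, checksum="abc123")
tsk = SimpleNamespace(name="analysis", checksum="def456", cache_locations=[cache_dir])
graph = SimpleNamespace(sorted_nodes=[tsk], predecessors={"analysis": [pred]})

print("\n".join(_list_blocked_tasks(graph)))
# Expected to print something like:
#   analysis (def456) is blocked by preprocess (abc123), which matches names of []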
