FIX: Type error in blocking task list, detailed exception message #623
Conversation
Codecov Report

Patch coverage:

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##           master     #623      +/-   ##
==========================================
+ Coverage   81.17%   81.71%   +0.54%
==========================================
  Files          20       20
  Lines        4393     4392       -1
  Branches     1264        0    -1264
==========================================
+ Hits         3566     3589      +23
+ Misses        823      803      -20
+ Partials        4        0       -4
```
Is it possible to add a test that exercises this branch?

I've been trying to think how to do that. How would I create a workflow with a broken execution graph, where some of the nodes are unrunnable? (I can't remember the exact case that motivated me to write that error explanation code)

Is it possible to create a loop like nodeA -> nodeB -> nodeA?
I tried this:

```python
import pytest
from pydra import Workflow, Submitter, mark


@mark.task
def add_together(x, y):
    return x + y


def test_wf_with_blocked_tasks(plugin_dask_opt, tmpdir):
    wf = Workflow(name="wf_with_blocked_tasks", input_spec=["x"])
    wf.add(add_together(name="taska", x=wf.lzin.x))
    wf.add(add_together(name="taskb", x=wf.taska.lzout.out, y=wf.taska.lzout.out))
    # Create a cyclic loop
    wf.taska.y = wf.taskb.lzout.out
    wf.set_output([("out", wf.taskb.lzout.out)])
    wf.inputs.x = [1]
    wf.cache_dir = tmpdir
    # with pytest.raises(Exception) as exc:
    with Submitter(plugin=plugin_dask_opt) as sub:
        sub(wf)
```

but it produces the following traceback instead:
@tclose - do you remember if the changes indeed helped at the time you had the issue?

@djarecka The changes were what I used to track down and debug the issue I was having. It was something like one node failing without getting picked up, and then downstream nodes sitting there blocked, but I can't remember exactly what now.

I expect it would help a lot to debug the intermittent failures I'm experiencing atm too.

Why is codecov complaining that the coverage drops by 5% when I only changed two lines that aren't getting hit?

Because codecov is garbage.
sorry, but it's still not completely clear to me how this helps with debugging, since you're commenting

sorry, that isn't very clear. It is just that that exception is triggered by
I've tried a few ways to hit this branch but keep getting stuck. If we merge these changes, I should be able to get a better idea of what issue is causing this branch to be hit in my code, and then I could try to design a test around that.
Ah, I remember now. I fixed this bug in my local branch and managed to hit the issue again. It is caused by the hash of the task being altered at some point in the workflow (somewhere deep in a nested object, I imagine), and then the downstream node not being able to find the predecessor node at the hash it expects. The error message helped a lot :) Still not sure how to generate a test for it easily though.

Do we coerce inputs to a different type at runtime? That might change the hash.
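For illustration, a toy example (not pydra's actual hashing code) of how a runtime type coercion could shift a repr-based hash:

```python
from hashlib import sha256


def repr_hash(obj) -> str:
    # Simplified stand-in for a hash based on the string representation
    return sha256(repr(obj).encode()).hexdigest()


# If an input arrives as an int and is later coerced to float, the hash
# computed before and after coercion will differ ('1' vs '1.0')
assert repr_hash(1) != repr_hash(1.0)
```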
I don't remember seeing that. I'm passing around fairly deep objects, which is not really what I expect Pydra was designed for, so I ran into trouble before where one of the nested objects was not being copied between nodes, and a downstream node updated the object that was used to generate the upstream node's hash.

Using the insight (see #622) that this error is caused by hashes of node inputs being unstable, I have managed to come up with a test to hit this error. It seems as though this needs a bit deeper thought though, as relying on hashes not changing is a bit of a weakness.
```python
@mark.task
def alter_input(x):
    x.a = 2
```
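Filled out, a test along those lines might look like this (a sketch under assumed names; not necessarily the exact test in this diff):

```python
import pytest
from pydra import Workflow, Submitter, mark


class A:
    # Hypothetical mutable object passed between nodes
    def __init__(self, a):
        self.a = a


@mark.task
def identity(x):
    return x


@mark.task
def alter_input(x):
    x.a = 2  # mutates the input in place, so its hash changes
    return x


def test_wf_with_blocked_tasks(tmpdir):
    wf = Workflow(name="wf_with_blocked_tasks", input_spec=["x"], cache_dir=tmpdir)
    wf.add(identity(name="taska", x=wf.lzin.x))
    wf.add(alter_input(name="taskb", x=wf.taska.lzout.out))
    wf.add(identity(name="taskc", x=wf.taskb.lzout.out))
    wf.set_output([("out", wf.taskc.lzout.out)])
    wf.inputs.x = A(1)
    # The downstream node looks up its predecessor's results under the
    # original input hash, which no longer matches after the mutation,
    # so the submitter raises the detailed blocked-task exception
    with pytest.raises(Exception):
        with Submitter(plugin="serial") as sub:
            sub(wf)
```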
... Moving comment that grew too large to the main thread.
(Moved from an inline comment because I realized it grew out of hand:) Interesting reproduction. Here's a thought for how we could change this to make this use case smoother: the user writes a function that modifies its inputs; after execution, the runner re-checks the input hash and raises an error on change, rather than waiting for a workflow deadlock. The user then re-marks with
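A rough sketch of what that post-execution check could look like (`checksum` is a real pydra task property, but the surrounding runner code here is assumed):

```python
def run_with_mutation_check(task):
    # Hash of the inputs before the task function runs
    hash_before = task.checksum
    result = task()  # execute the task
    # Recompute from the (possibly mutated) inputs
    hash_after = task.checksum
    if hash_before != hash_after:
        raise RuntimeError(
            f"Inputs of task {task.name!r} changed during execution "
            f"({hash_before} -> {hash_after}): the function appears to "
            "mutate its inputs, so mark it accordingly or deep-copy its arguments."
        )
    return result
```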
This doesn't resolve the hash nondeterminism, though. That's caused by lines 672 to 674 in b5fe4c0.

Given that it's based on the string representation, you could probably resolve that on your end by sorting the set in

That said, there's a StackOverflow question on this exact problem, and the answer is basically that there's no way to hash arbitrary objects. But we could use `@functools.singledispatch`:

```python
import functools
from hashlib import sha256


@functools.singledispatch
def hash_obj(obj: object) -> str:
    # Works for generic objects with __dict__
    dict_rep = ":".join(f"{key}:{hash_obj(val)}" for key, val in obj.__dict__.items())
    return sha256(f"{obj.__class__}:{dict_rep}".encode()).hexdigest()


# Then define something for builtin classes with no __dict__
@hash_obj.register
def _(obj: int) -> str: ...


@hash_obj.register
def _(obj: str) -> str: ...


@hash_obj.register
def _(obj: dict) -> str: ...
```

We could even handle things like `File`:

```python
@pydra.util.hash_obj.register
def _(obj: File) -> str:
    ...
```

or provide a registration function `pydra.utils.register_hash(File, myhashfun)`.
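For example, usage under that scheme might look like this (the `Point` class is hypothetical, and it assumes the builtin stubs above are actually implemented):

```python
from hashlib import sha256


class Point:
    def __init__(self, x, y):
        self.x, self.y = x, y


# Generic objects fall through to the __dict__-based default
print(hash_obj(Point(1, 2)))


# A dedicated handler overrides the default for that type
@hash_obj.register
def _(obj: Point) -> str:
    return sha256(f"Point:{obj.x}:{obj.y}".encode()).hexdigest()
```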
yes, this is an interesting case. I think I was assuming that inputs should not change, and I did not include tests that do it... I like the approach @effigies is suggesting: checking the input after execution. As for the issue with hashes, I've already started adding some extra rules for specific objects, e.g. for numpy arrays, after I had issues. But giving the option to the user might be a good solution (this probably can be in a different PR)
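For illustration, such a type-specific rule might look like this (a sketch, not the code already added to pydra):

```python
import numpy as np
from hashlib import sha256


def hash_ndarray(arr: np.ndarray) -> str:
    # Include dtype and shape so arrays with the same raw bytes but
    # different layouts don't collide
    return sha256(
        str(arr.dtype).encode() + str(arr.shape).encode() + arr.tobytes()
    ).hexdigest()
```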
This issue raises plenty of good points. I noticed these two:

1. Passing rich, deeply nested objects between tasks.
2. Supporting task functions that modify their inputs.

In my experience, 1) should be avoided as much as possible to simplify hashing and pickling between processes, as well as to ensure better task composition. I believe tasks and workflows should be considered the same way as public functions in a library: use rich types internally to model the domain, expose primitive types externally to limit coupling. I like @effigies' idea of registering a hashing handler in the worst-case scenario. I am a bit confused as to why we would want to support 2).
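As an aside, a small sketch of that "rich types internally, primitive types externally" convention (hypothetical task):

```python
from pydra import mark


@mark.task
def summarize(values: list) -> dict:
    # Primitive types (list in, dict out) cross the task boundary;
    # any rich domain object is built and used only inside the task
    import statistics

    return {"mean": statistics.mean(values), "stdev": statistics.stdev(values)}
```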
Not so much support as make it safe to work with those sorts of functions. IIRC, the initial goal of the

In any event, if someone has an unsafe function, they're going to need to either rewrite or wrap it themselves, or we can provide a standard tool with straightforward semantics:

```python
from copy import deepcopy
from functools import wraps


def wrap_unsafe(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Deep-copy the arguments so the wrapped function can't
        # mutate the caller's (hashed) inputs
        return func(*deepcopy(args), **deepcopy(kwargs))
    return wrapper
```

So we could write:

```python
safe_task = task(wrap_unsafe(unsafe_func))
```

And if somebody has control over the function but is just lazy, they could write:

```python
@task
@wrap_unsafe
def f(x):
    x.a = 2
    return x
```
We should probably move these discussions out of a PR that's ready to merge...
Types of changes
Summary
Fixes bug described in #622
Checklist