cancelled/resumed->long-running transitions #6916
Conversation
Force-pushed from e1206de to a6557e5.
Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
15 files +3   15 suites +3   6h 32m 52s ⏱️ +1h 55m 48s
For more details on these failures, see this check.
Results for commit b9ebe23. ± Comparison against base commit 817ead3.
♻️ This comment has been updated with latest results.
@gen_cluster(client=True, nthreads=[("", 1)], timeout=2)
async def test_secede_cancelled_or_resumed_scheduler(c, s, a):
Note that this second test does not test the resumed(fetch) use case. However, the first test above demonstrates that the cancelled and resumed(fetch) use cases are indistinguishable from the scheduler's side.
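For context, a rough sketch of the scenario this test family exercises; the test name, event names, helper function, and waiting conditions below are illustrative only, not the actual test body:

import asyncio

from distributed import Event, secede
from distributed.utils_test import gen_cluster


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_secede_while_cancelled_sketch(c, s, a):
    ev_running = Event("running")
    ev_proceed = Event("proceed")

    def f(ev_running, ev_proceed):
        ev_running.set()   # signal that the task is executing on the worker thread
        ev_proceed.wait()  # hold the thread until the client has released the key
        secede()           # exercises the cancelled -> long-running transition
        return 123

    x = c.submit(f, ev_running, ev_proceed, key="x")
    await ev_running.wait()
    x.release()  # cancel from the client's side; the worker thread keeps running
    while "x" in s.tasks:
        await asyncio.sleep(0.01)
    await ev_proceed.set()  # let the task call secede() while it is cancelled
    while a.state.tasks:    # wait for the worker to flush the cancelled task
        await asyncio.sleep(0.01)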
distributed/worker_state_machine.py
Outdated
def _transition_cancelled_long_running(
    self, ts: TaskState, compute_duration: float, *, stimulus_id: str
) -> RecsInstrs:
    """This transition also serves resumed(fetch) -> long-running"""
Not terribly happy about this, but the two alternatives (calling it _transition_generic_long_running, when it's definitely not generic, or copy-pasting the whole thing into a _transition_resumed_long_running) seemed worse.
_transition_cancelled_or_resumed_long_running? I'd be happier with a verbose but accurate name.
I see how this is probably the only way to solve the problem, but it feels a little odd to me.
What if we just didn't allow cancelled tasks to secede? While secede is running, we control the thread: we could just raise an error and refuse to call tpe_secede() or submit the SecedeEvent. So cancelled->long_running would remain an impossible transition. If the exception causes the task to fail, we ignore it anyway. If the user code decides to handle the exception in some way, that's fine, but it still won't be able to trigger a cancelled->long_running transition, since secede would refuse to do it.
In most cases, we can't cancel the running thread. But secede is the rare case where we do have the opportunity. Seems like it would be simpler to just not have to worry about this transition?
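A rough sketch of the guard being suggested here; secede_unless_cancelled and its explicit key parameter are hypothetical (the real secede() discovers the key from thread-local state), and reading the state machine from the task thread like this is itself racy, which comes up again later in the thread:

from distributed import get_worker, secede


def secede_unless_cancelled(key: str) -> None:
    # Hypothetical guard, not part of the distributed API.
    worker = get_worker()
    ts = worker.state.tasks.get(key)
    if ts is None or ts.state in ("cancelled", "resumed"):
        # Refuse to secede: no tpe_secede(), no SecedeEvent, so
        # cancelled -> long_running stays an impossible transition.
        raise RuntimeError(f"task {key!r} was cancelled; refusing to secede")
    secede()  # otherwise fall through to the normal secede path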
distributed/worker_state_machine.py
Outdated
self.executing.discard(ts)
self.long_running.add(ts)

# Do not send LongRunningMsg
To clarify: the idea is that we don't send the message right now because the task is cancelled; from the scheduler's perspective it's no longer running on this worker, so the scheduler shouldn't hear updates from this worker about that task (xref #6956).
Instead, we postpone sending the LongRunningMsg until the task is un-cancelled. Only then do we send the message, since we know it's relevant.
This seems worth a longer comment?
Overhauled comment
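For readers following along, a sketch of the transition with the longer explanation folded in; it mirrors the two bookkeeping lines visible in the diff above but is written as a standalone function and simplifies the recommendations/instructions return value, so it is not the merged implementation:

from distributed.worker_state_machine import TaskState, WorkerState


def _transition_cancelled_long_running(
    self: WorkerState, ts: TaskState, compute_duration: float, *, stimulus_id: str
):
    """Also serves resumed(fetch) -> long-running."""
    self.executing.discard(ts)
    self.long_running.add(ts)
    # Do not send LongRunningMsg now: the task is cancelled, so the scheduler
    # no longer considers it running on this worker and must not receive
    # updates about it (xref #6956). If the task is later un-cancelled, the
    # executing -> long-running transition fires and sends the message then.
    return {}, []  # no recommendations, no instructions (sketch only)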
assert ws.processing

await ev4.set()
assert await x == 123
Wait, so the expected, correct behavior is that you release a future, submit a new future with the same key, and get back the old (cancelled) future's result instead of the new one? That seems pretty wrong to me.
I'm aware that this could happen even for normal tasks, not just long-running, and it's just a consequence of not cancelling the thread, and keeping the TaskState around until the thread finishes. But from an API and user perspective, that seems wrong. I didn't think keys needed to be unique over the lifetime of the cluster, just that they needed to be unique among all the currently-active keys (and once a client saw a key as released, then it could safely consider it inactive).
Yep, but this is how it works. I spent several weeks trying and failing to make it sensible: #6844
This is a pretty rare use case: a user submits a task with a manually-defined key, then, before the task has had time to finish, submits a different task with the same key.
Honestly, I feel the blame should sit entirely on the user here, and figuring out what went wrong should be pretty straightforward. It also should not really happen except when prototyping from a notebook, unless there are key collisions, which will cause all sorts of weird behaviour anyway.
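To make the scenario concrete, a minimal illustration of the key-reuse pitfall described above; it is timing-dependent (so it may not reproduce on every run) and the task functions are made up:

import time

from distributed import Client


def old_task():
    time.sleep(5)  # still running when the key is released and reused
    return 123


def new_task():
    return 456


client = Client(n_workers=1, threads_per_worker=1)

f1 = client.submit(old_task, key="x")
time.sleep(1)       # give old_task time to start executing
f1.release()        # the client forgets the key, but the worker thread keeps running
f2 = client.submit(new_task, key="x")  # same key, different task
print(f2.result())  # may print 123 (the old task's result) rather than 456
client.close()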
This makes me very nervous, because …
All review comments have been addressed.
Discussed offline. Though using sync to run a function on the event loop in secede (and raising an error if the task was already cancelled) would be possible and would avoid threading race conditions, it would be difficult to test thoroughly. The benefit of secede raising an error if cancelled is also probably small, as most tasks would call secede right away, so there'd be very little time for the task to be cancelled in between. The state-machine-based approach here is much easier to test, so we'll go with this.
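For reference, a sketch of roughly what that rejected alternative would have looked like; the helper name and the explicit key parameter are made up, and this is not how secede() is actually implemented:

from distributed import get_worker
from distributed.utils import sync


def secede_checked_on_loop(key: str) -> None:
    # Hypothetical: run the cancellation check on the worker's event loop
    # (race-free), then raise from the task thread if the task was cancelled.
    worker = get_worker()

    async def still_running() -> bool:
        ts = worker.state.tasks.get(key)
        return ts is not None and ts.state not in ("cancelled", "resumed")

    if not sync(worker.loop, still_running):
        raise RuntimeError(f"task {key!r} was cancelled; refusing to secede")
    # ...otherwise continue with the normal secede() path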
Force-pushed from 2ce602e to b9ebe23.
Closes #6709