-
-
Notifications
You must be signed in to change notification settings - Fork 720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Scheduler.reschedule() works only by accident #6339
Scheduler.reschedule() works only by accident #6339
Conversation
78b81d8
to
0469601
Compare
Looks like a valid failing test |
Hilarious. The test passed when the worker was NOT calling Scheduler.reschedule due to mismatched signature. Now that it does, it breaks the state machine. Which has suddenly bumped up the importance and scope of this PR. This now closes #6340. |
d3dfa55
to
3f20835
Compare
2e4522e
to
611c85d
Compare
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 15 files ±0 15 suites ±0 10h 23m 2s ⏱️ + 1m 37s For more details on these failures, see this check. Results for commit e7bb31b. ± Comparison against base commit a8eb3b2. ♻️ This comment has been updated with latest results. |
611c85d
to
b3ffa87
Compare
|
||
for future in x: | ||
s.reschedule(key=future.key) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test remained green if you removed these lines
await f | ||
s.set_restrictions(worker={f.key: a.address}) | ||
assert s.tasks[f.key].worker_restrictions == {a.address} | ||
s.reschedule(f) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was silently doing nothing (future f
is not in s.tasks). The task was executed only once, on b.
distributed/tests/test_worker.py
Outdated
async def test_reschedule(c, s, a, b): | ||
await s.extensions["stealing"].stop() | ||
@pytest.mark.slow | ||
@pytest.mark.parametrize("long_running", [False, True]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added test for _transition_long_running_rescheduled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks good to me, feel free to ignore the comments.
@@ -6568,7 +6568,9 @@ async def get_story(self, keys_or_stimuli: Iterable[str]) -> list[tuple]: | |||
|
|||
transition_story = story | |||
|
|||
def reschedule(self, key=None, worker=None): | |||
def reschedule( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out-of-scope comment: Is there a better name that highlights that reschedule
does not actually reschedule, i.e., it does not schedule the task somewhere else, it merely cancels the previous scheduling decision? For example, deschedule
might be better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well no. A transition to released will automatically kick the task back to waiting.
The functional description of the method is accurate. The "released" bit is an implementation detail.
I'm adding a comment to explain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense from that perspective, I was thinking that the function doesn't include the rescheduling bit, but in the end you're right, the releasing/descheduling automatically achieves the aim of rescheduling.
e7bb31b
to
f0bf899
Compare
f0bf899
to
950e4bc
Compare
950e4bc
to
355cc0f
Compare
assert any(isinstance(ev, RescheduleEvent) for ev in a.state.stimulus_log) | ||
assert all(f.key in b.data for f in futures) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the two tests below are new
@@ -2201,6 +2196,7 @@ def _transition_released_forgotten( | |||
("cancelled", "missing"): _transition_cancelled_released, | |||
("cancelled", "waiting"): _transition_cancelled_waiting, | |||
("cancelled", "forgotten"): _transition_cancelled_forgotten, | |||
("cancelled", "rescheduled"): _transition_cancelled_released, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Without this test_cancelled_reschedule would cause a RecursionError
355cc0f
to
9410072
Compare
@hendrikmakait there have been substantial changes from your review; could you give it a second pass please? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch with the rescheduling of cancelled tasks!
Co-authored-by: Hendrik Makait <[email protected]>
Supersedes #6307
Closes #6340