Select queued tasks in stimuli, not transitions #7402
Conversation
TODO: clean up implementation. This feels ugly, mostly because the stimuli are ugly and inconsistent.
Bit tidier. Broken for `client_releases_keys` (which is also still untidy).
`peekn(2)` would give you an empty iterator, so the behavior is inconsistent if you specify 1
Now fully split from `bulk_schedule`. This is feeling cleaner.
Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

18 files ±0, 18 suites ±0, 7h 49m 35s ⏱️ (-9m 5s)

For more details on these failures, see this check.

Results for commit 1450522. ± Comparison against base commit 7fb9c48.

♻️ This comment has been updated with latest results.
@gen_cluster(
    client=True,
    nthreads=[("", 1)] * 3,
    config={"distributed.scheduler.worker-saturation": 1.0},
)
async def test_queued_release_multiple_workers(c, s, *workers):
    async with Client(s.address, asynchronous=True) as c2:
        event = Event(client=c2)

        rootish_threshold = s.total_nthreads * 2 + 1

        first_batch = c.map(
            lambda i: event.wait(),
            range(rootish_threshold),
            key=[f"first-{i}" for i in range(rootish_threshold)],
        )
        await async_wait_for(lambda: s.queued, 5)

        second_batch = c2.map(
            lambda i: event.wait(),
            range(rootish_threshold),
            key=[f"second-{i}" for i in range(rootish_threshold)],
            fifo_timeout=0,
        )
        await async_wait_for(lambda: second_batch[0].key in s.tasks, 5)

        # All of the second batch should be queued after the first batch
        assert [ts.key for ts in s.queued.sorted()] == [
            f.key
            for f in itertools.chain(first_batch[s.total_nthreads :], second_batch)
        ]

        # Cancel the first batch.
        # Use `Client.close` instead of `del first_batch` because deleting futures sends
        # cancellation messages one at a time. We're testing here that when multiple workers
        # have open slots, we don't recommend the same queued tasks for every worker, so we
        # need a bulk cancellation operation.
        await c.close()
        del c, first_batch

        await async_wait_for(lambda: len(s.tasks) == len(second_batch), 5)

        # Second batch should move up the queue and start processing
        assert len(s.queued) == len(second_batch) - s.total_nthreads, list(
            s.queued.sorted()
        )

        await event.set()
        await c2.gather(second_batch)
This test feels a bit heavyweight considering that we have a couple of very simple reproducers; see #7396 (comment).
If you feel strongly about this test, fine, but please add the other two very simple reproducers as well, regardless of all the intricate timings and queuing, etc. The reproducers there should hold regardless of what the internals look like.
I added the reproducer from #7396 (comment) as well (with an `Event` to avoid timing issues):
distributed/distributed/tests/test_scheduler.py
Lines 375 to 383 in 7952579
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_forget_tasks_while_processing(c, s, a, b):
    events = [Event() for _ in range(10)]
    futures = c.map(Event.wait, events)
    await events[0].set()
    await futures[0]
    await c.close()
    assert not s.tasks
This test is for #7401, which is a different issue (and I think maybe the same as #7398). `Client.close` is the only codepath that can cause `client_releases_keys` to be called with multiple keys, which is the case that breaks.

EDIT: `Client.restart` is the other code path that calls `client_releases_keys` with multiple keys...
Added a simple test with `restart` as well (fails on main, reproducer for #7398):
distributed/distributed/tests/test_scheduler.py
Lines 386 to 394 in 31975e5
@gen_cluster(client=True, nthreads=[("", 1)])
async def test_restart_while_processing(c, s, a, b):
    events = [Event() for _ in range(10)]
    futures = c.map(Event.wait, events)
    await events[0].set()
    await futures[0]
    await c.restart()
    assert not s.tasks
I'm OK with the approach of using a stimulus.
fails on main
slots_available = sum(
    _task_slots_available(ws, self.WORKER_SATURATION)
    for ws in self.idle_task_count
)
This is hardly worth mentioning, but there's potential for a negligible scheduler performance change here (which may not actually be possible):

Now, every time a task completes, we run `_task_slots_available` on all idle workers. Before, we only ran it on the one worker that just completed a task. In most of those cases, `len(self.idle_task_count)` should be 1, so no difference.

I mention this because `_task_slots_available` already shows up in py-spy profiles of the scheduler (usually around 0.5-1%), since it's already called frequently. This would maybe allow it to be called even more often.

But I don't think it can actually run more than it needs to:

- When there are more threads than root tasks, so many workers are idle, `queued` would be empty, so this wouldn't run.
- When there are more root tasks than threads, and we're queuing, at most one worker should ever be idle: as soon as it becomes idle, it gets another queued task and is no longer idle.

  The one exception is `client.close()` releasing many processing tasks at once, while a different client has tasks on the queue. In that case, there's a lot of rescheduling to do, and we do need to look at all the workers that just became idle, so no unnecessary work there either.
tl;dr I don't see a theoretical way for this to be a problem, but I haven't benchmarked or profiled to be sure.
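For context, here's a rough sketch of the computation being discussed (an approximation, not the actual distributed implementation; `FakeWorkerState` and its attributes are stand-ins for the real `WorkerState`):

import math
from dataclasses import dataclass, field


@dataclass
class FakeWorkerState:
    # Stand-in for distributed's WorkerState; attribute names are assumptions.
    nthreads: int
    processing: set = field(default_factory=set)
    long_running: set = field(default_factory=set)


def task_slots_available(ws: FakeWorkerState, saturation_factor: float) -> int:
    # Roughly: how many more tasks can be sent before the worker counts as saturated.
    return max(math.ceil(saturation_factor * ws.nthreads), 1) - (
        len(ws.processing) - len(ws.long_running)
    )


idle = [FakeWorkerState(nthreads=2), FakeWorkerState(nthreads=2, processing={"x"})]
# The change under discussion: sum this over *all* idle workers per stimulus,
# instead of evaluating it only for the one worker that just finished a task.
slots_available = sum(task_slots_available(ws, 1.0) for ws in idle)
print(slots_available)  # 2 + 1 = 3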
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for highlighting.

Apart from the theoretical analysis, this is also something we can easily optimize if it becomes a problem: e.g. `saturation * nthreads` is not something we need to compute every time, and even the `processing - long_running` part could be replaced with a counter we inc/dec during transitions.

TL;DR: not concerned, and "fixing" it right now feels premature.
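A minimal sketch of the counter-style optimization mentioned above (purely illustrative, not distributed code; `SlotCounter` and its method names are invented): precompute the `saturation * nthreads` limit once and keep a running count in place of `len(processing) - len(long_running)`, adjusting it during transitions instead of recomputing it on every stimulus.

import math


class SlotCounter:
    """Illustrative only: track free task slots for one worker incrementally."""

    def __init__(self, nthreads: int, saturation_factor: float) -> None:
        # Compute the saturation limit once instead of on every stimulus.
        self.limit = max(math.ceil(saturation_factor * nthreads), 1)
        self.occupied = 0  # stands in for len(processing) - len(long_running)

    def task_started(self) -> None:
        self.occupied += 1

    def task_finished_or_long_running(self) -> None:
        self.occupied -= 1

    @property
    def slots_available(self) -> int:
        return self.limit - self.occupied


counter = SlotCounter(nthreads=2, saturation_factor=1.0)
counter.task_started()
print(counter.slots_available)  # 1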
Before, we selected new queued tasks as part of transitions: when one task exited `processing`, we recommended another task from `queued->processing`. Now, we select queued tasks in response to stimuli: first we do one cycle of transitions as usual, then check if there are open threads, and only then do a second cycle of transitions moving tasks from `queued->processing`.

By moving the `queued->processing` transition after any possible `queued->released` transition, this ensures we won't accidentally overwrite the `queued->released` recommendation (#7396).

See discussion in #7396 (comment).
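A minimal, self-contained sketch of that two-phase idea (a toy model, not the real Scheduler code; `handle_stimulus`, `open_slots`, and the queue/processing containers are all invented for illustration): apply the stimulus's own transitions first, and only afterwards fill any freed threads from the queue, so a `queued->released` recommendation from the first phase can't be clobbered by a `queued->processing` one.

from collections import deque

# Toy state; all names here are invented for illustration.
queued: deque[str] = deque(["q1", "q2", "q3"])  # tasks waiting for a thread
processing: set[str] = {"p1"}                   # tasks currently on workers
total_threads = 1


def handle_stimulus(recommendations: dict[str, str]) -> None:
    # Phase 1: apply the stimulus's own transitions (e.g. processing->released,
    # queued->released from a client releasing keys). Nothing is scheduled here.
    for key, finish in recommendations.items():
        if finish == "released":
            processing.discard(key)
            if key in queued:
                queued.remove(key)

    # Phase 2: only after all of the above, fill any threads that opened up
    # by moving tasks from queued->processing.
    open_slots = total_threads - len(processing)
    for _ in range(min(open_slots, len(queued))):
        processing.add(queued.popleft())


# A bulk release (like Client.close) frees the slot and schedules one queued task,
# without ever overwriting the released recommendation for the still-queued task.
handle_stimulus({"p1": "released", "q3": "released"})
print(processing, list(queued))  # {'q1'} ['q2']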
Closes #7396, closes #7401, closes #7398.
cc @fjetter @crusaderky
`pre-commit run --all-files`