Queuing does not prevent root task overproduction unless you have enough tasks #7273

Open
gjoseph92 (Collaborator) opened this issue Nov 8, 2022 · 1 comment

Queuing (#6614) is meant to prevent root task overproduction (#5555), and it's been shown to be very effective at doing so (#7128).

However, because of the heuristic for what counts as a "root-ish" task, it only prevents root task overproduction if you have > total_nthreads * 2 root tasks.

Overproduction can occur any time there are > total_nthreads root tasks. So in that middle range (more than total_nthreads but at most total_nthreads * 2 root tasks), queuing won't kick in and the worker-saturation value won't be respected.
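To put numbers on it, here's a minimal sketch; the cluster size and task-group size are made up for illustration, and only the two threshold comparisons come from the heuristic described here:

# Illustration only: a hypothetical 4-worker x 2-thread cluster and a 12-task root group.
total_nthreads = 4 * 2          # 8 threads cluster-wide
root_group_size = 12            # > 8, so overproduction is possible

queued_today = root_group_size > total_nthreads * 2   # 12 > 16 -> False: not queued
queued_proposed = root_group_size > total_nthreads    # 12 > 8  -> True: queued

print(queued_today, queued_proposed)  # False True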

This is confusing behavior for users. If you make your problem size smaller, or make your cluster bigger (two things you'd expect to reduce per-worker memory usage), you may cross an opaque magic threshold at which your workload suddenly uses up to 2x more memory.

EDIT:

To be clear, I propose a two-character change to fix this. Just drop the * 2 part:

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index b99e3f19..df20e807 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -3033,7 +3033,7 @@ class SchedulerState:
         tg = ts.group
         # TODO short-circuit to True if `not ts.dependencies`?
         return (
-            len(tg) > self.total_nthreads * 2
+            len(tg) > self.total_nthreads
             and len(tg.dependencies) < 5
             and sum(map(len, tg.dependencies)) < 5
         )

The * 2 is a number @mrocklin and I just made up back in #4967. There wasn't any benchmarking or empirical reason for it. Just saying > nthreads is more logical and easier to justify.
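To read that condition outside of diff context, here is the same check as a small standalone function; this is my paraphrase of the scheduler snippet above with the proposed threshold, and the argument names are mine, not the real method signature:

def is_rootish(group_size, n_group_dependencies, total_dependency_tasks, total_nthreads):
    # A task group is treated as "root-ish" (and therefore queued) when it is
    # larger than the cluster's total thread count and has few, small dependencies.
    return (
        group_size > total_nthreads            # proposed; currently total_nthreads * 2
        and n_group_dependencies < 5
        and total_dependency_tasks < 5
    )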

gjoseph92 (Collaborator, Author) commented:
An easy reproducer: test_graph_execution_width fails if you run it on graph sizes in this middle range.

diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 48ad99db..0294b524 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -326,13 +326,14 @@ async def test_decide_worker_rootish_while_last_worker_is_retiring(c, s, a):
         await wait(xs + ys)
 
 
+@pytest.mark.parametrize("n_roots", [6, 9, 16, 32])
 @pytest.mark.slow
 @gen_cluster(
     nthreads=[("", 2)] * 4,
     client=True,
     config={"distributed.scheduler.worker-saturation": 1.0},
 )
-async def test_graph_execution_width(c, s, *workers):
+async def test_graph_execution_width(c, s, *workers, n_roots):
     """
     Test that we don't execute the graph more breadth-first than necessary.
 
@@ -357,7 +358,7 @@ async def test_graph_execution_width(c, s, *workers):
                 self.log.append(self.count)
                 type(self).count -= 1
 
-    roots = [delayed(Refcount)() for _ in range(32)]
+    roots = [delayed(Refcount)() for _ in range(n_roots)]
     passthrough1 = [delayed(slowidentity)(r, delay=0) for r in roots]
     passthrough2 = [delayed(slowidentity)(r, delay=0) for r in passthrough1]
     done = [delayed(lambda r: None)(r) for r in passthrough2]

FAILED distributed/tests/test_scheduler.py::test_graph_execution_width[9] - AssertionError: assert 9 <= 8
FAILED distributed/tests/test_scheduler.py::test_graph_execution_width[16] - AssertionError: assert 16 <= 8
======================================================================== 2 failed, 2 passed in 3.07s ========================================================================
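For context on the 8 in those assertions: with worker-saturation = 1.0 on the 4-worker x 2-thread test cluster, the intended ceiling is 8 concurrently running roots cluster-wide, roughly as sketched below (a paraphrase of the limit queuing is supposed to enforce, not the scheduler's actual code):

import math

def per_worker_cap(nthreads, worker_saturation):
    # Approximate per-worker limit on concurrently processing root tasks
    # when queuing is active: round up, never below 1.
    return max(1, math.ceil(nthreads * worker_saturation))

# Test cluster: 4 workers x 2 threads each, worker-saturation = 1.0
print(4 * per_worker_cap(2, 1.0))  # 8 -> the bound the failing cases exceed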
