Make rootish heuristic sensitive to size of task group #8005

Open · wants to merge 2 commits into main

Conversation

mrocklin
Member

Currently we require rootish tasks to belong to a task group that has no more than five dependencies.

Sometimes (as in a recent Xarray workload shown to me by @dcherian) this doesn't work: the group has a few dependencies, but still far fewer than it has tasks. In his case there were 6000 tasks in the task group and only 6 dependencies of that group, and so it was erroneously classified as non-rootish.

This PR proposes a change to the logic: we accept tasks whose groups have fewer dependencies than 1% of the number of tasks in the group. In Deepak's case there are fewer than 6000 * 1% == 60 dependencies, so these tasks get classified as rootish.

Future note: we're doing this dynamically now; maybe we should be looking at data stored instead of number of tasks.
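For illustration, here is a rough sketch of the kind of check described above, written against a simplified version of Scheduler.is_rootish. The max(5, ...) floor and the exact thresholds are assumptions for the example, not necessarily what this PR's diff does:

def is_rootish(self, ts: TaskState) -> bool:
    """Sketch: does ``ts`` belong to a root-ish task group?"""
    if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
        return False
    tg = ts.group
    # Old rule: a flat cap of 5 dependencies per group.
    # Sketched rule: let the cap scale with the group, so a 6000-task group
    # may have up to 6000 // 100 == 60 dependencies and still be root-ish.
    max_deps = max(5, len(tg) // 100)
    return (
        len(tg) > self.total_nthreads * 2
        and len(tg.dependencies) < max_deps
    )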

@mrocklin mrocklin requested a review from fjetter as a code owner July 15, 2023 15:54
@github-actions
Contributor

github-actions bot commented Jul 15, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

19 files (+15) · 19 suites (+15) · 11h 43m 56s ⏱️ (+10h 37m 9s)
3 744 tests (+435): 3 629 ✔️ (+553) · 106 💤 (−127) · 9 ❌ (+9)
34 869 runs (+28 352): 33 180 ✔️ (+27 276) · 1 667 💤 (+1 054) · 22 ❌ (+22)

For more details on these failures, see this check.

Results for commit e3d393b. ± Comparison against base commit 9d9702e.

♻️ This comment has been updated with latest results.

@mrocklin
Member Author

maybe we should be looking at data stored instead of number of tasks

To be clear, when I say "data stored" above I mean number of bytes. If a large task group depends on relatively little data then we should probably think of it as rootish, because running these tasks won't allow us to release much data.
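As a hypothetical back-of-the-envelope illustration (the numbers here are invented, not from the workload above):

# Hypothetical numbers: a large group expected to produce ~60 GiB that
# depends on groups holding only ~100 MiB can't release much memory by
# running, so byte-wise it looks root-ish even if it has "many"
# dependencies by count.
group_nbytes_expected = 60 * 2**30   # ~60 GiB the group will eventually produce
dependency_nbytes = 100 * 2**20      # ~100 MiB currently held by its dependencies
looks_rootish = group_nbytes_expected > 100 * dependency_nbytes  # True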

@dcherian

Can confirm this fixes the issue in #7274 (comment)

All open_dataset tasks are now detected as root-ish
[screenshot: dashboard showing the open_dataset tasks classified as root-ish]

Thanks, Matt!

@mrocklin
Member Author

🎉

@mrocklin
Member Author

@fjetter I think I'm probably handing this off to you. I think the most useful thing here is that we've identified an issue with @dcherian's workload and the underlying problem (the magic number of five dependencies in the rootish heuristic). The specific solution here of "change the cap of 5 to 1% of the group's tasks" is kinda arbitrary but maybe OK.

I'm hopeful that at least identifying the problem is useful to you.

@fjetter
Member

fjetter commented Jul 17, 2023

I'll have a look asap

@fjetter
Member

fjetter commented Jul 18, 2023

One of our rechunking stress tests is failing. This indicates two problems:

  • There is a consistency problem in our scheduler connected to queued tasks, which is what is actually causing the failure.
  • I noticed that rechunk-split tasks are now classified as root-ish in this stress test. This should typically not happen, and we have to verify this new heuristic against other workloads as well.

Digging into both will take a bit of time.

@mrocklin
Member Author

The first problem I think I'll leave to you 🙂

Context on the second problem (maybe just for me, but maybe also for others): there are 3000 tasks in the rechunk-split group that depend on a group with only 6 tasks, and 6 is well below 1% of 3000 == 30, so the new heuristic classifies the group as rootish. The amount of data in each group is the same.

Some options:

  1. Only allow a group to be rootish if its dependencies are also rootish. This would certainly solve this problem (and be true to the name rootish), but it would handle situations like reading in from disk during shuffles, and other such cases, less well. (A rough sketch of this option follows the list.)
  2. Accelerate my earlier comment about looking at data sizes. That's really what we're after here. The dependency group in this case is the same size as the split group, so there's no reason to hold off on these tasks: they'll create about as much data as they release.
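A minimal sketch of what option 1 could look like, assuming a recursive walk over TaskGroup.dependencies; the helper name, the size threshold, and the memoization are invented here for illustration:

from distributed.scheduler import TaskGroup

def group_is_rootish(tg: TaskGroup, total_nthreads: int, _memo=None) -> bool:
    """Hypothetical: a group is root-ish only if it is large relative to the
    cluster and every group it depends on is itself root-ish.  True roots
    (no dependencies) satisfy the second condition trivially."""
    if _memo is None:
        _memo = {}
    if tg in _memo:
        return _memo[tg]
    _memo[tg] = False  # guard against cycles at the group level
    result = len(tg) > total_nthreads * 2 and all(
        group_is_rootish(dep, total_nthreads, _memo) for dep in tg.dependencies
    )
    _memo[tg] = result
    return result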

@mrocklin
Member Author

So we could do something like this:

class TaskGroup:
    ...
    @property
    def nbytes_expected(self) -> int:
        # Extrapolate the group's eventual total size from the tasks that
        # have already finished and reported their nbytes.
        n_done = self.states["memory"] + self.states["released"] + self.states["forgotten"]
        if not n_done:
            return 0
        fraction_done = n_done / len(self)
        return int(self.nbytes_total / fraction_done)

    def is_rootish(self) -> bool:
        # The group counts as root-ish if it is expected to produce far more
        # data than its dependency groups, so running it releases little.
        return self.nbytes_expected > 100 * sum(
            dep.nbytes_expected for dep in self.dependencies
        )


class TaskState:
    ...
    def is_rootish(self) -> bool:
        return self.group.is_rootish()

This might get a little expensive. If so, we might want to maintain a counter on the TaskGroup of how many tasks have reported their nbytes.

@mrocklin
Member Author

Maybe something like this:

index 369fd0706..5a93799ef 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -1035,6 +1035,10 @@ class TaskGroup:
     #: The total number of bytes that this task group has produced
     nbytes_total: int

+    #: The total number of tasks this task group has finished
+    n_finished: int
+    n_total: int
+
     #: The total amount of time spent on all tasks in this TaskGroup
     duration: float

@@ -1082,6 +1086,8 @@ class TaskGroup:
         self.states = dict.fromkeys(ALL_TASK_STATES, 0)
         self.dependencies = set()
         self.nbytes_total = 0
+        self.n_finished = 0
+        self.n_total = 0
         self.duration = 0
         self.types = set()
         self.start = 0.0
@@ -1105,6 +1111,7 @@ class TaskGroup:

     def add(self, other: TaskState) -> None:
         self.states[other.state] += 1
+        self.n_total += 1
         other.group = self

     def __repr__(self) -> str:
@@ -1119,7 +1126,7 @@ class TaskGroup:
         )

     def __len__(self) -> int:
-        return sum(self.states.values())
+        return self.n_total

     def _to_dict_no_nest(self, *, exclude: Container[str] = ()) -> dict[str, Any]:
         """Dictionary representation for debugging purposes.
@@ -1149,6 +1156,21 @@ class TaskGroup:
             for state, count in self.states.items()
         )

+    @property
+    def nbytes_expected(self) -> int:
+        if not self.n_finished:
+            return 0
+
+        return int(self.nbytes_total * self.n_total / self.n_finished)
+
+    def is_rootish(self):
+        if self.resource_restrictions or self.worker_restrictions or self.host_restrictions:
+            return False
+        if not self.nbytes_total:
+            return False
+        return self.nbytes_expected > 100 * sum(dep.nbytes_expected for dep in
+                                                self.dependencies)
+

 class TaskState:
     """A simple object holding information about a task.
@@ -1448,6 +1470,7 @@ class TaskState:
         if old_nbytes >= 0:
             diff -= old_nbytes
         self.group.nbytes_total += diff
+        self.group.n_finished += 1
         for ws in self.who_has:
             ws.nbytes += diff
         self.nbytes = nbytes
@@ -2279,7 +2302,7 @@ class SchedulerState:
         """
         ts = self.tasks[key]

-        if self.is_rootish(ts):
+        if ts.group.is_rootish():
             # NOTE: having two root-ish methods is temporary. When the feature flag is

@mrocklin
Member Author

Also, I was looking into rootishness, and it looks like decide_worker has been sharded into lots of different possibilities around rootish and queued, which made me sad (though I can see how this would have made sense).

@fjetter
Member

fjetter commented Jul 31, 2023

I'm a bit confused about the suggestion of measuring nbytes. We need this kind of information before we're scheduling anything. We either have to decide to schedule greedily or queue up. We only ever get an nbytes measurement once a task has been scheduled.
Is your proposal to schedule some groups as queued and then unqueue them once measurements come in? If that's the suggestion, I'm a mild -0.5 on it. We've had plenty of consistency problems in the past, particularly around tasks being able to change their status from non-queued to queued or the other way round.

There are a couple of relevant arguments over in #7531.

I'm generally also less excited about expanding root-ish logic because everything we've learned so far points to dask.order being the issue here, see dask/dask#9995 for a writeup with some links for further reading.
The fact that we're ultimately only fighting against improper ordering makes me push back on suggestions that complicate the control flow even further. I don't mind tweaking, simplifying or expanding the classification logic but I'd be concerned if we wanted to introduce yet another dynamic component to this.

decide_worker has been sharded into lots of different possibilities with rootish and queued, which made me sad (but I can see how this would have made sense)

You're not alone. If you want to dig into this further I suggest for you and I to have a chat about the current state and where we want this to end up

@mrocklin
Member Author

Ah, I failed to consider that we assign all of these at once. I'm curious: when does task queuing come into play? Only for rootish tasks, I take it?

@fjetter
Member

fjetter commented Aug 8, 2023

Ah, I failed to consider that we assign all of these at once. I'm curious: when does task queuing come into play? Only for rootish tasks, I take it?

Yes. If a task is classified as root-ish according to Scheduler.is_rootish, we look at the number of tasks already assigned to a worker; if there are more than allowed (the allowance is ceil(nthreads * worker-saturation)), we queue the tasks until a slot becomes available.
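A quick worked example of that limit, assuming a worker with 8 threads and the default worker-saturation of 1.1 (the numbers here are illustrative):

import math

nthreads = 8              # threads on one worker (illustrative)
worker_saturation = 1.1   # distributed.scheduler.worker-saturation
limit = math.ceil(nthreads * worker_saturation)  # ceil(8.8) == 9
# Up to 9 root-ish tasks get assigned to this worker at once; the rest stay
# queued on the scheduler until one of those slots frees up.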
