
[DNM] Structural co-assignment #7076

Draft · wants to merge 24 commits into main

Conversation

@gjoseph92 gjoseph92 commented Sep 27, 2022

This explores explicitly traversing graph structure to determine families of tasks that should be assigned to the same worker.

The core realization is that, although traversing the graph is expensive, if we schedule all the sibling tasks that we find at once, then we're amortizing that cost over all those siblings. So the per-task cost should still be reasonable.

A few neat things pop out of this approach:

  • No need for queue rebalancing algorithms at all during scale up/down. We can pick a worker for a rescheduled task and maintain co-assignment during scaling using the same decide_worker logic we use normally.
  • No need for tuning a worker-saturation parameter. Graph structure indicates when over-saturating workers is or isn't useful.
  • Identifying which tasks to speculatively assign to workers is trivial. Implementing STA on top of this is mostly a bookkeeping exercise.
  • At first glance, if you completely remove all other forms of scheduling (worker_objective-based) and apply this structural approach to every task, the results don't even seem that bad.
  • Tests added / passed
  • Passes pre-commit run --all-files

Testing this change in CI to see what fails. Subtle but significant: _all_ recommendations from a transition will be processed on the next cycle. Previously, only recommendations for the current key, or for keys with no recommendations yet, would be processed on the next cycle. If a key already had a recommendation, the recommendation would be updated, but still processed in the original (priority) order.
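For illustration only, here is a toy model of that ordering change (this is not the scheduler's actual `_transitions` loop, and whether a re-recommended key goes to the back or the front of the queue is an assumption; `transition` is a hypothetical callback returning new recommendations):

from collections import deque
from typing import Callable

def process_recommendations(
    initial: dict[str, str], transition: Callable[[str, str], dict[str, str]]
) -> None:
    # New behavior: every recommendation a transition produces is re-queued and
    # handled on a later cycle, even if that key already had a pending
    # recommendation. Previously, an already-recommended key kept its original
    # (priority-ordered) position and only its target state was updated in place.
    pending = dict(initial)  # key -> recommended state
    order = deque(initial)   # processing order; mirrors pending's keys
    while order:
        key = order.popleft()
        finish = pending.pop(key)
        for k, state in transition(key, finish).items():
            if k in pending:
                order.remove(k)  # drop its old position...
            pending[k] = state
            order.append(k)      # ...and handle it on a following cycle instead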
@gjoseph92 gjoseph92 self-assigned this Sep 27, 2022

github-actions bot commented Sep 27, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

   15 files  +1         15 suites  +1       6h 32m 16s ⏱️ +55m 12s
3 170 tests  +54      3 085 ✔️ +66       84 💤 -12     1 ±0
23 464 runs  +1 639   22 557 ✔️ +1 700   906 💤 -61    1 ±0

For more details on these failures, see this check.

Results for commit 3a0329e. ± Comparison against base commit 8c4133c.

♻️ This comment has been updated with latest results.

the linear chain traversal of the family metric isn't good. more broadly, we need to think about what states we expect tasks in families to be in.

almost certainly, family should be a data structure that's maybe populated top-down in `update_graph`. the only reason we're able to get away with it not being a structure right now is because we assume that all tasks in a family are either not scheduled, or all in processing. there are graph structures where that's probably impossible.
no need for `planned_on` when non-root tasks won't follow this code path anyway. normal `decide_worker` should do a decent job in that case.

we do still need a `decide_worker_from_family` that goes wherever sibling tasks are already in memory or processing. this should basically make rescheduling work as well.
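For concreteness, a minimal sketch of what such a `decide_worker_from_family` could look like (not this PR's implementation; the voting scheme is an assumption, though `processing_on` and `who_has` are real scheduler-side `TaskState` attributes):

from collections import Counter

def decide_worker_from_family(siblings, valid_workers):
    # Prefer whichever candidate worker already holds or is running the most
    # siblings, so the family stays together across rescheduling and scaling.
    votes = Counter()
    for sts in siblings:
        if sts.processing_on is not None:
            votes[sts.processing_on] += 1
        for ws in sts.who_has:
            votes[ws] += 1
    for ws, _ in votes.most_common():
        if ws in valid_workers:
            return ws
    return None  # no sibling placed yet; fall back to normal decide_worker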
deadlocks when a worker is restarted midway through currently
this almost certainly needs to be revisited. it's a sort of reasonable default, since maxsize is really about what's "widely-shared" or not. and a good definition of widely-shared is that it saturates the cluster.

however, i have a feeling this can also be too small for small clusters. especially in a local cluster with, say, 8 threads, you could get reasonably-sized families split across workers instead of co-assigned.
didn't affect test, just unnecessarily wrong
this fixes a number of things:
* when cluster is larger than tasks, explicitly break up co-assignment to ensure full utilization
* separate max family size from widely-shared size. widely-shared size is directly related to cluster size; max size is just about an upper bound on iteration.
* downstreams larger than max size shouldn't be downstreams
idk if/how this actually matters
@gjoseph92 gjoseph92 changed the title from "[DNM] Scheduler: process new recommendations immediately" to "[DNM] Structural co-assignment" Sep 30, 2022
could have traversed into widely-shared deps
Returning `next` instead of `next(iter(ts.dependents))` was the completely wrong direction.

This reverts commit 63274fa.
maintaining this sort adds a moderate amount of scheduler overhead when workers go in and out of it so often. especially when we had no need for it to be sorted in the first place.
@fjetter fjetter left a comment (Member)

No need for queue rebalancing algorithms at all during scale up/down. We can pick a worker for a rescheduled task and maintain co-assignment during scaling using the same decide_worker logic we use normally.

IIUC maintain in this context does not mean we're maintaining state but rather the opposite, that we're using a stateless algorithm to infer this, correct?

No need for tuning a worker-saturation parameter. Graph structure indicates when over-saturating workers is or isn't useful.

It's still used for stuff like _worker_full. Can't we get rid of this parameter entirely?

Implementing STA on top of this is mostly a bookkeeping exercise.

The worker state machine is also just bookkeeping ;)

I think we're still missing a bit more. This would answer the question of which tasks to schedule speculatively and in which order. The remaining question is how to decide where to put such a task (might be easy but still an unanswered question, I believe)

At first glance, if you completely remove all other forms of scheduling (worker_objective-based) and apply this structural approach to every task, the results don't even seem that bad.

I'm not too surprised but am glad to hear either way. I would like us to move away from duration-measurement based scheduling anyhow (and by extension occupancy)


  • Can you elaborate on how the added complexity is actually amortized? The runtime complexity is the thing that truly concerns me, and I'd like to understand better how you think it is amortized.
  • I have some concerns about widely_shared_cutoff and how it is tied to the cluster size. I would really appreciate us trying to work on a static classification ("Factor out and instrument task categorization logic - static graph analysis", #6922) and to split off dynamic components (if another loop is necessary, that's fine, since it doesn't increase runtime complexity but makes the code much easier to maintain).
  • Some of the comments already talk about assumptions that only hold for dask collections. I don't mind baking this in, but at the same time I wonder if there is not a better approach than using HLG or similar.
  • I also wonder if dask.order does not already offer something similar to family. @eriknw I would appreciate a brief review of the family function. I would love to build on previous work here.

Comment on lines -1310 to -1311
#: (actually a SortedDict, but the sortedcontainers package isn't annotated)
idle: dict[str, WorkerState]
Member

I remember that parts of the AMM rebalancing rely on this being a dictionary, since it makes use of its insertion ordering

Collaborator Author

I don't see idle used anywhere in active_memory_manager.py. Is this something that's planned but not implemented yet? Either way, would idle need to be a SortedDict, or just a plain dict that maintains insertion order?

Now that workers go in and out of idle all the time with the saturation-based definition, maintaining the sort is somewhat expensive, especially given that I don't see us using the sortedness anywhere.
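For reference, the distinction being discussed, as a quick standalone example (a plain dict already preserves insertion order on Python 3.7+, without the per-update cost of keeping keys sorted):

from sortedcontainers import SortedDict

plain: dict[str, int] = {}
srtd = SortedDict()
for addr in ["tcp://w3", "tcp://w1", "tcp://w2"]:
    plain[addr] = 0
    srtd[addr] = 0

print(list(plain))  # ['tcp://w3', 'tcp://w1', 'tcp://w2'] -- insertion order, no upkeep
print(list(srtd))   # ['tcp://w1', 'tcp://w2', 'tcp://w3'] -- kept sorted on every update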

Member

just a plain dict that maintains insertion order?

the case I have in mind requires a plain dict (insertion order), not a SortedDict

maybe I remember incorrectly. cc @crusaderky

return next(iter(ts.dependents))


def family(
Member

nit: I strongly suggest starting this kind of stuff in a new (private) module

Collaborator Author

I wanted to, but then there's a circular dependency, since this refers to TaskState, WorkerState etc.

If those imports are in an `if TYPE_CHECKING` block, that would be okay. I have a feeling future modifications to family might need to access full scheduler state, but as long as it's just annotations we'll be okay.

siblings: set[TaskState] = set()
downstream: set[TaskState] = set()
# TODO maintain `seen` set to avoid repeated traversal? Should we add on the way down, or just back up?
for dts in ts.dependents: # TODO even support multiple dependents?
Member

In case this is not apparent to everyone, this loop has highly non-linear runtime.

In essence this is what I proposed earlier in #4864 (comment) (slightly worse due to the walk of the linear chains)

I don't think this is a deal breaker, but we should discuss it.

Collaborator Author

Yeah, I started with what you had there and then added linear chain traversal.

The linear chain walking is the unbounded part of this loop. The double-for loop going down dependents and up dependencies is non-linear, but bounded by maxsize.

We'll want to profile this on real workloads. In the bit of initial profiling I've done locally, family was too fast to even show up in a py-spy profile (but updating the idle SortedDict did, which is why I removed it). There's a lot we could think about to further increase performance, but first we should see how much of an issue it actually is.
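Pieced together from the snippets visible in this review, the double loop looks roughly like the sketch below (hedged: the real family() also walks linear chains, enforces maxsize, and may populate downstream differently than assumed here):

def family_sketch(ts, widely_shared_cutoff):
    siblings, downstream = set(), set()
    for dts in ts.dependents:  # down to each dependent...
        if len(dts.dependents) < widely_shared_cutoff:
            downstream.add(dts)  # assumption: non-widely-shared dependents are "downstream"
        for sts in dts.dependencies:  # ...and back up to its other inputs
            if (
                sts is ts
                or len(sts.dependents) >= widely_shared_cutoff  # ignore widely-shared siblings
                or sts in downstream  # a->b, b->c, a->c: downstream takes priority
            ):
                continue
            siblings.add(sts)
    return siblings, downstream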



def _next_in_linear_chain(ts: TaskState, cutoff: int) -> TaskState | None:
if len(ts.dependents) != 1 or len(ts.dependencies) > cutoff:
Member

Why do we not walk tasks with widely shared dependencies? Why wouldn't we want to walk the chain

flowchart BT

A1 --> B
A2 --> B
A3 --> B
A4 --> B
A5 --> B
B --> C
C --> D

Member

Can we re-use / factor out some code from fuse / fuse_linear? IIUC the code there handles more complicated situations that'd also be interesting for us

Collaborator Author

I think len(ts.dependencies) > cutoff is just about setting an upper bound for iterating through dependencies, to constrain runtime.

The test is effectively requiring len(ts.dependencies) == 1, but ignoring any widely-shared dependencies in there. As a shortcut, if len(ts.dependencies) > cutoff, it's extremely unlikely that all of those deps are widely-shared, so we save ourselves from iterating through all of them and bail out early.

There is probably a better way of expressing all this than what I have here. It's a little special-cased for this one circumstance I'm aware of; there are probably others it doesn't cover well yet.

This test is an example of why we exclude widely-shared deps from linear chains:

s s s
/|\ /|\ /\
a a a a a a a a
|\|\|\|\|/|/|/|
| | | | s | | |
r r r r r r r r
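Spelled out, that check reads roughly like the sketch below (this is one reading of the explanation above, not the PR's actual function body):

def _next_in_linear_chain_sketch(ts, cutoff):
    # The chain only continues if `ts` has exactly one dependent, and iterating
    # its dependencies is cheap: with more than `cutoff` of them, they can't
    # plausibly all be widely-shared, so bail out early.
    if len(ts.dependents) != 1 or len(ts.dependencies) > cutoff:
        return None
    # Ignoring widely-shared dependencies, require at most one "real" one.
    narrow = [dep for dep in ts.dependencies if len(dep.dependents) < cutoff]
    if len(narrow) > 1:
        return None
    return next(iter(ts.dependents))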

Collaborator Author

In the graph you're showing, assuming we're calling family(A*), we don't have any reason to traverse above B. The goal is to identify the set of immediate sibling tasks which will all need to be in memory at the same time to compute a mutual dependency.

Imagine that the full graph actually looks like this:

flowchart BT

A1 --> B
A2 --> B
A3 --> B
A4 --> B
A5 --> B
B --> C
C --> D

D --> Z
Y --> Z
X1 --> Y
X2 --> Y
X3 --> Y

It's true that we'd probably want to run As and Xs on the same worker. However, we don't need to consider them siblings, because they don't need to be in memory at the same time. Only D and Y need to be in memory at the same time.

So there's no reason to traverse above B in identifying siblings.

"Good to co-assign" and "direct siblings" are different properties, and I'd like to keep them separate for now. It's much easier to put an upper bound on traversal distance to find direct siblings than it is to find all relatives.

Member

I think len(ts.dependencies) > cutoff is just about setting an upper bound for iterating through dependencies, to constrain runtime.

I don't mind introducing a cutoff here to protect against unreasonable traversals, but I'm struggling to see why this cutoff should be correlated with the number of workers in this case.

return {ts.key: "queued"}, {}, {}

# TODO maxsize as config/what?!
fam = family(ts, maxsize=20, widely_shared_cutoff=len(state.workers))
Member

I don't get why widely_shared_cutoff=len(state.workers). At least when reading the code of family I get the impression that this parameter is used in many different places, partially to restrict runtime complexity, partially to assess parallelism. I don't think a "family" should be mixed up with cluster size.

I do believe we can and should differentiate between a static graph analysis and a dynamic assessment of potential of parallelism.

A widely shared dependency, e.g. one that is used by all root-ish tasks, is widely shared no matter how large the cluster is.

If we were able to remove this dynamic component we could calculate this family once at update_graph time and be done with it, reducing the concerns about runtime complexity significantly.
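A hedged sketch of that AOT idea (the `_family` attribute and the `compute_family` callable are hypothetical stand-ins for what this PR computes on the fly; cache invalidation across update_graph calls, discussed below, is ignored):

def annotate_families(new_tasks, compute_family):
    # compute_family(ts) -> (siblings, downstream)
    for ts in new_tasks:
        if getattr(ts, "_family", None) is not None:
            continue  # already assigned via one of its siblings
        siblings, downstream = compute_family(ts)
        result = (frozenset(siblings), frozenset(downstream))
        for member in {ts, *siblings}:
            member._family = result  # one traversal, cached for the whole family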

Collaborator Author

Yes, I don't think len(state.workers) is the most correct metric here. But I picked it because it's a kind of sensible and very cheap cutoff to use.

A static metric would likely require knowing some things about the overall graph that we're not measuring right now, like the total number of root tasks in the system for instance. Coming up with a better metric is doable, but I didn't want to take it on for this proof-of-concept yet.

Here's why I chose this initially:

I'd first suggest reviewing the conversation in #5325 (comment).

The definition of widely-shared dependency I'm working with is basically: "a dependency whose current location should be ignored during scheduling, because in the process of scheduling all its dependents (without leaving workers idle), it will end up being copied to every worker anyway."

So an obvious heuristic for whether a task will be widely-shared is whether it has more dependents than there are threads. If that's the case, then the dependents will have to run on all workers in order to not leave some idle.

And I just used nworkers instead of nthreads because it's more conservative to avoid dogpiling, and it makes the tests pass.

Otherwise, test_co_assign_scale_up fails before the scale-up even happens. Using a larger cutoff number, the single root task (the Event) is considered the sibling of all the roots. So you effectively get no co-assignment.
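Written out, the heuristic amounts to something like this (the function name is hypothetical; the comparison mirrors the len(sts.dependents) >= widely_shared_cutoff check visible further down in this diff):

def is_widely_shared(ts, widely_shared_cutoff: int) -> bool:
    # If a task has at least `widely_shared_cutoff` dependents (here, roughly
    # one per worker), scheduling all of those dependents without leaving
    # workers idle will copy it to every worker anyway, so its current
    # location can be ignored when picking a worker.
    return len(ts.dependents) >= widely_shared_cutoff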

Collaborator Author

I do believe we can and should differentiate between a static graph analysis and a dynamic assessment of potential of parallelism.

I like this idea overall though. We may choose what to do with a family dynamically, based on cluster size. But I hope we can make the definition of "what is a family" static. I will look into that.

Collaborator Author

we could calculate this family once at update_graph time and be done with it, reducing the concerns about runtime complexity significantly

It would probably be good to be able to cache the family and never have to recompute it. However, it may not be good to put it all in update_graph. update_graph, and things like dask.order within it, already can block the event loop for extremely long times (easily 30s with large graphs).

One thing I like about the approach here is how it spreads out the family traversal over time, and defers it until it's actually necessary.

Member

"a dependency whose current location should be ignored during scheduling, because in the process of scheduling all its dependents (without leaving workers idle), it will end up being copied to every worker anyway."

I accept that this is not straightforward, and I honestly don't care about what magic number we use for initial prototyping. I care mostly about this number being static vs dynamic.

For instance, what you are describing is something like a "max-breadth" metric, which I'm pretty sure we can get out of something in dask.order. Counting the number of root tasks (not root-ish, i.e. zero dependencies) would also be trivial. No matter what we use here, this is static. I'm just trying to understand which parameters are static and which are dynamic, and it would be helpful for me if we used different variables for the static vs dynamic cases, or at the very least a comment in the code.

I like this idea overall though. We may choose what to do with a family dynamically, based on cluster size. But I hope we can make the definition of "what is a family" static. I will look into that.

Exactly!
Random thoughts without any deeper reflection

  • calculate the total number of detected families and compare with number of workers
  • size of siblings vs workers

It would probably be good to be able to cache the family and never have to recompute it. However, it may not be good to put it all in update_graph. update_graph, and things like dask.order within it, already can block the event loop for extremely long times (easily 30s with large graphs).

Sure, maybe. If we agree this is a static component, we can cache it. Whether we cache it right away by computing it in update_graph and assigning it to the task, or calculate it on demand, is mostly an implementation detail.
The one problem I encounter here either way is cache invalidation. Whenever a new update_graph is called we'd likely need to recompute the entire graph (at least if the two graphs are connected)

One thing I like about the approach here is how it spreads out the family traversal over time, and defers it until it's actually necessary.

Yes, I get that. There are two problems we're talking about right now

  1. Finding a good algorithm for the decision. In this instance, I care about the static component of this calculation the most.
  2. Making sure that the execution of this algorithm is fast and smooth (runtime complexity, event loop blocking, etc.)

A counterargument to JIT computation (in decide*/transition) is that it increases latencies for scheduling decisions compared to AOT (in update_graph). I could see both being valuable. Moving from one to the other should be easy enough, which is why I care much more about 1.)

Comment on lines +8378 to +8384
if (
sts is ts
or len(sts.dependents)
>= widely_shared_cutoff # ignore widely-shared siblings
or sts in downstream # a->b, b->c, a->c. downstream takes priority.
):
continue
Member

IIUC this is the only place where widely_shared_cutoff should be interpreted as a cluster size. If so, we should easily be able to split this dynamic component off.

Collaborator Author

It's also important when walking chains up and down, as mentioned in other comments, for cases like

s s s
/|\ /|\ /\
a a a a a a a a
|\|\|\|\|/|/|/|
| | | | s | | |
r r r r r r r r

@gjoseph92 gjoseph92 left a comment (Collaborator Author)

IIUC maintain in this context does not mean we're maintaining state but rather the opposite, that we're using a stateless algorithm to infer this, correct?

Yes, "maintain" as in "still get" co-assignment after cluster scaling. Currently (using priorities) co-assignment is destroyed by scaling.

It's still used for stuff like _worker_full. Can't we get rid of this parameter entirely?

Yes, I'm imagining we'd hardcode it to 1.0. (Because sibling tasks get to cut in line and oversaturate workers, 1.0 becomes such a sensible default we'd probably remove the parameter entirely.)
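In simplified terms (this is not distributed's actual _worker_full, just an illustration of what hardcoding 1.0 would mean):

def worker_full(n_processing: int, nthreads: int, saturation: float = 1.0) -> bool:
    # With saturation 1.0, a worker is "full" once it has as many processing
    # tasks as threads; only sibling co-assignment gets to push it past that.
    return n_processing >= max(int(nthreads * saturation), 1)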



gjoseph92 commented Oct 5, 2022

the question of which tasks to schedule speculatively and in which order.

family returns (siblings, downstream); downstream will be the set of tasks that should be scheduled speculatively.

The remaining question is how to decide where to put such a task

It should be put on the same worker we're sending siblings to right now.

This would mean the entire family gets sent to the worker at once:

diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 9f0d602a..6c9add60 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -7780,6 +7780,12 @@ def _queueable_to_processing(
                 # recommendation for the key from the recommendations queue.
                 recommendations[fts.key] = "processing"
 
+        for fts in downstream:
+            if fts.state == "released":
+                state._transition(fts.key, "waiting")
+            assert fts.state == "waiting"
+            update_msgs(worker_msgs, _add_to_processing_speculative(state, fts, ws))
+
     return recommendations, {}, worker_msgs

The neat thing is that, hopefully, this works for downstream tasks too.

For example in

flowchart BT

A --> C
B --> C
D --> E
C --> E

A and B are siblings, so they get scheduled at once. C is their downstream, so it also gets scheduled on the same worker. When D gets scheduled, C is its sibling. And C is already processing somewhere (via STA). So D goes to the same worker, via decide_worker_from_family's affinity for processing_on. And as the downstream, E gets speculatively assigned there as well.

In that example, the worker gets autonomy to run the whole subgraph up front without any scheduler back-and-forth, decreasing runtime. (We would also need #5114 as a corollary to STA so it doesn't run out of memory.)

Obviously that's an optimistic picture; there are other cases where this would fall over. It's just meant to express a general idea of how this might work.

@gjoseph92 (Collaborator Author)

Can you elaborate how the added complexity is actually amortized? The runtime complexity is the thing that truly concerns me and I'd like to understand better how you think this is amortized

Because every sibling task gets sent to processing at once, in most cases we won't call family again on any of its siblings.

Let S be the number of siblings of a task. S < min(maxsize, cutoff).

L is the depth of linear chains. L should probably get a bound put on it too, but let's assume for now that it's reasonably small. And in any case, it typically does not scale with the total number of tasks. So let's call it ~O(1).

So we pay O(S * L), aka O(S), on the first task we encounter.

But then we submit the S - 1 other sibling tasks we just found for free. So, amortized across all those tasks, the per-task cost is O(1).

Besides ridiculously-long linear chains, the worst-case scenarios right now occur when a task has a lot of potential siblings, but none actually meet the criteria. Then we traverse a lot, but don't get more things to schedule and share cost with. But this isn't inherent to the approach. It's just because the algorithm isn't done yet, and I'm aiming for readability right now over handling these odd edge cases.
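Illustrative numbers only (not measurements), just to make the amortization concrete:

S = 16                               # siblings found; bounded by min(maxsize, cutoff)
L = 3                                # assumed typical linear-chain depth
first_task_cost = S * L              # ~48 node visits to discover the family
per_task_cost = first_task_cost / S  # ~3 visits per task scheduled, i.e. O(L) ~= O(1)
print(first_task_cost, per_task_cost)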

@gjoseph92 (Collaborator Author)

Nobody asked this yet, but it's useful to write down:

Why traverse linear chains? Can't we just ignore that for now / punt that off on graph optimization?

I was very much hoping to ignore it, but sadly concluded that it's not realistic:

  1. Practically, getting fusion to actually happen everywhere in dask is way more work, and way slower, than implementing chain-traversal in family :)
  2. It will always be possible to do things that make fusion impossible: dask.compute(initial_data, derived_data), annotations, etc. (The existing tests for co-assignment do exactly this.)
