Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Withhold root tasks [no co assignment] #6614

Merged
merged 104 commits into from
Aug 31, 2022

Conversation

gjoseph92
Copy link
Collaborator

@gjoseph92 gjoseph92 commented Jun 22, 2022

This PR withholds root tasks on the scheduler in a global priority queue. Non-root tasks are unaffected.

Workers are only sent as many root tasks as they have threads, by default. This factor can be configured via distributed.scheduler.worker-saturation (1.5 would send workers 1.5x as many tasks than they have threads, for example). Setting this config value to inf completely disables scheduler-side queuing and retains the current scheduling behavior (minus co-assignment).

This disregards root task co-assignment. Benchmarking will determine whether fixing root task overproduction is enough of a gain to be worth giving up (flawed) co-assignment. Root task assignment here is typically worst-possible-case: neighboring tasks will usually all be assigned to different workers.

I also could/will easily add back co-assignment when distributed.scheduler.worker-saturation is inf EDIT: done. With that, this PR would be entirely feature-flaggable—we could merge it with the default set to inf and see zero change in scheduling out of the box.

Closes #6560, closes #6631, closes #6597 (with withholding mode turned off at least)

Supersedes #6584, which did the same, but for all tasks (even non-root). It also co-mingled unrunnable tasks (due to restrictions) and queued root tasks, which seemed unwise.

  • Tests added / passed
  • Passes pre-commit run --all-files

Idea was that if a `SortedSet` of unrunnable tasks is too expensive, then insertion order is probably _approximately_ priority order, since higher-priority (root) tasks will be scheduled first. This would give us O(1) for all necessary operations, instead of O(logn) for adding and removing.

Interestingly, the SortedSet implementation could be hacked to support O(1) `pop` and `popleft`, and inserting a min/max value. In the most common case (root tasks), we're always inserting a value that's greater than the max. Something like this might be the best tradeoff, since it gives us O(1) in the common case but still maintains the sorted gaurantee, which is easier to reason about.
Now task states on the dashboard are listed in the logical order that tasks transition through.
Simpler, though I think basically just an int of 1 may be the most reasonable.
This is just a hack currently, but maybe it would actually be useful?
This reverts commit df11f719b59aad11f39a27ccae7b2fd4dfd9243a.
When there were multiple root task groups, we were just re-using the last worker for every batch because it had nothing processing on it.

Unintentionally this also fixes dask#6597 in some cases (because the first task goes to processing, but we measure queued, so we pick the same worker for both task groups)
1. The family metric itself is flawed. Added linear chain traversal, but it's still not good. The maxsize is problematic and probably the wrong way to think about it? a) there's quite likely no maxsize parameter that will ever be right, because you could always have multiple independent crazy substructures that are each maxsize+1. b) even when every task would be in the same family because they're all interconnected, there's still benefit to scheduling subsequent things together, even if you do partition. Minimizing priority partitions is always what you want. Maybe there's something where maxsize is not a hard cutoff, but a cutoff for where to split up interconnected structures?
2. Families probably need to be data structures? When a task completes, you'd like to know if it belongs to a family that actually has more tasks to run on that worker, vs the task just happens to look like it belongs to a family but was never scheduled as a rootish task.

Overall I like the family structure for scheduling up/down scaling, but figuring out how to identify them is tricky. Partitioning priority order is great because it totally avoids this problem, of course at the expense of scaling. Can we combine priority and graph structure to identify isolated families when reasonable, partition on priority when not?
Update docstring and add back logic for queuing disabled case
Just easier to explain this way
I think this fix is reasonable? I wonder if occupancy should include queued tasks though?
@github-actions
Copy link
Contributor

github-actions bot commented Jun 23, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

       15 files  ±    0         15 suites  ±0   7h 1m 15s ⏱️ + 18m 18s
  3 071 tests +  18    2 984 ✔️ +  15    85 💤 +1  2 +2 
22 729 runs  +144  21 740 ✔️ +137  986 💤 +4  3 +3 

For more details on these failures, see this check.

Results for commit 093d7dc. ± Comparison against base commit 817ead3.

♻️ This comment has been updated with latest results.

feeling pretty good about just `test_graph_execution_width`
Using it as an API saves having to manage `running` and `idle` in multiple places
hesitant on this, but I don't want to introduce a flaky test
@gjoseph92
Copy link
Collaborator Author

@fjetter I believe all comments have been addressed.

For tests, I went with just test_graph_execution_width. I removed the process memory test. I liked the simplicity of your test suggestion, but test_graph_execution_width is slightly more thorough towards one edge case.

Copy link
Member

@fjetter fjetter left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed another commit to address a merge conflict. If CI is green(ish) I'll go ahead and merge

@fjetter
Copy link
Member

fjetter commented Aug 31, 2022

I agree we should aim to get this in main quickly and then further iterate. I can see two avenues:

  1. get it in as-is (post minor tweaks), and then run performance benchmarks vs a branch where is_rootish simply returns true. This potentially means getting in main a lot of code to then remove it shortly afterwards.
  2. run perf benchmarks now, before merging, to prove that the is_rootish heuristic is indeed needed, albeit it may be tweaked in the future. To clarify I don't propose to benchmark wildly different tweaks to is_rootish; I would just like a battery of tests with different use cases showing
  • main
  • this PR with worker-saturation: inf (no regression vs main - just for safety)
  • this PR with worker-saturation: 1.2
  • this PR with worker-saturation: 1.2, but with return True at the top of is_rootish

We discussed this early on, before we even started implementation. We agreed to merge this behind the feature flag since this will not change the behavior compared to main.
The goal is to set a default parameter for this value asap by running benchmarks. If we are not happy with the performance or cannot find a value that is a sane default, we may even rip this entire thing out again.

@fjetter fjetter merged commit dd81b42 into dask:main Aug 31, 2022
@gjoseph92 gjoseph92 deleted the withold-root-tasks-no-co-assign branch August 31, 2022 15:00
@gjoseph92 gjoseph92 restored the withold-root-tasks-no-co-assign branch August 31, 2022 15:07
@gjoseph92 gjoseph92 deleted the withold-root-tasks-no-co-assign branch August 31, 2022 15:07
@fjetter fjetter mentioned this pull request Sep 1, 2022
5 tasks
@dcherian
Copy link

@gjoseph92 (and everyone else involved here), thank you! How do I test it out ;)?

with dask.config.set({"distributed.scheduler.worker-saturation": 1.5}):
    result.compute()

Is this right? What range of values should I provide: inf as an upper-bound is not very useful.

@gjoseph92
Copy link
Collaborator Author

There are many different ways to set dask config, and depending on how your clusters are deployed (local vs dask-gateway/pangeo vs dask-cloudprovider vs coiled, etc.), the way to set that config will vary.

Often though, the easiest can be to set an environment variable on the cluster. (Pangeo / dask-gateway docs, coiled docs, saturn docs, dask-cloudprovider seems to support env_vars=.) Note that the variable needs to be set before the scheduler starts—once the scheduler has started, setting it will have no effect.

$ DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.0 dask-scheduler

For a local cluster, when creating your cluster, you can just do:

with dask.config.set({"distributed.scheduler.worker-saturation": 1.0}):
    client = distributed.Client(n_workers=..., threads_per_worker=...)

If you can't get the config to work, it's possible to change the setting on a live cluster. You could also use this to try different settings without re-creating the cluster. Only run this while the scheduler is idle (no tasks). Otherwise, you'll probably break your cluster.

# enable queuing (new behavior)
client.run_on_scheduler(lambda dask_scheduler: setattr(dask_scheduler, "WORKER_SATURATION", 1.0))

# disable queuing (old behavior)
client.run_on_scheduler(lambda dask_scheduler: setattr(dask_scheduler, "WORKER_SATURATION", float("inf")))

What range of values should I provide: inf as an upper-bound is not very useful

I would try the 1.0 - 2.0 range. I would expect 1.0 to usually be what you want. We are doing some benchmarking, and hopefully will figure out what a good value is across the board, and remove the need/ability to set this value in the future.


@dcherian and anyone who tries this, please report back with your findings, regardless of what they are! We would really like to hear how this works on real-world uses.

@jrbourbeau
Copy link
Member

Just checking in here, @dcherian any luck trying things out? Happy to help out

@TomNicholas
Copy link

TomNicholas commented Oct 17, 2022

Great to see this merged (and exciting to see Deepak's results too)!

Now that we have 2022.9.2 on the LEAP hub (2i2c-org/infrastructure#1769), I'm trying this out there.

Unfortunately I can't seem to set the worker saturation option successfully. 😕

Setting via the gateway cluster options manager isn't working - if I do this

options.environment = {"MALLOC_TRIM_THRESHOLD_": "0"}
gc = g.new_cluster(cluster_options=options)

the cluster starts as expected, but if I do this

options.environment = {"MALLOC_TRIM_THRESHOLD_": "0", "DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION": 1.1}}
gc = g.new_cluster(cluster_options=options)

then it hangs indefinitely on cluster creation.

I'm not quite sure where or when I'm supposed to run this
$ DASK_DISTRIBUTED__SCHEDULER__WORKER_SATURATION=1.0 dask-scheduler - it creates a new scheduler? Is that different to the cluster?

I also tried client.run_on_scheduler(lambda dask_scheduler: setattr(dask_scheduler, "WORKER_SATURATION", 1.1))
but when I did the check suggested on the pangeo cloud docs I just get an empty dict back.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
6 participants