Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new, more careful option for downscaling #877

Closed
wants to merge 5 commits into from

Conversation

sil-lnagel
Copy link

@sil-lnagel sil-lnagel commented Mar 28, 2024

Background / Problem

After my forum post https://dask.discourse.group/t/shuffle-p2p-unstable-with-adaptive-k8s-operator/2600 @hendrikmakait confirmed my suspicion that downscaling and reshuffling (especially with p2p) seem not to work well together.

For our test workload the grafana dashboard shows the following:

default_behavior

It is clearly visible that as soon as the scheduler recommends to retire one worker (ie.: 5->4) the worker is retired and the shuffle is (at least partially) restarted, as can be seen in the increase in waiting tasks. In the end it just "finishes" because shuffle fails "shuffle_barrier failed during shuffle 5da46c9c80547f11890d48656a58c7f4". Sometimes it works, but in most cases it either needs multiple shuffle restarts or fails at some point.

Since this is a feature, we need for our workloads which heavily depend on reshuffling by nature, we further investigated this issue and came up with a potential improvement for the operator.

Proposed Solution

In the existing implementation of the operator it just relies on the scheduler to propose workers to close. If the worker is part of a shuffle this causes the shuffle to restart.

Our idea was the following:

  • don't immediately retire the workers the scheduler suggests
  • instead query their /metrics endpoint and verify that no tasks are running
  • only retire if they do nothing, otherwise delay the scaling

This resolves the restart issues in all our test workloads, while still allowing scale down after work is finished.

Potential issues we forsee

  • workers are never shut down until they are done even if they only have 1 task they are working on
  • we attempted to assign worker states to ACTIVE_STATES and IDLE_STATES but we would appreciate any input if this partition was correct
  • the get_workers rpc is not available as HTTP endpoint in dask.distributed, so we can only use RPC here

Additional changes

Made a couple of hard coded values configurable, that we used to fine tune the scaling behavior (defaults should be unchanged)

  • kubernetes.controller.autoscaler.cooldown
  • kubernetes.controller.worker.dashboard-port
  • kubernetes.controller.worker-allocation.batch-size
  • kubernetes.controller.worker-allocation.delay

Added parameters to enable the scaling:

  • kubernetes.controller.autoscaler.method (left the existing behavior as default, set to "careful" to enable)
  • kubernetes.controller.autoscaler.retry-delay

careful
A picture of the proposed operator running smoothly :)

We are very interested in any feedback and improvement suggestions. Please let us know if this something that could improve the operator or if there are any questions / concerns.

@hendrikmakait
Copy link
Member

Our idea was the following:

  • don't immediately retire the workers the scheduler suggests
  • instead query their /metrics endpoint and verify that no tasks are running
  • only retire if they do nothing, otherwise delay the scaling

I'm currently out on PTO, so I haven't had a closer look at the PR. I'll just note that the idea sketched out above is not sufficient to prevent P2P restarts as workers might be idle due to straggling transfer tasks, but still required for the remainder of the shuffle. cc @fjetter

See also dask/distributed#8579 for a related issue/possible solution.

@sil-lnagel
Copy link
Author

sil-lnagel commented Mar 29, 2024

thanks for bringing the related issue to my attention @hendrikmakait, for your comment and sorry to bother you on your easter vacation. that's exactly why I wanted to tap your knowledge on the topic.

what we did is basically the "brute force method" as we lack deeper insight and especially the p2p process in dask distributed. in my opinion the real question that any kind of scaler needs to answer is: "can I kill this without causing too much harm?", which in my uninformed view should be something the scheduler should be able to answer. therefore, from my, probably naive perspective, the more elegant solution would probably something like:

a) If the controller or anyone asks the scheduler "what can I close" (rpc: workers_to_close), it should only suggest workers that do not take participate a P2P shuffle.

or if only the worker knows:

b) it could also expose some "I can't be restarted right now" - flag, which could be queried by a scaler similar what is proposed here, but by asking the real question instead of using some imperfect proxy which is looking at the worker state (this PR).

EDIT: ok, I realize now that is what @fjetter already proposed there...

@hendrikmakait
Copy link
Member

No worries, I chose to answer after all :)

a) If the controller or anyone asks the scheduler "what can I close" (rpc: workers_to_close), it should only suggest workers that do not take participate a P2P shuffle.

Yes, that's the basic idea behind @fjetter's comment on the other issue.

Copy link
Member

@jacobtomlinson jacobtomlinson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really appreciate the effort going into this here. I haven't looked deeply at the code yet as I'm concerned that this problem is being solved in the wrong place.

Dask Kubernetes shouldn't be worrying about the details of the downward scaling. It should just ask the scheduler periodically if the cluster needs to scale up or down and act accordingly. I feel like all of this logic around avoiding disrupting operations like shuffles should be handled in the scheduler. If a shuffle is in progress the scheduler shouldn't be suggesting to retire workers.

What do you think @fjetter @hendrikmakait?

@fjetter
Copy link
Member

fjetter commented Apr 9, 2024

I opened a PR in dask/distributed#8610 that modifies Scheduler.workers_to_close to disallow downscaling of workers if they are participating in P2P

@jacobtomlinson
Copy link
Member

I'm going to close this PR out in favour of fixes in distributed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants