
Redesign worker exponential backoff on busy-gather #6173

Merged
crusaderky merged 5 commits into dask:main from crusaderky:busy_worker on Apr 25, 2022

Conversation

@crusaderky (Collaborator) commented Apr 21, 2022

@@ -1930,6 +1935,8 @@ def transition_missing_fetch(
ts.state = "fetch"
ts.done = False
self.data_needed.push(ts)
for w in ts.who_has:
self.data_needed_per_worker[w].push(ts)
@crusaderky (Collaborator Author) commented Apr 21, 2022

Fixes a bug where tasks transitioning from missing to fetch would not be picked up by select_keys_for_gather.
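
For illustration, a minimal sketch (not the actual Worker code) of why the per-worker push matters, assuming select_keys_for_gather draws its candidates from a per-worker heap as this PR does; register_fetch and the tuple layout are hypothetical:

from heapq import heappush

data_needed = []              # global heap of (priority, key)
data_needed_per_worker = {}   # worker address -> heap of (priority, key)

def register_fetch(priority, key, who_has):
    """Make `key` eligible for gathering from every worker in `who_has`."""
    heappush(data_needed, (priority, key))
    for w in who_has:
        # The loop added in the diff above: without it, a gather targeted
        # at worker w never considers `key`.
        heappush(data_needed_per_worker.setdefault(w, []), (priority, key))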

-    while L:
-        ts = L.pop()
+    while tasks:
+        ts = tasks.peek()
@crusaderky (Collaborator Author) commented Apr 21, 2022

Fixes a bug where

  1. a task makes an iteration of select_keys_for_gather exceed total_bytes;
  2. before the fetch from that worker completes, another task with higher priority is added to data_needed on the same worker;
  3. at the next iteration of ensure_communicating, the first task is not picked up by select_keys_for_gather (see the sketch below).
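
A hedged sketch of the peek-based loop (illustrative names and byte accounting, not the actual select_keys_for_gather signature): a key that would overflow the byte budget stays on the heap, so the next iteration sees it again in correct priority order even if higher-priority keys were pushed in the meantime.

from heapq import heappop

def select_keys_for_gather(heap, nbytes, byte_budget):
    """heap holds (priority, key) tuples; nbytes maps key -> size in bytes."""
    selected, total = [], 0
    while heap:
        _, key = heap[0]                  # peek: do not remove yet
        if selected and total + nbytes[key] > byte_budget:
            # Leave the key on the heap; the next ensure_communicating
            # iteration will pick it up instead of silently dropping it.
            break
        heappop(heap)                     # pop only once we commit to fetch it
        selected.append(key)
        total += nbytes[key]
    return selected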

# Avoid hammering the worker. If there are multiple replicas
# available, immediately try fetching from a different worker.
self.busy_workers.add(worker)
self.io_loop.call_later(0.15, self._readd_busy_worker, worker)
@crusaderky (Collaborator Author) commented:

To be replaced with an async instruction within the scope of #5896
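
For context, a minimal sketch of what the re-add callback might look like, assuming busy_workers and an ensure_communicating-style method as used elsewhere in this PR (the real implementation lives in distributed/worker.py):

def _readd_busy_worker(self, worker: str) -> None:
    # After the 0.15 s grace period the worker may be contacted again;
    # kick the fetch loop so tasks queued for it are reconsidered.
    self.busy_workers.discard(worker)
    self.ensure_communicating()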

who_has = await retry_operation(
self.scheduler.who_has, keys=refresh_who_has
)
self.update_who_has(who_has)
@crusaderky (Collaborator Author) commented Apr 21, 2022

Notably, this query to the scheduler does not happen if all workers known to hold a replica are in flight. I suppose the difference in treatment is because this worker knows that peers will eventually exit in_flight_workers, while it has no control over busy_workers.
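
To make the observation concrete, a purely hypothetical illustration of that gating (not the actual Worker logic):

def should_refresh_who_has(holders, in_flight_workers, busy_workers):
    """Ask the scheduler for fresh replica locations only when needed."""
    if holders and holders <= in_flight_workers:
        # Fetches already in flight will either deliver the data or fail,
        # at which point the worker learns something new anyway.
        return False
    # Busy workers may stay busy indefinitely, so look for other replicas.
    return bool(holders) and holders <= (in_flight_workers | busy_workers)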

@github-actions (bot) commented Apr 21, 2022

Unit Test Results

16 files ±0, 16 suites ±0, 7h 27m 28s ⏱️ (-1m 18s)
2 729 tests +2: 2 648 passed ✔️ (+4), 80 skipped 💤 (-1), 0 failed (-2), 1 errored 🔥 (+1)
21 718 runs +17: 20 673 passed ✔️ (+10), 1 044 skipped 💤 (+8), 0 failed (-2), 1 errored 🔥 (+1)

For more details on these errors, see this check.

Results for commit 5bffa37; comparison against base commit 370f456.

♻️ This comment has been updated with latest results.

@crusaderky crusaderky marked this pull request as ready for review April 21, 2022 22:58
@crusaderky (Collaborator Author) commented:

@fjetter @mrocklin ready for review

@mrocklin (Member) commented:

Generally what's here seems sensible to me. However, I'm also not going deeply into the logic. I'm mostly trusting @crusaderky and the tests.

I did have a general question though. There are a couple of occasions where you identify and fix a possible bug. Should we invest more time in creating tests for these?

@mrocklin (Member) commented:

I think that if @fjetter has ample time on Monday for deeper review then it would be good to wait for that. If that's not the case then I'm comfortable merging.

)["f"]
g = c.submit(inc, f, key="g", workers=[a.address])
assert await g == 2
assert_worker_story(a.story("f"), [("receive-dep", lw.address, {"f"})])
@mrocklin (Member) commented:
Unrelated to this PR, but a quick note to @fjetter

I'm totally fine with uses of stories like this one. I like this because it is a very focused assertion statement. It's clear that we care about this specific thing, rather than copying down the entire transition log. It's also easier to understand the intent from a reader's perspective. I get that we're expecting to receive "f" from lw. If this breaks and I have to come fix it in the future, I think I'll be able to quickly understand the point it was trying to get across. I also think it's unlikely to break for unrelated reasons.

@fjetter (Member) commented:

FWIW I think an even better way to assert this would be to assert on the incoming/outgoing transfer logs, since receive-dep is technically not a transition and is only there for 'historic reasons'. Still, I'm OK with this.

@crusaderky (Collaborator Author) commented:

> I did have a general question though. There are a couple of occasions where you identify and fix a possible bug. Should we invest more time in creating tests for these?

Writing tests for select_keys_for_gather is unhealthily complicated today. I'd rather leave it for after the worker state machine refactor.

@mrocklin (Member) commented:

Fine by me

@crusaderky crusaderky self-assigned this Apr 25, 2022
@fjetter (Member) left a comment:

I just had a minor question about a test but that's not a blocker.

)["f"]
g = c.submit(inc, f, key="g", workers=[a.address])
assert await g == 2
assert_worker_story(a.story("f"), [("receive-dep", lw.address, {"f"})])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW I think an even better way to assert this would be to assert on incoming/outgoing transfer logs since receive-dep is technically not a transition and only there for 'historic reasons'. Still, I'm Ok with this

Comment on lines +1838 to +1841
story = b.story("busy-gather")
# 1 busy response straight away, followed by 1 retry every 150ms for 500ms.
# The requests for b and g are clustered together in single messages.
assert 3 <= len(story) <= 7
@fjetter (Member) commented:

what's the motivation for changing the "timeout h" to this?

@crusaderky (Collaborator Author) commented:

I didn't remove the h timeout? It's on line 1836.
There was no count on the number of retries before.

@crusaderky crusaderky merged commit 2ef5cf3 into dask:main Apr 25, 2022
@crusaderky crusaderky deleted the busy_worker branch April 25, 2022 13:14
crusaderky added a commit to crusaderky/distributed that referenced this pull request Apr 26, 2022
crusaderky added a commit that referenced this pull request Apr 26, 2022
Development

Successfully merging this pull request may close this issue: Redesign worker exponential backoff on busy-gather
3 participants