Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redesign worker exponential backoff on busy-gather #6169

Closed
crusaderky opened this issue Apr 21, 2022 · 3 comments · Fixed by #6173
Closed

Redesign worker exponential backoff on busy-gather #6169

crusaderky opened this issue Apr 21, 2022 · 3 comments · Fixed by #6173
Assignees

Comments

@crusaderky
Copy link
Collaborator

When a worker receives a gather_dep request from another worker, but it is too busy with other transfers already, it will reply {"status": "busy"}. When this happens, there are three possible use cases:

  1. there are other workers that hold replicas of the same task, and the requesting worker knows about them
  2. there are other workers that hold replicas of the same task, but the requesting worker doesn't know. The worker needs to ask the scheduler through query_who_has.
  3. the busy worker holds the only replica of that task

worker.py implements an exponential backoff system, implemented in 2018 and never revisited (#2092) which, in theory, prevents it from

  • spamming the busy worker with consecutive gather requests
  • spamming the scheduler withquery_who_has

It is implemented as follows (pseudocode):

class Worker:
    repetitively_busy = 0
    data_needed = Heap()  # tasks in fetch status
    
    async def handle_scheduler(self):
        while True:
            await next_message_from_scheduler
            self.ensure_communicating()

    def ensure_communicating(self):
        while self.data_needed:
            ts = self.data_needed.pop()

            ws_local = [ws for ws in ts.who_has if same_host(ws, self)]
            ws_remote = [ws for ws in ts.who_has if not same_host(ws, self)]
            ws = random.choice(ws_local or ws_remote)

            self.transition_fetch_flight(ts)
            loop.add_callback(self.gather_dep, ws, ts)  # fire and forget task

    async def gather_dep(self, ws, ts):
        response = await get_data_from_worker(ws, ts)
        if response["status"] == "busy":
            self.transition_flight_fetch(ts)  # Add task back to self.data_needed
            # Exponential backoff to avoid hammering scheduler/worker
            self.repetitively_busy += 1
            await asyncio.sleep(0.100 * 1.5**self.repetitively_busy)
            await self.query_who_has(ts)
        else:
            self.repetitively_busy = 0
            # Omissis: deal with successful response and other failure use cases
        self.ensure_communicating()

If any other unrelated gather_dep completes or any message arrives from the scheduler, ensure_communicating will try fetching the task again. On a busy worker, this will happen almost instantly.
As there is no way to flag which worker was busy, if the busy worker was either the only local worker or the only worker on the whole cluster holding the data, it will be targeted again with get_data_from_worker.
This means that, almost instantly, the busy worker will reply the same thing and repetitively_busy will bump up a notch - unless a gather_dep from another worker recently terminated and the other worker was not busy.
It also means that there will be multiple identical gather_dep calls running in parallel, all waiting on the sleep.
query_who_has will kick off potentially several seconds later, potentially when it's no longer needed, e.g. because another iteration of ensure_communicating successfully fetched the task from another worker.

On the flip side, the worker may experience a burst of heavy chatter with the scheduler (but not other workers), followed by silence. For example, a bunch of tasks are submitted in a short timeframe, with just enough time between each so that they land in different messages, and now we need to wait for their execution to complete. This may cause repetitively_busy to rocket up to a very high number and sleep for - potentially - centuries before anything kicks off ensure_communicating again. DEADLOCK!

Finally, this interacts with select_keys_for_gather, which coagulates individually small fetches into the same message. If there are multiple tasks in fetch status pending and they're fairly large (target_message_size = 50MB), then a single ensure_communicating will spawn multiple contemporaneous gather_dep requests to the same worker, up to total_out_connections. They may all return busy, which in turn will increase repetitively_busy by several notches in a single go. Reducing target_message_size may result in a substantial slowdown, not because of the select_keys_for_gather system itself, but because repetitively_busy is reaching a higher number in a single iteration of ensure_communicating.

Besides its own shortcomings, this design is a blocker to #5896. The refactor removes the periodic kick to ensure_communicating from handle_scheduler, and instead kicks it off from transition_*_fetch. This means that the above will enter an infinite loop where the requesting worker starts accumulating more and more gather_dep tasks, and the busy worker is constantly hammered with get_data_from_worker. The sleep time quickly rises into the centuries.
A naive solution would be to move the sleep before self.transition_flight_fetch(ts). However, this would mean losing an unplanned, but highly desirable feature of the current design, which is if there are multiple workers holding replicas of the same task and the requesting worker is currently chatty, then another random worker will be tried straight away without waiting for the sleep.

Proposed design

  • Get rid of repetitively_busy.
  • Add to distributed.worker.WorkerState a busy: bool flag.
  • Worker changes as follows:
class Worker:
    data_needed = Heap()  # tasks in fetch status
    
    async def handle_scheduler(self):
        while True:
            await next_message_from_scheduler
            self.ensure_communicating()  # To be removed in #5896

    def ensure_communicating(self):
        busy_pushback = []
        while self.data_needed:
            ts = self.data_needed.pop()

            who_has = [ws for ws in ts.who_has if not ws.busy]
            if not who_has:
                busy_pushback.append(ts)
                continue

            ws_local = [ws for ws in who_has if same_host(ws, self)]
            ws_remote = [ws for ws in who_has if not same_host(ws, self)]
            ws = random.choice(ws_local or ws_remote)

            self.transition_fetch_flight(ts)
            loop.add_callback(self.gather_dep, ws, ts)  # fire and forget task

        for ts in busy_pushback:
            self.data_needed.push(ts)

    async def gather_dep(self, ws, ts):
        response = await get_data_from_worker(ws, ts)
        if response["status"] == "busy":
            ws.busy = True

            def reset_busy():
                ws.busy = False
                self.ensure_communicating()

            call_later(0.15, reset_busy)  # Hardcoded, not incremental

            self.transition_flight_fetch(ts)  # Add task back to self.data_needed
            self.ensure_communicating()
            if ts.status != "fetch":
                return  # fetching from another worker

            if all(ws2.busy for ws2 in ts.who_has):
                await self.query_who_has(ts)
                self.ensure_communicating()

The new algorithm works as follows:

  1. try fetching the task from a random, non-busy local worker
  2. if busy and there are non-busy local workers, goto 1
  3. if busy and all local workers are busy, try fetching the task from a random, non-busy remote worker
  4. if busy and there are non-busy remote workers, goto 3
  5. if at any time while trying remote workers a local worker becomes non-busy, goto 1
  6. if all local and remote workers are busy, immediately query_who_has the scheduler to discover more workers
  7. if the scheduler responds saying that previously unknown workers hold a replica, goto 1
  8. if all else fails, sleep 150ms and then goto 1

Note

If at least one local worker holds a replica, the old algorithm waits indefinitely for a local worker to become non-busy and completely ignores remote workers, thus avoiding the cost of network transfer but incurring in a delay. The new algorithm instead immediately falls back to remote workers.

@mrocklin
Copy link
Member

Quick historical note, one of the objectives of the busy signal was to avoid swarming a single worker, yes. A possibly larger objective, and on that's not immediately obvious from the implementation, is that we should spread data out using a tree of workers. The who_has check with the scheduler and the random worker selection help to ensure this behavior in aggregate.

I don't see any concern with the new design with regards to this constraint, I just wanted to make sure that it was in your mind as you were working on this.

@crusaderky
Copy link
Collaborator Author

Correction: target_message_size and select_keys_for_gather are not influential, because you can't have more than one active transfer per worker thanks to in_flight_workers. You can, however, have multiple gather_deps sleeping per worker, since the sleep is after the removal from in_flight_workers.

@fjetter
Copy link
Member

fjetter commented Apr 21, 2022

The problem about waiting potentially indefinitely on a local worker was raised with a proposed fix in #5206
It scratches a slightly different itch but also proposes a busy_worker state on Worker

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants