Tell workers when their peers have left (so they don't hang fetching data from them) #6790

Closed · Fixed by #7574
gjoseph92 opened this issue Jul 26, 2022 · 5 comments
Labels: deadlock (The cluster appears to not make any progress) · discussion (Discussing a topic with no specific actions yet)

Comments

@gjoseph92 (Collaborator)

When a worker breaks its connection to the scheduler, or goes too long without heartbeating, the scheduler removes it and reschedules its tasks to run elsewhere. However, other workers may continue trying to fetch data from it far beyond that point. This can lead to something that feels like a deadlock, but isn't quite (if you wait long enough, it will resolve): workers sit waiting for their timeouts to expire while requesting data from peers that no longer exist.

This is significantly exacerbated by increasing connection timeouts or retries. For example, if you set distributed.comm.retry.count = 5 (default is 0) and distributed.comm.timeouts.connect = 60s (default is 30s), you might experience a 5-minute deadlock as a worker keeps trying to connect to its dead peer, until it finally gives up and asks the scheduler for a new peer to try. If you set distributed.comm.retry.count = 5 and distributed.comm.timeouts.tcp = 5m (default is 30s), and a peer worker is almost out of memory and freezes up but doesn't close its TCP connection (#6110 #6208 #6177), you could have a 25-minute deadlock.
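
For reference, the configuration from that example can be set like this (a sketch; the values are the ones from the example above, not recommendations):

```python
import dask

# Values from the example above; they lengthen how long a worker keeps
# dialling a peer that is already gone (illustrative, not recommended).
dask.config.set({
    "distributed.comm.retry.count": 5,           # default: 0
    "distributed.comm.timeouts.connect": "60s",  # default: 30s
})
# Worst case per dead peer is roughly retries x connect timeout,
# i.e. the ~5-minute stall described above.
```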

Rather than relying solely on connection timeouts on the worker side, the scheduler should probably inform workers when one of their peers has left. The scheduler is a good source of truth, because workers already have to heartbeat to it, and maintain a long-lived connection (the batched stream).

But naively broadcasting a remove-worker message to every worker, every time any worker leaves, probably wouldn't scale well to large clusters. Ideally, we would only inform the workers that are fetching data from the departed worker. We could determine this from set(wts.processing_on for ts in removed_ws.has_what for wts in ts.waiters if wts.processing_on) (though we'd want something more specific, since not every worker needs to hear the updates about every task).

In that message, we should also include the new who_has for the task, with the dead worker removed. (And ensure this actually cancels the gather_dep on workers, and doesn't deadlock them if the task has already transitioned to cancelled in the meantime.)
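
For concreteness, a rough scheduler-side sketch of what that could look like. The "remove-worker" op name, payload shape, and helper functions below are hypothetical, not the API that eventually landed in #7574:

```python
# Hypothetical sketch only; the message name and payload shape are illustrative.
def workers_to_notify(removed_ws):
    """Peers that are (likely) fetching data held by the departed worker."""
    return {
        wts.processing_on
        for ts in removed_ws.has_what
        for wts in ts.waiters
        if wts.processing_on is not None
    }

def notify_peers(scheduler, removed_ws):
    for ws in workers_to_notify(removed_ws):
        scheduler.stream_comms[ws.address].send({
            "op": "remove-worker",  # hypothetical message name
            "worker": removed_ws.address,
            # Updated who_has with the dead worker dropped, so the peer can
            # retarget gather_dep instead of waiting for its timeouts.
            "who_has": {
                ts.key: [w.address for w in ts.who_has if w is not removed_ws]
                for ts in removed_ws.has_what
            },
        })
```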

cc @crusaderky @fjetter

@gjoseph92 added the discussion and deadlock labels on Jul 26, 2022
@fjetter (Member) commented Aug 3, 2022

I've talked about this in various other issues and called this a "circuit breaker" pattern.

I think the "remove-worker" message (I would probably use a slightly different name) should then abort or close all open connections to that worker. This would allow all code waiting for a connection to get an OSError similar to what would happen if the timeout is actually hit.

That should be a straightforward implementation, and we could use a feature toggle to roll this behavior out carefully.
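
A minimal sketch of the worker side of that circuit breaker, assuming a stream message like the one sketched above (the handler name and wiring are hypothetical; ConnectionPool.remove() is the existing call that drops pooled comms):

```python
# Hypothetical handler; Worker.rpc is distributed's ConnectionPool.
def handle_remove_worker(worker, *, address: str) -> None:
    # Dropping the pooled comms to the departed peer makes every coroutine
    # currently awaiting a response from it fail fast with OSError,
    # instead of waiting out the connect/TCP timeouts.
    worker.rpc.remove(address)
```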

@crusaderky (Collaborator) commented Aug 11, 2022

> Ideally, we would only inform the workers that are fetching data from the departed worker.

This is not that simple for pending AMM acquire-replicas requests, as the scheduler today doesn't track them. In this case, though, I think it would be OK to just note on the scheduler.TaskState that this worker may have a pending acquire-replicas in flight, and send the remove-worker just in case.

@graingert (Member) commented Feb 2, 2023

For this we want to be able to cause all checked-out comms for a particular address (the removed worker) to raise OSError. ConnectionPool.remove(downed_worker) does this, but we also want connection attempts (tasks in self._connecting) to raise OSError; would it make sense for ConnectionPool.remove to also cancel those?
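
A minimal sketch of that extension, assuming the pool keeps idle comms in self.available and tracks pending attempts per address in something like self._connecting[addr] (the internal layout differs between distributed versions):

```python
# Sketch only: ``available`` and a per-address ``_connecting`` mapping are
# assumed here; they are not guaranteed to match any particular release.
def remove(self, addr: str) -> None:
    """Close idle comms to ``addr`` and cancel pending connection attempts."""
    for comm in self.available.pop(addr, ()):
        comm.abort()  # users of this address now see a closed connection
    for task in self._connecting.pop(addr, ()):
        # Cancelling the attempt stops us dialling a worker that is gone;
        # the caller would still need to turn the cancellation into the
        # OSError that a connect timeout produces today.
        task.cancel()
```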

Additionally, since the problem here is exacerbated by the retry_operation call in get_data_from_worker:

    return await retry_operation(_get_data, operation="get_data_from_worker")

would it be better to move the retry logic into _handle_gather_dep_network_failure, so that instead of assuming a network error is always fatal we treat it like a busy worker? Retries would then round-robin across peers rather than just hammering the same worker.
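
For illustration, a hypothetical round-robin fetch loop in that spirit (the helper and its signature are made up; distributed's gather_dep machinery is more involved than this):

```python
import itertools

async def fetch_round_robin(key, who_has, get_from, max_attempts=3):
    """Rotate through replica holders on network errors instead of
    retrying the same (possibly dead) peer."""
    if not who_has:
        raise ValueError(f"no known replicas for {key!r}")
    last_err = None
    for _, addr in zip(range(max_attempts), itertools.cycle(who_has)):
        try:
            return await get_from(addr, key)
        except OSError as err:
            last_err = err  # treat like a busy worker and move on to the next peer
    raise last_err
```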

@fjetter (Member) commented Feb 3, 2023

I think we can safely remove the retry for get_data_from_worker. I believe the worker will handle this gracefully: it will remove the peer from its internal bookkeeping and, if necessary, ask the scheduler for help.

> but we also want connection attempts (tasks in self._connecting) to raise OSError; would it make sense for ConnectionPool.remove to also cancel those?

Yes, that makes sense. I suggest doing this in a dedicated PR; it should be easy to isolate from the other changes necessary for this issue.

@fjetter (Member) commented Feb 8, 2023

There are three tasks that can be worked on separately:

  • The scheduler tells all workers that a worker left. The worker then adjusts internal state, e.g. who_has dicts
  • The ConnectionPool removes all available comms. This includes cancellation of currently running connection attempts
  • The retries are moved from get_data_from_worker up the stack to utils_comm.gather_from_workers
