Deadlock stealing a resumed task #6159

Closed
gjoseph92 opened this issue Apr 19, 2022 · 12 comments
Labels
bug (Something is broken) · deadlock (The cluster appears to not make any progress) · stability (Issue or feature related to cluster stability, e.g. deadlock)

Comments

@gjoseph92
Collaborator

  • Worker starts fetching key abcd from a peer
  • The fetch gets cancelled
  • Scheduler now asks the worker to compute the key abcd
  • Deadlock

Here's the (annotated) worker story for the key in question:

# Initially we hear about the key as a dependency to fetch
- - ('split-shuffle-1-9698a72b7e1aad6e40a6567d3c06d355', 5, (7, 1))
  - ensure-task-exists
  - released
  - compute-task-1650391863.1634912
  - 2022-04-19 11:11:03.369210
- - ('split-shuffle-1-9698a72b7e1aad6e40a6567d3c06d355', 5, (7, 1))
  - released
  - fetch
  - fetch
  - {}
  - compute-task-1650391863.1634912
  - 2022-04-19 11:11:03.369474
- - gather-dependencies
  - tls://10.0.13.152:38353
  - - ('split-shuffle-1-9698a72b7e1aad6e40a6567d3c06d355', 5, (7, 8))
    - ('split-shuffle-1-9698a72b7e1aad6e40a6567d3c06d355', 5, (7, 1))
  - ensure-communicating-1650391867.838159
  - 2022-04-19 11:11:07.838874
- - ('split-shuffle-1-9698a72b7e1aad6e40a6567d3c06d355', 5, (7, 1))
  - fetch
  - flight
  - flight
  - {}
  - ensure-communicating-1650391867.838159
  - 2022-04-19 11:11:07.838897
# We go try to fetch it. I think we're talking to a worker that's still alive,
# but so locked up under memory pressure it never responds; see https://github.com/dask/distributed/issues/6110
- - request-dep
  - tls://10.0.13.152:38353
  - - ('split-shuffle-1-9698a72b7e1aad6e40a6567d3c06d355', 5, (7, 8))
    - ('split-shuffle-1-9698a72b7e1aad6e40a6567d3c06d355', 5, (7, 1))
  - ensure-communicating-1650391867.838159
  - 2022-04-19 11:11:07.848737
# This is really weird. After the stuck worker's TTL expires (https://github.com/dask/distributed/issues/6110#issuecomment-1102959742),
# the scheduler removes it. But the log we're looking at _isn't from the stuck worker_.
# So why is `Scheduler.transition_processing_released` sending a `free-keys` to this worker?
# That would indicate the scheduler's `processing_on` pointed to this worker.
- - ('split-shuffle-1-9698a72b7e1aad6e40a6567d3c06d355', 5, (7, 1))
  - flight
  - released
  - cancelled
  - {}
  - processing-released-1650392050.5300016
  - 2022-04-19 11:14:10.571274
# Regardless, the scheduler then asks us to compute (not fetch!) this task.
- - ('split-shuffle-1-9698a72b7e1aad6e40a6567d3c06d355', 5, (7, 1))
  - compute-task
  - compute-task-1650392058.1483586
  - 2022-04-19 11:14:18.165346
# We already know about it---the fetch was just cancelled by `processing-released`---so it stays in cancelled,
# with a recommendation to `(resumed, waiting)`
- - ('split-shuffle-1-9698a72b7e1aad6e40a6567d3c06d355', 5, (7, 1))
  - cancelled
  - waiting
  - cancelled
  - ('split-shuffle-1-9698a72b7e1aad6e40a6567d3c06d355', 5, (7, 1)):
    - resumed
    - waiting
  - compute-task-1650392058.1483586
  - 2022-04-19 11:14:18.165884
# It goes into resumed and nothing ever happens again
- - ('split-shuffle-1-9698a72b7e1aad6e40a6567d3c06d355', 5, (7, 1))
  - cancelled
  - resumed
  - resumed
  - {}
  - compute-task-1650392058.1483586
  - 2022-04-19 11:14:18.165897

There's definitely something weird about the processing-released message arriving right before the compute-task message. I can't find an obvious reason in scheduler code why that would happen.

But let's ignore that oddity for a second. Pretend it was just a normal work-stealing request that caused the task to be cancelled.

I find it odd that, if a worker is told to compute a task it was previously fetching, it'll resume the fetch:

else:
    assert ts._previous == "flight"
    return {ts: ("resumed", "waiting")}, []

If previously we were fetching a key, but now we're being asked to compute it, it seems almost certain that the fetch is going to fail. The compute request should probably take precedence.

I imagine here that we're assuming the gather_dep will error out sometime in the future, and when it does, then the key will go from resumed to waiting?

Also, this is coming from the #6110 scenario. That's an unusual one in that the TCP connection to the stuck worker doesn't get broken; the worker is just unresponsive. So I'm also wondering whether gather_dep to the stuck worker will hang forever? For 300s (it seems to go much longer than that)? For 300s * some retries? Basically, could it be that this isn't quite a deadlock, but a very, very, very long wait for a dependency fetch that might never return until the other worker properly dies? If we don't have any explicit timeouts on gather_dep already, maybe we should.
(All that said, I still think the proper fix would be to not have transition_cancelled_waiting try to resume the fetch, but instead go down the compute path. The timeout might be something in addition.)
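As a rough illustration of the "explicit timeout" idea (generic asyncio, not distributed's actual gather_dep; the 300s default is just a placeholder):

import asyncio

async def fetch_with_deadline(fetch_coro, timeout_s: float = 300.0):
    # Bound how long a single dependency fetch may hang before we give up.
    try:
        return await asyncio.wait_for(fetch_coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        # Treat the peer as unresponsive and let the caller re-plan
        # (fetch from another replica, or compute the key locally).
        return None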

cc @fjetter

gjoseph92 added the bug and stability labels on Apr 19, 2022
@fjetter
Member

fjetter commented Apr 20, 2022

I find it odd that, if a worker is told to compute a task it was previously fetching, it'll resume the fetch:

The code you are linking to looks fine. This code path should be hit if the following happens:

  • A task was in state flight, i.e. a gather_dep callback was scheduled, but the worker received a free-keys signal / release
  • Instead of releasing right away, we put the key into a cancelled intermediate state and wait for the fetch to finish
  • While we try to fetch the key, the worker is being told to compute the key instead. Since we don't have a good handle to cancel this gather_dep, we instead choose to wait for it to complete. To signal this, we put the task in another intermediate state resumed and tell it that once it is done, it should transition to waiting
  • What should happen then is that the task finishes. If the fetch is successful, we'll transition it to memory and forget about the compute. If it was unsuccessful, we'll transition it to waiting (see ts._next; i.e. compute once all dependencies are in memory). The full path is sketched below.
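To make that path easier to follow, here is a condensed sketch of the sequence (plain Python for illustration, not the Worker's actual transition code):

# Condensed sketch of the path described above; each step is (event, resulting state).
path = [
    ("gather_dep scheduled for the key", "flight"),
    ("free-keys / release arrives",      "cancelled"),  # keep waiting on the in-flight fetch
    ("compute-task arrives",             "resumed"),    # previous="flight", next="waiting"
]

def after_gather_finishes(fetch_succeeded: bool) -> str:
    # Once the in-flight gather_dep finally returns, the resumed task moves on:
    # a successful fetch lands the key in memory and the compute is forgotten;
    # a failed fetch falls back to waiting (compute once dependencies are in memory).
    return "memory" if fetch_succeeded else "waiting"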

So I'm also wondering if perhaps gather_dep to the stuck worker will hang forever? for 300s (seems to go much longer than that)? for 300s * some retries?

Is there a comm retry configured? If so, yes, it will retry X times for 300s each if the remote is dead

return await retry_operation(_get_data, operation="get_data_from_worker")
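For context, a quick way to check whether such a retry is configured (a sketch; the 300s per attempt is an assumption taken from this report, not a distributed default):

import dask
import distributed  # ensures distributed's config defaults are registered

# retry_operation's number of extra attempts is driven by this key (defaults to 0, i.e. no retries)
retries = dask.config.get("distributed.comm.retry.count")
per_attempt_s = 300  # assumed per-attempt hang, taken from the report above
print(f"worst-case wait is roughly {(retries + 1) * per_attempt_s}s before the gather gives up")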

@fjetter
Member

fjetter commented Apr 20, 2022

This behaviour should be captured by test_flight_to_executing_via_cancelled_resumed

xref #6112, which introduced a subtle change to the behaviour connected to this edge case, although I think the change is OK. Most importantly, the change that PR introduces happens in the finally clause and/or exception handler of gather_dep, but if you are right about the story being complete, this code is never reached.

@fjetter
Member

fjetter commented Apr 20, 2022

This looks like the comm is never broken, i.e. the worker never detects that the remote is dead. Are there some kind of network proxies involved that would keep the TCP connection alive?

@fjetter
Member

fjetter commented Apr 20, 2022

Are you using the default tornado TCP backend? I've seen such a condition already with a user who implemented their own comm backend which did not properly close a comm if the peer died.

@gjoseph92
Collaborator Author

Since we don't have a good handle to cancel this gather_dep, we instead choose to wait for it to complete

I think that's the core design problem, and what I meant by "I find it odd". Yes, the code is working as intended, because it's the best we could do with the gather_dep system. It's just annoying that since we don't have a handle on the gather, our only choice is to wait for it to finish. Even if we just had a dict[str, asyncio.Task] mapping keys to gather calls (though more complex than that since multiple keys come from a single gather), being able to call cancel() on the asyncio task would make this much easier.
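A minimal sketch of that idea (illustrative only, not distributed's API; all names here are made up):

import asyncio

# Keep a handle on each in-flight gather so a later compute-task for the same key
# can cancel the fetch instead of waiting it out.
in_flight: dict[str, asyncio.Task] = {}

async def fetch_key_from_peer(key: str) -> None:
    await asyncio.sleep(3600)  # stands in for a network fetch that may hang forever

def start_fetch(key: str) -> None:
    in_flight[key] = asyncio.create_task(fetch_key_from_peer(key))

def steal_to_compute(key: str) -> None:
    # The compute request takes precedence: drop the now-pointless fetch outright
    # rather than parking the task in cancelled/resumed.
    task = in_flight.pop(key, None)
    if task is not None:
        task.cancel()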

Are there some kind of network proxies involved that would keep the TCP connection alive?
Are you using the default tornado TCP backend?
Is there a comm retry configured?

This is just your "shuffle in a while loop on Coiled" example, so no special networking, retries, or non-tornado TCP backend. All I did to cause it was take that example and set DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="120s".
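(For reference, and to the best of my knowledge, the in-process equivalent of that environment variable is:)

import dask

# Assumed equivalent of DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL="120s"
dask.config.set({"distributed.scheduler.worker-ttl": "120s"})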

This looks like the comm is never broken, i.e. the worker never detects that the remote is dead.

This is exactly what we're seeing on the scheduler side (see #6110 (comment) and #6148), so it seems reasonable to assume the same thing is happening in worker<->worker comms.

On the scheduler, the comm isn't even timing out after 300s (and the deadlock doesn't resolve itself after 300s), so I'm not sure that 300s limit (where does it even come from? are you thinking of distributed.comm.timeouts, which are both 30s by default?) is even coming into play. The TCP connection manages to stay alive and healthy; just nothing gets sent over it.

So that's what seems to be causing the deadlock. If gather_dep would error or complete, things would resume. But because it's possible for a network connection to get stuck open like this, and not possible to cancel the gather_dep, we end up deadlocked. We could try to fix the network side, but not having a handle on gather_deps gives us pain in many other scenarios like this too. Hopefully we'll be able to improve this design with #5736.

@fjetter
Member

fjetter commented Apr 21, 2022

I think that's the core design problem, and what I meant by "I find it odd". Yes, the code is working as intended, because it's the best we could do with the gather_dep system. It's just annoying that since we don't have a handle on the gather, our only choice is to wait for it to finish. Even if we just had a dict[str, asyncio.Task] mapping keys to gather calls (though more complex than that since multiple keys come from a single gather), being able to call cancel() on the asyncio task would make this much easier.

Agreed. I thought about this for a while and considered introducing a way to cancel requests a bunch of times but it is not trivial. At the same time, we have a similar problem with execute and I chose to go for the "boring" approach of just waiting it out. I'm happy to revisit this at some point.

This is exactly what we're seeing on the scheduler side (see #6110 (comment) and #6148), so it seems reasonable to assume the same thing is happening in worker<->worker comms.

My knowledge of low-level Linux is very limited, but to the best of my knowledge the TCP connection would never be broken if the kernel is still alive and kicking while the Python process is frozen for whatever reason (be it the GIL, an actual process suspension, or any other Linux wizardry), because the kernel itself handles the TCP keepalive probes (see below).

On the scheduler, the comm isn't even timing out after 300s (and the deadlock doesn't resolve itself after 300s), so I'm not sure that 300s limit (where does it even come from? are you thinking of distributed.comm.timeouts, which are both 30s by default?) is even coming into play. The TCP connection manages to stay alive and healthy; just nothing gets sent over it.

I don't know where the 300s is coming from; you mentioned it in the OP. Note that most of the timeouts we configure are on the application layer, not on the network/TCP layer, with the exception of what's configured in set_tcp_timeout. The most important configuration there is TCP keepalive to check for dead peers. See also the Linux docs, specifically TCP_USER_TIMEOUT, TCP_KEEPCNT, TCP_KEEPIDLE and TCP_KEEPINTVL.
This behaviour is controlled via distributed.comm.timeouts.tcp.
TLDR: the kernel emits keepalive probes at regular intervals. If a remote appears to be dead and doesn't respond to keepalive probes for distributed.comm.timeouts.tcp, the socket is closed. That's how we currently detect dead remotes.
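For illustration only (this is not set_tcp_timeout itself, just the kind of kernel-level knobs it tunes; the values are placeholders):

import socket

def enable_keepalive(sock: socket.socket, timeout_s: int = 30) -> None:
    # Ask the kernel to probe the peer and drop the socket if it stops responding.
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    # Linux-specific options; guard for portability.
    if hasattr(socket, "TCP_KEEPIDLE"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, max(timeout_s // 4, 1))
    if hasattr(socket, "TCP_KEEPINTVL"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, max(timeout_s // 4, 1))
    if hasattr(socket, "TCP_KEEPCNT"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4)
    if hasattr(socket, "TCP_USER_TIMEOUT"):
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout_s * 1000)  # milliseconds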

@fjetter
Member

fjetter commented Apr 21, 2022

FYI #6169 but I don't think it's the same problem since for the above condition to trigger, we'd need to see a busy-gather log.

@fjetter
Member

fjetter commented Apr 21, 2022

I also floated the idea of a circuit breaker pattern once where the scheduler would be the one entity responsible for detecting dead remotes. If a worker were flagged as dead, a signal would be broadcast to all workers notifying them about the dead peer, and they'd be instructed to abort all comms to that address. For instance, coupled with a mandatory worker-ttl, this would remove our dependency on TCP keepalives. There are obviously pros and cons.

gjoseph92 added the deadlock label on Apr 21, 2022
@gjoseph92
Collaborator Author

At the same time, we have a similar problem with execute and I chose to go for the "boring" approach of just waiting it out.

execute is a bit different, because we can't actually cancel it (currently, see #4694). So we don't have any choice but to wait it out.

With gather_dep, we should be able to cancel it quite nicely thanks to asyncio (if we actually use async well #6047).

I thought about this for a while and considered introducing a way to cancel requests a bunch of times but it is not trivial.

Maybe we're thinking about different things. But what if, instead of an interface like request_gather(key: str), release_gather_request(key: str) (where we have to coordinate between multiple tasks that depend on fetching the same key, and could almost end up in a use-after-free-style situation), we separated the single physical network request from each task's need for the key, using a reference-counting-style approach:

def request_gather(key: str, who_has: list[str]) -> GatherRequest:
    """
    Request that a key be fetched from peer workers.

    Multiple requests for the same key will return different `GatherRequest` objects,
    though each will be backed by the same underlying request.
    When the underlying request completes, all `GatherRequest`s will complete.
    If the underlying request is cancelled (because no `GatherRequest`s need it anymore,
    or it is explicitly cancelled), all `GatherRequest`s will be cancelled.
    """

class GatherRequest:
    key: str
    def release(self) -> None:
        """
        Note that this gather request is no longer needed. Idempotent.
        If this is the last `GatherRequest` needing the key, the underlying
        request is cancelled.
        """

    def cancel(self) -> None:
        """
        Explicitly cancel the underlying request for all `GatherRequest`s needing this key. Idempotent.
        Use when we know the key should not be fetched anymore (the task has instead been stolen
        to this worker to compute, for example).
        """

    def __del__(self):
        self.release()

    def __await__(self) -> Awaitable[None]:
        "Wait until `key` is available in `worker.data`. If underlying request is cancelled, raises CancelledError."

This might be easier to reason about and test.
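For example, usage could look roughly like this (hypothetical, building only on the interface sketched above; nothing here exists today):

import asyncio

async def ensure_key(key: str, who_has: list[str]) -> None:
    req = request_gather(key, who_has)
    try:
        await req  # resolves once the key is available in worker.data
    except asyncio.CancelledError:
        # The fetch was released or cancelled; the state machine decides what happens next.
        pass

def on_compute_task(req: GatherRequest) -> None:
    # Steal-to-compute: the fetch is known to be pointless now, so cancel the
    # underlying request for every GatherRequest referencing this key.
    req.cancel()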

@gjoseph92
Collaborator Author

I also floated the idea of a circuit breaker pattern once where the scheduler would be the one entity responsible for detecting dead remotes. If a worker were flagged as dead, a signal would be broadcast to all workers notifying them about the dead peer, and they'd be instructed to abort all comms to that address.

I think this would be a good idea. It's otherwise hard for a worker to know if the peer it's talking to is unresponsive. Setting a timeout on the gather_dep is not ideal. Getting low-enough level in the networking stack to say something like "if no new bytes have arrived over this socket in N seconds, the comm is dead" seems a bit complicated. The scheduler is already heartbeating with all workers, so it has a clear signal for this.
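To make the idea concrete, here is a rough, purely hypothetical sketch of the worker-side handling (neither the handler nor the mapping exists in distributed today):

import asyncio

# Hypothetical: map peer address -> asyncio tasks currently fetching from it.
in_flight_gathers: dict[str, set[asyncio.Task]] = {}

def handle_peer_declared_dead(address: str) -> None:
    # On a scheduler broadcast announcing a dead peer, abort every outstanding
    # fetch to that address instead of waiting for TCP keepalive to notice;
    # the state machine can then re-plan those keys.
    for task in in_flight_gathers.pop(address, set()):
        task.cancel()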

@fjetter
Member

fjetter commented Jun 3, 2022

This issue has become quite stale and the code has changed drastically since the report. I'm inclined to close it; @gjoseph92, any objections?

@gjoseph92
Collaborator Author

Since there's no reproducer, sure.

gjoseph92 closed this as not planned on Jun 3, 2022