From 94111c5a7577f607eb76af4409ce7754b2eebde5 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Tue, 12 Apr 2022 12:58:52 -0500 Subject: [PATCH] Add back Worker.transition_fetch_missing Fixes https://github.com/dask/distributed/issues/5951 In https://github.com/dask/distributed/pull/5653 we removed the fetch -> missing transition. This caused deadlocks. Now we add it back in. --- distributed/tests/test_worker.py | 21 +++++++++++++++++++++ distributed/worker.py | 13 +++++++++++++ 2 files changed, 34 insertions(+) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index de3bf2423b..489f902b9b 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3389,3 +3389,24 @@ async def test_tick_interval(c, s, a, b): while s.workers[a.address].metrics["event_loop_interval"] < 0.100: await asyncio.sleep(0.01) time.sleep(0.200) + + +class BreakingWorker(Worker): + broke_once = False + + def get_data(self, comm, **kwargs): + if not self.broke_once: + self.broke_once = True + raise OSError("fake error") + return super().get_data(comm, **kwargs) + + +@pytest.mark.slow +@gen_cluster(client=True, Worker=BreakingWorker) +async def test_broken_comm(c, s, a, b): + df = dask.datasets.timeseries( + start="2000-01-01", + end="2000-01-10", + ) + s = df.shuffle("id", shuffle="tasks") + await c.compute(s.size) diff --git a/distributed/worker.py b/distributed/worker.py index 7c5bc61ca1..71c56f9cbb 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -546,6 +546,7 @@ def __init__( ("executing", "released"): self.transition_executing_released, ("executing", "rescheduled"): self.transition_executing_rescheduled, ("fetch", "flight"): self.transition_fetch_flight, + ("fetch", "missing"): self.transition_fetch_missing, ("fetch", "released"): self.transition_generic_released, ("flight", "error"): self.transition_flight_error, ("flight", "fetch"): self.transition_flight_fetch, @@ -1929,6 +1930,14 @@ def transition_flight_missing( ts.done = False return {}, [] + def transition_fetch_missing( + self, ts: TaskState, *, stimulus_id: str + ) -> RecsInstrs: + ts.state = "missing" + self._missing_dep_flight.add(ts) + ts.done = False + return {}, [] + def transition_released_fetch( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: @@ -2671,6 +2680,10 @@ def ensure_communicating(self) -> None: if ts.state != "fetch": continue + if not ts.who_has: + self.transition(ts, "missing", stimulus_id=stimulus_id) + continue + workers = [w for w in ts.who_has if w not in self.in_flight_workers] if not workers: assert ts.priority is not None