diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 3a427ddb0a..4e0cddd9f3 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -3047,12 +3047,10 @@ async def test_task_flight_compute_oserror(c, s, a, b): ), # inc is lost and needs to be recomputed. Therefore, sum is released ("free-keys", ("f1",)), - ("f1", "purge-state"), # The recommendations here are hard to predict. Whatever key is # currently scheduled to be fetched, if any, will be recommended to be # released. ("f1", "waiting", "released", "released", lambda msg: msg["f1"] == "forgotten"), - ("f1", "purge-state"), ("f1", "released", "forgotten", "forgotten", {}), # Now, we actually compute the task *once*. This must not cycle back ("f1", "compute-task", "released"), diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index c9cf0eb155..fdc7f01237 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -505,6 +505,7 @@ async def test_missing_handle_compute_dependency(c, s, w1, w2, w3): f1 = c.submit(inc, 1, key="f1", workers=[w1.address]) f2 = c.submit(inc, 2, key="f2", workers=[w1.address]) + await wait_for_state(f1.key, "memory", w1) w3.handle_acquire_replicas( keys=[f1.key], who_has={f1.key: [w2.address]}, stimulus_id="acquire" @@ -515,3 +516,22 @@ async def test_missing_handle_compute_dependency(c, s, w1, w2, w3): f3 = c.submit(sum, [f1, f2], key="f3", workers=[w3.address]) await f3 + + +@gen_cluster(client=True, nthreads=[("", 1)] * 3) +async def test_missing_to_waiting(c, s, w1, w2, w3): + w3.periodic_callbacks["find-missing"].stop() + + f1 = c.submit(inc, 1, key="f1", workers=[w1.address], allow_other_workers=True) + await wait_for_state(f1.key, "memory", w1) + + w3.handle_acquire_replicas( + keys=[f1.key], who_has={f1.key: [w2.address]}, stimulus_id="acquire" + ) + + await wait_for_state(f1.key, "missing", w3) + + await w2.close() + await w1.close() + + await f1 diff --git a/distributed/worker.py b/distributed/worker.py index 38998fcf8e..adb4a3cf82 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2058,12 +2058,21 @@ def transition_generic_fetch(self, ts: TaskState, stimulus_id: str) -> RecsInstr # _select_keys_for_gather(). return {}, [EnsureCommunicatingAfterTransitions(stimulus_id=stimulus_id)] + def transition_missing_waiting( + self, ts: TaskState, *, stimulus_id: str + ) -> RecsInstrs: + self._missing_dep_flight.discard(ts) + self._purge_state(ts) + return self.transition_released_waiting(ts, stimulus_id=stimulus_id) + def transition_missing_fetch( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if self.validate: assert ts.state == "missing" - assert ts.who_has + + if not ts.who_has: + return {}, [] self._missing_dep_flight.discard(ts) return self.transition_generic_fetch(ts, stimulus_id=stimulus_id) @@ -2105,7 +2114,7 @@ def transition_released_fetch( def transition_generic_released( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: - self._purge_state(ts.key, stimulus_id=stimulus_id) + self._purge_state(ts) recs: Recs = {} for dependency in ts.dependencies: if ( @@ -2127,7 +2136,6 @@ def transition_released_waiting( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if self.validate: - assert ts.state == "released" assert all(d.key in self.tasks for d in ts.dependencies) recommendations: Recs = {} @@ -2136,10 +2144,7 @@ def transition_released_waiting( if dep_ts.state != "memory": ts.waiting_for_data.add(dep_ts) dep_ts.waiters.add(ts) - # TODO: main branch claims this shouldn't be conditional since a - # recent change - if dep_ts.state not in {"fetch", "flight", "missing"}: - recommendations[dep_ts] = "fetch" + recommendations[dep_ts] = "fetch" if ts.waiting_for_data: self.waiting_for_data_count += 1 @@ -2447,7 +2452,6 @@ def transition_cancelled_released( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: if not ts.done: - ts._next = "released" return {}, [] self._executing.discard(ts) self._in_flight_tasks.discard(ts) @@ -2656,7 +2660,7 @@ def transition_released_forgotten( dep.dependents.discard(ts) if dep.state == "released" and not dep.dependents: recommendations[dep] = "forgotten" - self._purge_state(ts.key, stimulus_id=stimulus_id) + self._purge_state(ts) # Mark state as forgotten in case it is still referenced ts.state = "forgotten" self.tasks.pop(ts.key, None) @@ -2708,6 +2712,7 @@ def transition_released_forgotten( ("missing", "fetch"): transition_missing_fetch, ("missing", "released"): transition_missing_released, ("missing", "error"): transition_generic_error, + ("missing", "waiting"): transition_missing_waiting, ("ready", "error"): transition_generic_error, ("ready", "executing"): transition_ready_executing, ("ready", "released"): transition_generic_released, @@ -2793,7 +2798,9 @@ def _transition( (recs, instructions), self._transition(ts, finish, *args, stimulus_id=stimulus_id), ) - except InvalidTransition: + # ValueError may be raised by merge_recs_instructions + # TODO: should merge_recs raise InvalidTransition? + except (ValueError, InvalidTransition): self.log_event( "invalid-worker-transition", { @@ -3528,25 +3535,14 @@ def handle_worker_status_change(self, status: str, stimulus_id: str) -> None: # Update status and send confirmation to the Scheduler (see status.setter) self.status = new_status - def _purge_state( - self, - key: str, - *, - stimulus_id: str, - ) -> None: + def _purge_state(self, ts: TaskState) -> None: """Ensure that TaskState attributes are reset to a neutral default and Worker-level state associated to the provided key is cleared (e.g. who_has) This is idempotent """ - ts = self.tasks[key] - logger.debug( - "Purge task key: %s state: %s; stimulus_id=%s", - ts.key, - ts.state, - stimulus_id, - ) - self.log.append((key, "purge-state", stimulus_id, time())) + key = ts.key + logger.debug("Purge task key: %s state: %s; stimulus_id=%s", ts.key, ts.state) self.data.pop(key, None) self.actors.pop(key, None) @@ -3976,7 +3972,8 @@ def _(self, ev: FindMissingEvent) -> RecsInstrs: return {}, [] if self.validate: - assert not any(ts.who_has for ts in self._missing_dep_flight) + for ts in self._missing_dep_flight: + assert not ts.who_has, self.story(ts) smsg = RequestRefreshWhoHasMsg( keys=[ts.key for ts in self._missing_dep_flight], @@ -4193,6 +4190,9 @@ def validate_task_executing(self, ts): assert ts.run_spec is not None assert ts.key not in self.data assert not ts.waiting_for_data + for dep in ts.dependencies: + assert dep.state == "memory", self.story(dep) + assert dep.key in self.data or dep.key in self.actors def validate_task_ready(self, ts): assert ts.key in pluck(1, self.ready) @@ -4240,7 +4240,8 @@ def validate_task_missing(self, ts): def validate_task_cancelled(self, ts): assert ts.key not in self.data assert ts._previous in {"long-running", "executing", "flight"} - assert ts._next is None # We'll always transition to released after it is done + # We'll always transition to released after it is done + assert ts._next is None, (ts.key, ts._next, self.story(ts)) def validate_task_resumed(self, ts): assert ts.key not in self.data @@ -5173,82 +5174,3 @@ async def benchmark_network( out[size_str] = total / (time() - start) return out - - -[ - ( - "f1", - "ensure-task-exists", - "released", - "compute-task-1654160270.4501", - 1654160270.450457, - ), - ( - "f1", - "released", - "fetch", - "fetch", - {}, - "compute-task-1654160270.4501", - 1654160270.450478, - ), - ( - "gather-dependencies", - "tcp://127.0.0.1:54108", - {"f1"}, - "compute-task-1654160270.4501", - 1654160270.45051, - ), - ( - "f1", - "fetch", - "flight", - "flight", - {}, - "compute-task-1654160270.4501", - 1654160270.4505181, - ), - ( - "request-dep", - "tcp://127.0.0.1:54108", - {"f1"}, - "compute-task-1654160270.4501", - 1654160270.450558, - ), - ( - "busy-gather", - "tcp://127.0.0.1:54108", - {"f1"}, - "compute-task-1654160270.4501", - 1654160270.4519591, - ), - ( - "f1", - "flight", - "fetch", - "fetch", - {}, - "compute-task-1654160270.4501", - 1654160270.4519749, - ), - ("f1", "purge-state", "worker-connect-1654160270.3552", 1654160270.4605172), - ( - "f1", - "fetch", - "released", - "released", - {"f1": "forgotten"}, - "worker-connect-1654160270.3552", - 1654160270.460523, - ), - ("f1", "purge-state", "worker-connect-1654160270.3552", 1654160270.460525), - ( - "f1", - "released", - "forgotten", - "forgotten", - {}, - "worker-connect-1654160270.3552", - 1654160270.4605281, - ), -]