From 7a084a703dbecd211a5dda4bcd7dd29742d3dc92 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Thu, 26 May 2022 16:10:54 +0100 Subject: [PATCH] Assert that a fetch->cancelled->resumed->fetch cycle is impossible --- distributed/worker.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/distributed/worker.py b/distributed/worker.py index 0ad15cd3fb0..929654eead6 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -2331,7 +2331,16 @@ def transition_resumed_fetch( self, ts: TaskState, *, stimulus_id: str ) -> RecsInstrs: """See Worker._transition_from_resumed""" - return self._transition_from_resumed(ts, "fetch", stimulus_id=stimulus_id) + recs, instructions = self._transition_from_resumed( + ts, "fetch", stimulus_id=stimulus_id + ) + if self.validate: + # This would only be possible in a fetch->cancelled->resumed->fetch loop, + # but there are no transitions from fetch which set the state to cancelled. + # If this assertion failed, we' need to call _ensure_communicating like in + # the other transitions that set ts.status = "fetch". + assert ts.state != "fetch" + return recs, instructions def transition_resumed_missing( self, ts: TaskState, *, stimulus_id: str