diff --git a/distributed/client.py b/distributed/client.py
index 531fa5babe5..46982203530 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -1497,8 +1497,7 @@ async def __aexit__(self, exc_type, exc_value, traceback):
         await self._close(
             # if we're handling an exception, we assume that it's more
             # important to deliver that exception than shutdown gracefully.
-            fast=exc_type
-            is not None
+            fast=(exc_type is not None)
         )
 
     def __exit__(self, exc_type, exc_value, traceback):
@@ -1669,16 +1668,15 @@ async def _wait_for_handle_report_task(self, fast=False):
         await wait_for(handle_report_task, 0 if fast else 2)
 
     @log_errors
-    async def _close(self, fast=False):
-        """
-        Send close signal and wait until scheduler completes
+    async def _close(self, fast: bool = False) -> None:
+        """Send close signal and wait until scheduler completes
 
         If fast is True, the client will close forcefully, by cancelling tasks
         the background _handle_report_task.
         """
-        # TODO: aclose more forcefully by aborting the RPC and cancelling all
+        # TODO: close more forcefully by aborting the RPC and cancelling all
         # background tasks.
-        # see https://trio.readthedocs.io/en/stable/reference-io.html#trio.aclose_forcefully
+        # See https://trio.readthedocs.io/en/stable/reference-io.html#trio.aclose_forcefully
         if self.status == "closed":
             return
 
@@ -1773,18 +1771,7 @@ def close(self, timeout=no_default):
                 coro = wait_for(coro, timeout)
             return coro
 
-        if self._start_arg is None:
-            with suppress(AttributeError):
-                f = self.cluster.close()
-                if asyncio.iscoroutine(f):
-
-                    async def _():
-                        await f
-
-                    self.sync(_)
         sync(self.loop, self._close, fast=True, callback_timeout=timeout)
-        assert self.status == "closed"
 
         if not is_python_shutting_down():
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 7b9b803f7a1..320edc391a4 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -5182,14 +5182,6 @@ async def remove_worker(
 
         ws = self.workers[address]
 
-        event_msg = {
-            "action": "remove-worker",
-            "processing-tasks": {ts.key for ts in ws.processing},
-        }
-        self.log_event(address, event_msg.copy())
-        event_msg["worker"] = address
-        self.log_event("all", event_msg)
-
         logger.info(f"Remove worker {ws} ({stimulus_id=})")
         if close:
             with suppress(AttributeError, CommClosedError):
@@ -5220,6 +5212,7 @@ async def remove_worker(
 
         recommendations: Recs = {}
 
+        processing_keys = {ts.key for ts in ws.processing}
         for ts in list(ws.processing):
             k = ts.key
             recommendations[k] = "released"
@@ -5244,21 +5237,49 @@ async def remove_worker(
                         worker=address,
                     )
                     recommendations.update(r)
-                    logger.info(
+                    logger.error(
                         "Task %s marked as failed because %d workers died"
                         " while trying to run it",
                         ts.key,
                         ts.suspicious,
                     )
 
+        recompute_keys = set()
+        lost_keys = set()
+
         for ts in list(ws.has_what):
             self.remove_replica(ts, ws)
             if not ts.who_has:
                 if ts.run_spec:
+                    recompute_keys.add(ts.key)
                     recommendations[ts.key] = "released"
                 else:  # pure data
+                    lost_keys.add(ts.key)
                     recommendations[ts.key] = "forgotten"
 
+        if recompute_keys:
+            logger.warning(
+                f"Removing worker {ws.address!r} caused the cluster to lose "
+                "already computed task(s), which will be recomputed elsewhere: "
+                f"{recompute_keys} ({stimulus_id=})"
+            )
+        if lost_keys:
+            logger.error(
+                f"Removing worker {ws.address!r} caused the cluster to lose scattered "
+                f"data, which can't be recovered: {lost_keys} ({stimulus_id=})"
+            )
+
+        event_msg = {
+            "action": "remove-worker",
+            "processing-tasks": processing_keys,
+            "lost-computed-tasks": recompute_keys,
+            "lost-scattered-tasks": lost_keys,
+            "stimulus_id": stimulus_id,
+        }
+        self.log_event(address, event_msg.copy())
+        event_msg["worker"] = address
+        self.log_event("all", event_msg)
+
         self.transitions(recommendations, stimulus_id=stimulus_id)
 
         awaitables = []
@@ -5827,6 +5848,7 @@ def handle_worker_status_change(
                 "action": "worker-status-change",
                 "prev-status": prev_status.name,
                 "status": ws.status.name,
+                "stimulus_id": stimulus_id,
             },
         )
         logger.debug(f"Worker status {prev_status.name} -> {status} - {ws}")
@@ -7207,7 +7229,7 @@ async def retire_workers(
         try:
             coros = []
             for ws in wss:
-                logger.info("Retiring worker %s", ws.address)
+                logger.info(f"Retiring worker {ws.address!r} ({stimulus_id=!r})")
 
                 policy = RetireWorker(ws.address)
                 amm.add_policy(policy)
@@ -7244,19 +7266,37 @@
                 # time (depending on interval settings)
            amm.run_once()
 
-            workers_info = {
-                addr: info
-                for addr, info in await asyncio.gather(*coros)
-                if addr is not None
-            }
+            workers_info_ok = {}
+            workers_info_abort = {}
+            for addr, result, info in await asyncio.gather(*coros):
+                if result == "OK":
+                    workers_info_ok[addr] = info
+                else:
+                    workers_info_abort[addr] = info
+
         finally:
             if stop_amm:
                 amm.stop()
 
-        self.log_event("all", {"action": "retire-workers", "workers": workers_info})
-        self.log_event(list(workers_info), {"action": "retired"})
+        self.log_event(
+            "all",
+            {
+                "action": "retire-workers",
+                "retired": workers_info_ok,
+                "could-not-retire": workers_info_abort,
+                "stimulus_id": stimulus_id,
+            },
+        )
+        self.log_event(
+            list(workers_info_ok),
+            {"action": "retired", "stimulus_id": stimulus_id},
+        )
+        self.log_event(
+            list(workers_info_abort),
+            {"action": "could-not-retire", "stimulus_id": stimulus_id},
+        )
 
-        return workers_info
+        return workers_info_ok
 
     async def _track_retire_worker(
         self,
@@ -7266,7 +7306,7 @@
         close: bool,
         remove: bool,
         stimulus_id: str,
-    ) -> tuple[str | None, dict]:
+    ) -> tuple[str, Literal["OK", "no-recipients"], dict]:
         while not policy.done():
             # Sleep 0.01s when there are 4 tasks or less
             # Sleep 0.5s when there are 200 or more
@@ -7284,10 +7324,14 @@
                     "stimulus_id": stimulus_id,
                 }
             )
-            return None, {}
+            logger.warning(
+                f"Could not retire worker {ws.address!r}: unique data could not be "
+                f"moved to any other worker ({stimulus_id=!r})"
+            )
+            return ws.address, "no-recipients", ws.identity()
 
         logger.debug(
-            "All unique keys on worker %s have been replicated elsewhere", ws.address
+            f"All unique keys on worker {ws.address!r} have been replicated elsewhere"
        )
 
         if remove:
@@ -7297,8 +7341,8 @@
         elif close:
             self.close_worker(ws.address)
 
-        logger.info("Retired worker %s", ws.address)
-        return ws.address, ws.identity()
+        logger.info(f"Retired worker {ws.address!r} ({stimulus_id=!r})")
+        return ws.address, "OK", ws.identity()
 
     def add_keys(
         self, worker: str, keys: Collection[Key] = (), stimulus_id: str | None = None
@@ -7434,7 +7478,7 @@ async def feed(
     def log_worker_event(
         self, worker: str, topic: str | Collection[str], msg: Any
     ) -> None:
-        if isinstance(msg, dict):
+        if isinstance(msg, dict) and worker != topic:
             msg["worker"] = worker
 
         self.log_event(topic, msg)
diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py
index c00c2bbf2cd..29d7d8ce7e8 100644
--- a/distributed/tests/test_client.py
+++ b/distributed/tests/test_client.py
@@ -5196,7 +5196,10 @@ def test_quiet_client_close(loop):
             threads_per_worker=4,
         ) as c:
             futures = c.map(slowinc, range(1000), delay=0.01)
-            sleep(0.200)  # stop part-way
+            # Stop part-way
+            s = c.cluster.scheduler
+            while sum(ts.state == "memory" for ts in s.tasks.values()) < 20:
+                sleep(0.01)
             sleep(0.1)  # let things settle
 
             out = logger.getvalue()
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index c581ea14c46..636efcd9e48 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -3058,7 +3058,7 @@ async def connect(self, *args, **kwargs):
 
 
 @gen_cluster(client=True)
-async def test_gather_failing_cnn_recover(c, s, a, b):
+async def test_gather_failing_can_recover(c, s, a, b):
     x = await c.scatter({"x": 1}, workers=a.address)
     rpc = await FlakyConnectionPool(failing_connections=1)
     with mock.patch.object(s, "rpc", rpc), dask.config.set(
diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py
index cfe6941bb8e..b926606a5c1 100644
--- a/distributed/tests/test_worker.py
+++ b/distributed/tests/test_worker.py
@@ -2938,32 +2938,178 @@ async def test_worker_status_sync(s, a):
         await asyncio.sleep(0.01)
 
     events = [ev for _, ev in s.events[ws.address] if ev["action"] != "heartbeat"]
+    for ev in events:
+        if "stimulus_id" in ev:  # Strip timestamp
+            ev["stimulus_id"] = ev["stimulus_id"].rsplit("-", 1)[0]
+
     assert events == [
         {"action": "add-worker"},
         {
             "action": "worker-status-change",
             "prev-status": "init",
             "status": "running",
+            "stimulus_id": "worker-status-change",
         },
         {
             "action": "worker-status-change",
             "prev-status": "running",
             "status": "paused",
+            "stimulus_id": "worker-status-change",
         },
         {
             "action": "worker-status-change",
             "prev-status": "paused",
             "status": "running",
+            "stimulus_id": "worker-status-change",
         },
         {
             "action": "worker-status-change",
             "prev-status": "running",
             "status": "closing_gracefully",
+            "stimulus_id": "retire-workers",
+        },
+        {
+            "action": "remove-worker",
+            "lost-computed-tasks": set(),
+            "lost-scattered-tasks": set(),
+            "processing-tasks": set(),
+            "stimulus_id": "retire-workers",
         },
-        {"action": "remove-worker", "processing-tasks": set()},
-        {"action": "retired"},
+        {"action": "retired", "stimulus_id": "retire-workers"},
     ]
 
+
+@gen_cluster(client=True)
+async def test_log_remove_worker(c, s, a, b):
+    # Computed task
+    x = c.submit(inc, 1, key="x", workers=a.address)
+    await x
+    ev = Event()
+    # Processing task
+    y = c.submit(
+        lambda ev: ev.wait(), ev, key="y", workers=a.address, allow_other_workers=True
+    )
+    await wait_for_state("y", "processing", s)
+    # Scattered task
+    z = await c.scatter({"z": 3}, workers=a.address)
+
+    s.events.clear()
+
+    with captured_logger("distributed.scheduler", level=logging.INFO) as log:
+        # Successful graceful shutdown
+        await s.retire_workers([a.address], stimulus_id="graceful")
+        # Refuse to retire gracefully as there's nowhere to put x and z
+        await s.retire_workers([b.address], stimulus_id="graceful_abort")
+        await asyncio.sleep(0.2)
+        # Ungraceful shutdown
+        await s.remove_worker(b.address, stimulus_id="ungraceful")
+        await asyncio.sleep(0.2)
+    await ev.set()
+
+    assert log.getvalue().splitlines() == [
+        # Successful graceful
+        f"Retire worker addresses ['{a.address}']",
+        f"Retiring worker '{a.address}' (stimulus_id='graceful')",
+        f"Remove worker (stimulus_id='graceful')",
+        f"Retired worker '{a.address}' (stimulus_id='graceful')",
+        # Aborted graceful
+        f"Retire worker addresses ['{b.address}']",
+        f"Retiring worker '{b.address}' (stimulus_id='graceful_abort')",
+        f"Could not retire worker '{b.address}': unique data could not be "
+        "moved to any other worker (stimulus_id='graceful_abort')",
+        # Ungraceful
+        f"Remove worker (stimulus_id='ungraceful')",
+        f"Removing worker '{b.address}' caused the cluster to lose already "
+        "computed task(s), which will be recomputed elsewhere: {'x'} "
+        "(stimulus_id='ungraceful')",
+        f"Removing worker '{b.address}' caused the cluster to lose scattered "
+        "data, which can't be recovered: {'z'} (stimulus_id='ungraceful')",
+        "Lost all workers",
+    ]
+
+    events = {topic: [ev for _, ev in evs] for topic, evs in s.events.items()}
+    for evs in events.values():
+        for ev in evs:
+            if ev["action"] == "retire-workers":
+                for k in ("retired", "could-not-retire"):
+                    ev[k] = {addr: "snip" for addr in ev[k]}
+            if "stimulus_id" in ev:  # Strip timestamp
+                ev["stimulus_id"] = ev["stimulus_id"].rsplit("-", 1)[0]
+
+    assert events == {
+        a.address: [
+            {
+                "action": "worker-status-change",
+                "prev-status": "running",
+                "status": "closing_gracefully",
+                "stimulus_id": "graceful",
+            },
+            {
+                "action": "remove-worker",
+                "lost-computed-tasks": set(),
+                "lost-scattered-tasks": set(),
+                "processing-tasks": {"y"},
+                "stimulus_id": "graceful",
+            },
+            {"action": "retired", "stimulus_id": "graceful"},
+        ],
+        b.address: [
+            {
+                "action": "worker-status-change",
+                "prev-status": "running",
+                "status": "closing_gracefully",
+                "stimulus_id": "graceful_abort",
+            },
+            {"action": "could-not-retire", "stimulus_id": "graceful_abort"},
+            {
+                "action": "worker-status-change",
+                "prev-status": "closing_gracefully",
+                "status": "running",
+                "stimulus_id": "worker-status-change",
+            },
+            {
+                "action": "remove-worker",
+                "lost-computed-tasks": {"x"},
+                "lost-scattered-tasks": {"z"},
+                "processing-tasks": {"y"},
+                "stimulus_id": "ungraceful",
+            },
+            {"action": "closing-worker", "reason": "scheduler-remove-worker"},
+        ],
+        "all": [
+            {
+                "action": "remove-worker",
+                "lost-computed-tasks": set(),
+                "lost-scattered-tasks": set(),
+                "processing-tasks": {"y"},
+                "stimulus_id": "graceful",
+                "worker": a.address,
+            },
+            {
+                "action": "retire-workers",
+                "stimulus_id": "graceful",
+                "retired": {a.address: "snip"},
+                "could-not-retire": {},
+            },
+            {
+                "action": "retire-workers",
+                "stimulus_id": "graceful_abort",
+                "retired": {},
+                "could-not-retire": {b.address: "snip"},
+            },
+            {
+                "action": "remove-worker",
+                "lost-computed-tasks": {"x"},
+                "lost-scattered-tasks": {"z"},
+                "processing-tasks": {"y"},
+                "stimulus_id": "ungraceful",
+                "worker": b.address,
+            },
+        ],
+    }
+
 
 @gen_cluster(client=True)
 async def test_task_flight_compute_oserror(c, s, a, b):