From a1ce116836c2e31d4c6668c779838ea72204983c Mon Sep 17 00:00:00 2001 From: Hendrik Makait Date: Thu, 20 Oct 2022 14:57:38 +0200 Subject: [PATCH] Do not close worker on comm error in heartbeat (#7163) Co-authored-by: fjetter --- distributed/tests/test_worker.py | 8 ++++---- distributed/worker.py | 16 +++++----------- 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 9b6314a3d74..25e11b9a9ca 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1873,10 +1873,10 @@ def bad_heartbeat_worker(*args, **kwargs): monkeypatch.setattr(w.scheduler, "heartbeat_worker", bad_heartbeat_worker) await w.heartbeat() - assert w.status == Status.closed - while s.workers: - await asyncio.sleep(0.01) - assert "Heartbeat to scheduler failed" in logger.getvalue() + assert w.status == Status.running + logs = logger.getvalue() + assert "Failed to communicate with scheduler during heartbeat" in logs + assert "Traceback" in logs @gen_cluster(nthreads=[("", 1)], worker_kwargs={"heartbeat_interval": "100s"}) diff --git a/distributed/worker.py b/distributed/worker.py index 258447e1091..5ba27f95cf5 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -56,7 +56,6 @@ from distributed.comm.utils import OFFLOAD_THRESHOLD from distributed.compatibility import randbytes, to_thread from distributed.core import ( - CommClosedError, ConnectionPool, Status, coerce_to_address, @@ -1237,17 +1236,12 @@ async def heartbeat(self) -> None: ) self.bandwidth_workers.clear() self.bandwidth_types.clear() - except CommClosedError: - logger.warning("Heartbeat to scheduler failed", exc_info=True) + except OSError: + logger.exception("Failed to communicate with scheduler during heartbeat.") + except Exception as e: + logger.exception("Unexpected exception during heartbeat. Closing worker.") await self.close() - except OSError as e: - # Scheduler is gone. Respect distributed.comm.timeouts.connect - if "Timed out trying to connect" in str(e): - logger.info("Timed out while trying to connect during heartbeat") - await self.close() - else: - logger.exception(e) - raise e + raise e finally: self.heartbeat_active = False