Skip to content

Commit

Permalink
Do not close worker on comm error in heartbeat (#7163)
Browse files Browse the repository at this point in the history
Co-authored-by: fjetter <[email protected]>
  • Loading branch information
hendrikmakait and fjetter authored Oct 20, 2022
1 parent 5334a38 commit 6afce9c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 15 deletions.
8 changes: 4 additions & 4 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1873,10 +1873,10 @@ def bad_heartbeat_worker(*args, **kwargs):
monkeypatch.setattr(w.scheduler, "heartbeat_worker", bad_heartbeat_worker)

await w.heartbeat()
assert w.status == Status.closed
while s.workers:
await asyncio.sleep(0.01)
assert "Heartbeat to scheduler failed" in logger.getvalue()
assert w.status == Status.running
logs = logger.getvalue()
assert "Failed to communicate with scheduler during heartbeat" in logs
assert "Traceback" in logs


@gen_cluster(nthreads=[("", 1)], worker_kwargs={"heartbeat_interval": "100s"})
Expand Down
16 changes: 5 additions & 11 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
from distributed.comm.utils import OFFLOAD_THRESHOLD
from distributed.compatibility import randbytes, to_thread
from distributed.core import (
CommClosedError,
ConnectionPool,
Status,
coerce_to_address,
Expand Down Expand Up @@ -1237,17 +1236,12 @@ async def heartbeat(self) -> None:
)
self.bandwidth_workers.clear()
self.bandwidth_types.clear()
except CommClosedError:
logger.warning("Heartbeat to scheduler failed", exc_info=True)
except OSError:
logger.exception("Failed to communicate with scheduler during heartbeat.")
except Exception as e:
logger.exception("Unexpected exception during heartbeat. Closing worker.")
await self.close()
except OSError as e:
# Scheduler is gone. Respect distributed.comm.timeouts.connect
if "Timed out trying to connect" in str(e):
logger.info("Timed out while trying to connect during heartbeat")
await self.close()
else:
logger.exception(e)
raise e
raise e
finally:
self.heartbeat_active = False

Expand Down

0 comments on commit 6afce9c

Please sign in to comment.