From 440219fe499f67ce7a22f66128ec2e6bc47f7fa8 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 26 May 2022 07:30:47 -0500 Subject: [PATCH 1/2] Log rather than raise exceptions in preload.teardown --- distributed/scheduler.py | 5 ++++- distributed/tests/test_preload.py | 11 +++++++++++ distributed/worker.py | 5 ++++- 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index cb5318a8231..5e7f4385aa1 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3368,7 +3368,10 @@ async def close(self): setproctitle("dask-scheduler [closing]") for preload in self.preloads: - await preload.teardown() + try: + await preload.teardown() + except Exception as e: + logger.exception(e) await asyncio.gather( *[plugin.close() for plugin in list(self.plugins.values())] diff --git a/distributed/tests/test_preload.py b/distributed/tests/test_preload.py index 0bb73dd0d15..7a4185f7621 100644 --- a/distributed/tests/test_preload.py +++ b/distributed/tests/test_preload.py @@ -302,3 +302,14 @@ def dask_setup(client, value): ): async with Client(address=s.address, asynchronous=True) as c: assert c.foo == value + + +@gen_test() +async def test_teardown_failure_doesnt_crash_scheduler(): + text = """ +def dask_teardown(worker): + raise Exception(123) +""" + async with Scheduler(dashboard_address=":0", preload=text) as s: + async with Worker(s.address, preload=[text]) as w: + pass diff --git a/distributed/worker.py b/distributed/worker.py index 1795bb80c59..c146cb56fff 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1489,7 +1489,10 @@ async def close( ) for preload in self.preloads: - await preload.teardown() + try: + await preload.teardown() + except Exception as e: + logger.exception(e) for extension in self.extensions.values(): if hasattr(extension, "close"): From 38601226ee7ca43808d72b40831bb92be4d3e0a9 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 26 May 2022 11:15:25 -0500 Subject: [PATCH 2/2] verify logging output --- distributed/tests/test_preload.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/distributed/tests/test_preload.py b/distributed/tests/test_preload.py index 7a4185f7621..04bec1cb951 100644 --- a/distributed/tests/test_preload.py +++ b/distributed/tests/test_preload.py @@ -1,3 +1,4 @@ +import logging import os import re import shutil @@ -281,6 +282,23 @@ def dask_setup(client, value): assert c.foo == value +@gen_test() +async def test_teardown_failure_doesnt_crash_scheduler(): + text = """ +def dask_teardown(worker): + raise Exception(123) +""" + + with captured_logger(logging.getLogger("distributed.scheduler")) as s_logger: + with captured_logger(logging.getLogger("distributed.worker")) as w_logger: + async with Scheduler(dashboard_address=":0", preload=text) as s: + async with Worker(s.address, preload=[text]) as w: + pass + + assert "123" in s_logger.getvalue() + assert "123" in w_logger.getvalue() + + @gen_cluster(nthreads=[]) async def test_client_preload_config_click(s): text = dedent( @@ -302,14 +320,3 @@ def dask_setup(client, value): ): async with Client(address=s.address, asynchronous=True) as c: assert c.foo == value - - -@gen_test() -async def test_teardown_failure_doesnt_crash_scheduler(): - text = """ -def dask_teardown(worker): - raise Exception(123) -""" - async with Scheduler(dashboard_address=":0", preload=text) as s: - async with Worker(s.address, preload=[text]) as w: - pass