diff --git a/distributed/scheduler.py b/distributed/scheduler.py index cb5318a8231..5e7f4385aa1 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3368,7 +3368,10 @@ async def close(self): setproctitle("dask-scheduler [closing]") for preload in self.preloads: - await preload.teardown() + try: + await preload.teardown() + except Exception as e: + logger.exception(e) await asyncio.gather( *[plugin.close() for plugin in list(self.plugins.values())] diff --git a/distributed/tests/test_preload.py b/distributed/tests/test_preload.py index 0bb73dd0d15..7a4185f7621 100644 --- a/distributed/tests/test_preload.py +++ b/distributed/tests/test_preload.py @@ -302,3 +302,14 @@ def dask_setup(client, value): ): async with Client(address=s.address, asynchronous=True) as c: assert c.foo == value + + +@gen_test() +async def test_teardown_failure_doesnt_crash_scheduler(): + text = """ +def dask_teardown(worker): + raise Exception(123) +""" + async with Scheduler(dashboard_address=":0", preload=text) as s: + async with Worker(s.address, preload=[text]) as w: + pass diff --git a/distributed/worker.py b/distributed/worker.py index 1795bb80c59..c146cb56fff 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -1489,7 +1489,10 @@ async def close( ) for preload in self.preloads: - await preload.teardown() + try: + await preload.teardown() + except Exception as e: + logger.exception(e) for extension in self.extensions.values(): if hasattr(extension, "close"):