From e3a201827a06e76e322422a621c436fbbffb7f16 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Wed, 25 May 2022 20:26:15 -0500 Subject: [PATCH 1/4] Handle failing plugin.close calls during scheduler shutdown --- .../tests/test_scheduler_plugin.py | 26 +++++++++++++++++++ distributed/scheduler.py | 18 ++++++++----- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index 880c5f65881..57f51c98b58 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -209,3 +209,29 @@ async def start(self, scheduler: Scheduler) -> None: await s.register_scheduler_plugin(MyPlugin()) assert s._foo == "bar" + + +@gen_cluster(client=True) +async def test_closing_errors_ok(c, s, a, b, capsys): + class OK(SchedulerPlugin): + async def before_close(self): + print(123) + + async def close(self): + print(456) + + class Bad(SchedulerPlugin): + async def before_close(self): + raise Exception() + + async def close(self): + raise Exception() + + await s.register_scheduler_plugin(OK()) + await s.register_scheduler_plugin(Bad()) + + await s.close() + + out, err = capsys.readouterr() + assert "123" in out + assert "456" in out diff --git a/distributed/scheduler.py b/distributed/scheduler.py index cb5318a8231..581cdb44584 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3358,9 +3358,12 @@ async def close(self): await self.finished() return - await asyncio.gather( - *[plugin.before_close() for plugin in list(self.plugins.values())] - ) + try: + await asyncio.gather( + *[plugin.before_close() for plugin in list(self.plugins.values())] + ) + except Exception: + logger.exception("Plugin before_close call failed during scheduler close") self.status = Status.closing @@ -3370,9 +3373,12 @@ async def close(self): for preload in self.preloads: await preload.teardown() - await asyncio.gather( - *[plugin.close() for plugin in list(self.plugins.values())] - ) + try: + await asyncio.gather( + *[plugin.close() for plugin in list(self.plugins.values())] + ) + except Exception: + logger.exception("Plugin close call failed during scheduler close") for pc in self.periodic_callbacks.values(): pc.stop() From 728624ffa7fd3ecbfcb5dac5cb27d6e18c952eaa Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 26 May 2022 09:06:12 -0500 Subject: [PATCH 2/4] Run each plugin's close method individually --- distributed/scheduler.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 581cdb44584..b3dd01387ea 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3358,12 +3358,13 @@ async def close(self): await self.finished() return - try: - await asyncio.gather( - *[plugin.before_close() for plugin in list(self.plugins.values())] - ) - except Exception: - logger.exception("Plugin before_close call failed during scheduler close") + for plugin in self.plugins.values(): + try: + await plugin.before_close() + except Exception: + logger.exception( + "Plugin before_close call failed during scheduler close" + ) self.status = Status.closing @@ -3373,12 +3374,11 @@ async def close(self): for preload in self.preloads: await preload.teardown() - try: - await asyncio.gather( - *[plugin.close() for plugin in list(self.plugins.values())] - ) - except Exception: - logger.exception("Plugin close call failed during scheduler close") + for plugin in self.plugins.values(): + try: + await plugin.close() + except Exception: + logger.exception("Plugin close call failed during scheduler close") for pc in self.periodic_callbacks.values(): pc.stop() From 3c7b643e3cb502b6ace3b8cd156f3cb2edf16c51 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 26 May 2022 09:40:15 -0500 Subject: [PATCH 3/4] Log each exception individually --- distributed/scheduler.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index b3dd01387ea..5a12dccd37d 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3358,13 +3358,15 @@ async def close(self): await self.finished() return - for plugin in self.plugins.values(): + async def log_errors(func): try: - await plugin.before_close() + await func() except Exception: - logger.exception( - "Plugin before_close call failed during scheduler close" - ) + logger.exception("Plugin call failed during scheduler.close") + + await asyncio.gather( + *[log_errors(plugin.before_close) for plugin in list(self.plugins.values())] + ) self.status = Status.closing @@ -3374,11 +3376,9 @@ async def close(self): for preload in self.preloads: await preload.teardown() - for plugin in self.plugins.values(): - try: - await plugin.close() - except Exception: - logger.exception("Plugin close call failed during scheduler close") + await asyncio.gather( + *[log_errors(plugin.close) for plugin in list(self.plugins.values())] + ) for pc in self.periodic_callbacks.values(): pc.stop() From 8971441e22a8e8820b2da8d859d986b70f142ce7 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 26 May 2022 11:12:13 -0500 Subject: [PATCH 4/4] verify exception output --- .../diagnostics/tests/test_scheduler_plugin.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/distributed/diagnostics/tests/test_scheduler_plugin.py b/distributed/diagnostics/tests/test_scheduler_plugin.py index 57f51c98b58..5c433e7a1af 100644 --- a/distributed/diagnostics/tests/test_scheduler_plugin.py +++ b/distributed/diagnostics/tests/test_scheduler_plugin.py @@ -1,7 +1,9 @@ +import logging + import pytest from distributed import Scheduler, SchedulerPlugin, Worker, get_worker -from distributed.utils_test import gen_cluster, gen_test, inc +from distributed.utils_test import captured_logger, gen_cluster, gen_test, inc @gen_cluster(client=True) @@ -222,16 +224,22 @@ async def close(self): class Bad(SchedulerPlugin): async def before_close(self): - raise Exception() + raise Exception("BEFORE_CLOSE") async def close(self): - raise Exception() + raise Exception("AFTER_CLOSE") await s.register_scheduler_plugin(OK()) await s.register_scheduler_plugin(Bad()) - await s.close() + with captured_logger(logging.getLogger("distributed.scheduler")) as logger: + await s.close() out, err = capsys.readouterr() assert "123" in out assert "456" in out + + text = logger.getvalue() + assert "BEFORE_CLOSE" in text + text = logger.getvalue() + assert "AFTER_CLOSE" in text