Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle failing plugin.close() calls during scheduler shutdown #6450

Merged
merged 4 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion distributed/diagnostics/tests/test_scheduler_plugin.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import logging

import pytest

from distributed import Scheduler, SchedulerPlugin, Worker, get_worker
from distributed.utils_test import gen_cluster, gen_test, inc
from distributed.utils_test import captured_logger, gen_cluster, gen_test, inc


@gen_cluster(client=True)
Expand Down Expand Up @@ -209,3 +211,35 @@ async def start(self, scheduler: Scheduler) -> None:
await s.register_scheduler_plugin(MyPlugin())

assert s._foo == "bar"


@gen_cluster(client=True)
async def test_closing_errors_ok(c, s, a, b, capsys):
class OK(SchedulerPlugin):
async def before_close(self):
print(123)

async def close(self):
print(456)

class Bad(SchedulerPlugin):
async def before_close(self):
raise Exception("BEFORE_CLOSE")

async def close(self):
raise Exception("AFTER_CLOSE")

await s.register_scheduler_plugin(OK())
await s.register_scheduler_plugin(Bad())

with captured_logger(logging.getLogger("distributed.scheduler")) as logger:
await s.close()

out, err = capsys.readouterr()
assert "123" in out
assert "456" in out
Comment on lines +239 to +240
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would also be nice to check that the logger exceptions also occurred as expected


text = logger.getvalue()
assert "BEFORE_CLOSE" in text
text = logger.getvalue()
assert "AFTER_CLOSE" in text
10 changes: 8 additions & 2 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3358,8 +3358,14 @@ async def close(self):
await self.finished()
return

async def log_errors(func):
try:
await func()
except Exception:
logger.exception("Plugin call failed during scheduler.close")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Including the plugin name in this message would be nice because as is we just know that some plugin method failed, but not which plugin

Copy link
Member

@graingert graingert May 26, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logger.exception includes the traceback which should include the plugin class name, although it could be a base class that fails

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, thanks for pointing that out @graingert


await asyncio.gather(
*[plugin.before_close() for plugin in list(self.plugins.values())]
*[log_errors(plugin.before_close) for plugin in list(self.plugins.values())]
)

self.status = Status.closing
Expand All @@ -3371,7 +3377,7 @@ async def close(self):
await preload.teardown()

await asyncio.gather(
*[plugin.close() for plugin in list(self.plugins.values())]
*[log_errors(plugin.close) for plugin in list(self.plugins.values())]
)

for pc in self.periodic_callbacks.values():
Expand Down