Skip to content

Commit

Permalink
Improve log messages on scheduler for restart (#7150)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Oct 19, 2022
1 parent ec00cd5 commit f373a18
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 16 deletions.
3 changes: 2 additions & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -5701,7 +5701,7 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True):
"""
stimulus_id = f"restart-{time()}"

logger.info("Releasing all requested keys")
logger.info("Restarting workers and releasing all keys.")
for cs in self.clients.values():
self.client_releases_keys(
keys=[ts.key for ts in cs.wants_what],
Expand Down Expand Up @@ -5802,6 +5802,7 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True):
"will always time out. Do not use `Client.restart` in that case."
)
raise TimeoutError(msg) from None
logger.info("Restarting finished.")

async def broadcast(
self,
Expand Down
31 changes: 16 additions & 15 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -866,27 +866,28 @@ async def test_ready_remove_worker(s, a, b):

@gen_cluster(client=True, Worker=Nanny, timeout=60)
async def test_restart(c, s, a, b):
with captured_logger("distributed.scheduler") as caplog:
futures = c.map(inc, range(20))
await wait(futures)

futures = c.map(inc, range(20))
await wait(futures)
await s.restart()

await s.restart()

assert not s.computations
assert not s.task_prefixes
assert not s.task_groups
assert not s.computations
assert not s.task_prefixes
assert not s.task_groups

assert len(s.workers) == 2
assert len(s.workers) == 2

for ws in s.workers.values():
assert not ws.occupancy
assert not ws.processing
for ws in s.workers.values():
assert not ws.occupancy
assert not ws.processing

assert not s.tasks
assert not s.tasks

assert all(f.status == "cancelled" for f in futures)
x = c.submit(inc, 1)
assert await x == 2
assert all(f.status == "cancelled" for f in futures)
x = c.submit(inc, 1)
assert await x == 2
assert "restart" in caplog.getvalue().lower()


@pytest.mark.slow
Expand Down

0 comments on commit f373a18

Please sign in to comment.