diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 3316a324efd..9cae2088cce 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -5701,7 +5701,7 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True): """ stimulus_id = f"restart-{time()}" - logger.info("Releasing all requested keys") + logger.info("Restarting workers and releasing all keys.") for cs in self.clients.values(): self.client_releases_keys( keys=[ts.key for ts in cs.wants_what], @@ -5802,6 +5802,7 @@ async def restart(self, client=None, timeout=30, wait_for_workers=True): "will always time out. Do not use `Client.restart` in that case." ) raise TimeoutError(msg) from None + logger.info("Restarting finished.") async def broadcast( self, diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index ed87b1fb5fe..8a4696aafde 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -866,27 +866,28 @@ async def test_ready_remove_worker(s, a, b): @gen_cluster(client=True, Worker=Nanny, timeout=60) async def test_restart(c, s, a, b): + with captured_logger("distributed.scheduler") as caplog: + futures = c.map(inc, range(20)) + await wait(futures) - futures = c.map(inc, range(20)) - await wait(futures) + await s.restart() - await s.restart() - - assert not s.computations - assert not s.task_prefixes - assert not s.task_groups + assert not s.computations + assert not s.task_prefixes + assert not s.task_groups - assert len(s.workers) == 2 + assert len(s.workers) == 2 - for ws in s.workers.values(): - assert not ws.occupancy - assert not ws.processing + for ws in s.workers.values(): + assert not ws.occupancy + assert not ws.processing - assert not s.tasks + assert not s.tasks - assert all(f.status == "cancelled" for f in futures) - x = c.submit(inc, 1) - assert await x == 2 + assert all(f.status == "cancelled" for f in futures) + x = c.submit(inc, 1) + assert await x == 2 + assert "restart" in caplog.getvalue().lower() @pytest.mark.slow