Try restarting workers when worker-ttl expires
crusaderky committed Mar 4, 2024
1 parent 44e66f2 commit e24f463
Showing 3 changed files with 71 additions and 16 deletions.
28 changes: 19 additions & 9 deletions distributed/scheduler.py
@@ -545,7 +545,7 @@ def __init__(
         self._memory_unmanaged_old = 0
         self._memory_unmanaged_history = deque()
         self.metrics = {}
-        self.last_seen = 0
+        self.last_seen = time()
         self.time_delay = 0
         self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
         self.actors = set()
@@ -8391,19 +8391,29 @@ async def get_worker_monitor_info(self, recent=False, starts=None):
     # Cleanup #
     ###########
 
-    async def check_worker_ttl(self):
+    @log_errors
+    async def check_worker_ttl(self) -> None:
         now = time()
         stimulus_id = f"check-worker-ttl-{now}"
+        assert self.worker_ttl
+        ttl = max(self.worker_ttl, 10 * heartbeat_interval(len(self.workers)))
+        to_restart = []
+
         for ws in self.workers.values():
-            if (ws.last_seen < now - self.worker_ttl) and (
-                ws.last_seen < now - 10 * heartbeat_interval(len(self.workers))
-            ):
+            last_seen = now - ws.last_seen
+            if last_seen > ttl:
+                to_restart.append(ws.address)
                 logger.warning(
-                    "Worker failed to heartbeat within %s seconds. Closing: %s",
-                    self.worker_ttl,
-                    ws,
+                    f"Worker failed to heartbeat for {last_seen:.0f}s; "
+                    f"{'attempting restart' if ws.nanny else 'removing'}: {ws}"
                 )
-                await self.remove_worker(address=ws.address, stimulus_id=stimulus_id)
+
+        if to_restart:
+            await self.restart_workers(
+                to_restart,
+                wait_for_workers=False,
+                stimulus_id=stimulus_id,
+            )
 
     def check_idle(self) -> float | None:
         if self.status in (Status.closing, Status.closed):
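
In the hunk above, the effective timeout is the larger of the configured distributed.scheduler.worker-ttl and ten heartbeat intervals, and a worker that exceeds it is now restarted through its nanny via Scheduler.restart_workers rather than removed outright. A minimal sketch of raising the timeout from user code, assuming the config is set before the scheduler starts; the "10 minutes" value and the LocalCluster setup are illustrative, not part of this commit:

import dask
from distributed import Client, LocalCluster

# Assumption: worker-ttl is read when the Scheduler is created, so set it first.
# "10 minutes" is an illustrative value; the effective timeout is still at least
# 10x the adaptive heartbeat interval.
dask.config.set({"distributed.scheduler.worker-ttl": "10 minutes"})

if __name__ == "__main__":
    # LocalCluster starts workers under nannies by default, so a worker that
    # stops heartbeating is a candidate for restart rather than removal.
    cluster = LocalCluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)
    print(sorted(client.scheduler_info()["workers"]))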
1 change: 0 additions & 1 deletion distributed/tests/test_active_memory_manager.py
@@ -1084,7 +1084,6 @@ async def test_RetireWorker_new_keys_arrive_after_all_keys_moved_away(c, s, a, b
 @gen_cluster(
     client=True,
     config={
-        "distributed.scheduler.worker-ttl": "500ms",
         "distributed.scheduler.active-memory-manager.start": True,
         "distributed.scheduler.active-memory-manager.interval": 0.05,
         "distributed.scheduler.active-memory-manager.measure": "managed",
58 changes: 52 additions & 6 deletions distributed/tests/test_failed_workers.py
@@ -14,7 +14,7 @@
 from dask import delayed
 from dask.utils import parse_bytes
 
-from distributed import Client, Nanny, profile, wait
+from distributed import Client, KilledWorker, Nanny, get_worker, profile, wait
 from distributed.comm import CommClosedError
 from distributed.compatibility import MACOS
 from distributed.metrics import time
@@ -450,10 +450,10 @@ async def test_restart_timeout_on_long_running_task(c, s, a):
 
 
 @pytest.mark.slow
-@gen_cluster(client=True, scheduler_kwargs={"worker_ttl": "500ms"})
+@gen_cluster(client=True, config={"distributed.scheduler.worker-ttl": "500ms"})
 async def test_worker_time_to_live(c, s, a, b):
-    from distributed.scheduler import heartbeat_interval
-
+    # Note that this value is ignored because it is less than 10x heartbeat_interval
+    assert s.worker_ttl == 0.5
     assert set(s.workers) == {a.address, b.address}
 
     a.periodic_callbacks["heartbeat"].stop()
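
As the new comment in this hunk notes, a worker-ttl below ten heartbeat intervals is effectively ignored, because check_worker_ttl applies max(worker_ttl, 10 * heartbeat_interval(n_workers)). A quick sketch of that floor, assuming heartbeat_interval keeps its current behaviour (0.5s for small clusters at the time of writing); the worker counts are arbitrary:

from distributed.scheduler import heartbeat_interval

# With a configured ttl of 0.5s, the 10x heartbeat floor dominates:
# roughly 5s for a 2-worker cluster.
for n_workers in (2, 100):
    effective_ttl = max(0.5, 10 * heartbeat_interval(n_workers))
    print(n_workers, effective_ttl)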
@@ -465,10 +465,56 @@ async def test_worker_time_to_live(c, s, a, b):
 
     # Worker removal is triggered after 10 * heartbeat
     # This is 10 * 0.5s at the moment of writing.
-    interval = 10 * heartbeat_interval(len(s.workers))
-    # Currently observing an extra 0.3~0.6s on top of the interval.
-    # Adding some padding to prevent flakiness.
-    assert time() - start < interval + 2.0
+    assert time() - start < 7
+
+
+@pytest.mark.slow
+@pytest.mark.parametrize("block_evloop", [False, True])
+@gen_cluster(
+    client=True,
+    Worker=Nanny,
+    nthreads=[("", 1)],
+    scheduler_kwargs={"worker_ttl": "500ms", "allowed_failures": 0},
+    timeout=15,
+)
+async def test_worker_ttl_restarts_worker(c, s, a, block_evloop):
+    """If the event loop of a worker becomes completely unresponsive, the scheduler will
+    restart it through the nanny.
+    """
+
+    async def f():
+        w = get_worker()
+        w.periodic_callbacks["heartbeat"].stop()
+        if block_evloop:
+            sleep(9999)  # Block event loop indefinitely
+        else:
+            await asyncio.sleep(9999)
+
+    ws = s.workers[a.worker_address]
+    fut = c.submit(f, key="x")
+
+    # Most times at this point we'll have 0 workers while the nanny is restarting the
+    # worker. However, the worker may already have been replaced, and there's a chance
+    # the new worker may randomly have the same port.
+    while not s.workers or s.workers.get(a.worker_address) is ws:
+        await asyncio.sleep(0.01)
+
+    if block_evloop:
+        # The nanny killed the worker with SIGKILL.
+        # The restart has increased the suspicious count.
+        with pytest.raises(KilledWorker):
+            await fut
+        assert s.tasks["x"].state == "erred"
+        assert s.tasks["x"].suspicious == 1
+    else:
+        # The nanny sent to the WorkerProcess a {op: stop} through IPC, which in turn
+        # successfully invoked Worker.close(nanny=False).
+        # This behaviour makes sense as the worker-ttl timeout was most likely caused
+        # by a failure in networking, rather than a hung process.
+        assert s.tasks["x"].state == "processing"
+        assert s.tasks["x"].suspicious == 0
 
 
 @gen_cluster(client=True, nthreads=[("", 1)])
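
With allowed_failures=0 in the new test, a single SIGKILL-driven restart is enough to mark the task as erred, and awaiting its future raises KilledWorker. A hedged client-side sketch of coping with that outcome; the helper name and retry count are illustrative, not taken from the test suite:

from distributed import Client, KilledWorker

def result_or_none(client: Client, func, *args, retries: int = 2):
    # A few retries let a task survive workers that were forcibly restarted
    # after their worker-ttl expired.
    future = client.submit(func, *args, retries=retries)
    try:
        return future.result()
    except KilledWorker:
        # Raised when the task was on too many workers that died unexpectedly
        # (suspicious count exceeding distributed.scheduler.allowed-failures).
        return None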
