Commit

Try restarting workers when worker-ttl expires

crusaderky committed Feb 27, 2024
1 parent cbf939c commit 7c20a11
Showing 3 changed files with 85 additions and 24 deletions.
103 changes: 80 additions & 23 deletions distributed/scheduler.py
@@ -135,7 +135,7 @@
)
from distributed.utils_perf import disable_gc_diagnosis, enable_gc_diagnosis
from distributed.variable import VariableExtension
from distributed.worker import _normalize_task
from distributed.worker import WORKER_ANY_RUNNING, WORKER_ANY_STARTING, _normalize_task

if TYPE_CHECKING:
# TODO import from typing (requires Python >=3.10)
@@ -428,7 +428,7 @@ class WorkerState:
versions: dict[str, Any]

#: Address of the associated :class:`~distributed.nanny.Nanny`, if present
nanny: str
nanny: str | None

#: Read-only worker status, synced one way from the remote Worker object
status: Status
@@ -522,7 +522,7 @@ def __init__(
nthreads: int = 0,
memory_limit: int,
local_directory: str,
nanny: str,
nanny: str | None,
server_id: str,
services: dict[str, int] | None = None,
versions: dict[str, Any] | None = None,
@@ -545,7 +545,7 @@ def __init__(
self._memory_unmanaged_old = 0
self._memory_unmanaged_history = deque()
self.metrics = {}
self.last_seen = 0
self.last_seen = time()
self.time_delay = 0
self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
self.actors = set()
@@ -4207,7 +4207,7 @@ def heartbeat_worker(
local_now = time()
host_info = host_info or {}

dh: dict = self.host_info.setdefault(host, {})
dh = self.host_info.setdefault(host, {})
dh["last-seen"] = local_now

frac = 1 / len(self.workers)
@@ -5816,20 +5816,33 @@ def handle_worker_status_change(
ws = self.workers.get(worker) if isinstance(worker, str) else worker
if not ws:
return
prev_status = ws.status
ws.status = Status[status] if isinstance(status, str) else status
if ws.status == prev_status:
if isinstance(status, str):
status = Status[status]
if status == ws.status:
return

if (
status in WORKER_ANY_RUNNING
and ws.status not in WORKER_ANY_STARTING | WORKER_ANY_RUNNING
):
# This can happen when something sets the worker status e.g. to failed
# but doesn't remove it straight away
logger.error(
"Unadmissible worker status change "
f"{ws.status.name} -> {status.name}: {ws}"
)
return

self.log_event(
ws.address,
{
"action": "worker-status-change",
"prev-status": prev_status.name,
"status": ws.status.name,
"prev-status": ws.status.name,
"status": status.name,
},
)
logger.debug(f"Worker status {prev_status.name} -> {status} - {ws}")
logger.debug(f"Worker status {ws.status.name} -> {status.name}: {ws}")
ws.status = status

if ws.status == Status.running:
self.running.add(ws)
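
The guard added above rejects a status report that would move a worker straight into a running state from anything other than a starting or running state, which is exactly the "set to failed but not yet removed" situation the inline comment describes. A minimal sketch of that rule in isolation (the helper function is hypothetical, for illustration only; the two status sets are the real constants imported at the top of this diff):

from distributed.core import Status
from distributed.worker import WORKER_ANY_RUNNING, WORKER_ANY_STARTING

def is_admissible(prev: Status, new: Status) -> bool:
    # A worker may only be reported as running if it was previously starting or
    # already running; e.g. failed -> running is rejected until the stale
    # WorkerState has actually been removed from the scheduler.
    if new in WORKER_ANY_RUNNING:
        return prev in WORKER_ANY_STARTING | WORKER_ANY_RUNNING
    return True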
@@ -6296,8 +6309,8 @@ async def broadcast(
self,
*,
msg: dict,
workers: list[str] | None = None,
hosts: list[str] | None = None,
workers: Collection[str] | None = None,
hosts: Collection[str] | None = None,
nanny: bool = False,
serializers: Any = None,
on_error: Literal["raise", "return", "return_pickle", "ignore"] = "raise",
@@ -6308,13 +6321,15 @@
workers = list(self.workers)
else:
workers = []
else:
workers = list(workers)
if hosts is not None:
for host in hosts:
dh: dict = self.host_info.get(host) # type: ignore
dh = self.host_info.get(host)
if dh is not None:
workers.extend(dh["addresses"])
# TODO replace with worker_list

addresses: Collection[str | None]
if nanny:
addresses = [self.workers[w].nanny for w in workers]
else:
@@ -8253,19 +8268,61 @@ async def get_worker_monitor_info(self, recent=False, starts=None):
# Cleanup #
###########

async def check_worker_ttl(self):
@log_errors
async def check_worker_ttl(self) -> None:
now = time()
stimulus_id = f"check-worker-ttl-{now}"
to_restart = {}
to_close = set()
assert self.worker_ttl
ttl = max(self.worker_ttl, 10 * heartbeat_interval(len(self.workers)))

for ws in self.workers.values():
if (ws.last_seen < now - self.worker_ttl) and (
ws.last_seen < now - 10 * heartbeat_interval(len(self.workers))
):
last_seen = now - ws.last_seen
if last_seen > ttl:
if ws.nanny:
to_restart[ws.address] = ws
msg = "restarting"

Check warning on line 8285 in distributed/scheduler.py

View check run for this annotation

Codecov / codecov/patch

distributed/scheduler.py#L8284-L8285

Added lines #L8284 - L8285 were not covered by tests
else:
to_close.add(ws)
msg = "closing"
# In case the worker resuscitates while we're restarting it, prevent
# it from receiving tasks from AMM
self.handle_worker_status_change(
Status.failed, ws, stimulus_id=stimulus_id
)
logger.warning(
"Worker failed to heartbeat within %s seconds. Closing: %s",
self.worker_ttl,
ws,
f"Worker failed to heartbeat for {last_seen} seconds; {msg}: {ws}"
)
await self.remove_worker(address=ws.address, stimulus_id=stimulus_id)

if to_restart:
result = await self.broadcast(
msg={"op": "restart", "reason": stimulus_id},
workers=list(to_restart),
nanny=True,
on_error="return",
)
for addr, res in result.items():
if res != "OK":
ws = to_restart[addr]
assert isinstance(res, BaseException)
logger.error(
f"Nanny failed to restart worker {ws}; removing it",
exc_info=res,
)
to_close.add(ws)

# Many seconds could have passed.
# Workers may have been removed or replaced by others with the same address.
to_close = {ws for ws in to_close if self.workers.get(ws.address) is ws}
if to_close:
for ws in to_close:
# Guaranteed by handle_worker_status_change
assert ws.status not in WORKER_ANY_RUNNING, ws
await asyncio.gather(
    *(
        self.remove_worker(address=ws.address, stimulus_id=stimulus_id)
        for ws in to_close
    )
)

def check_idle(self) -> float | None:
if self.status in (Status.closing, Status.closed):
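For context on the numbers involved: the TTL enforced by check_worker_ttl comes from Dask configuration, and the diff clamps it so workers are never required to heartbeat faster than ten times the adaptive heartbeat interval. A minimal sketch of that calculation outside the scheduler (the "5 minutes" value and worker count are illustrative; heartbeat_interval is the same helper used in the diff):

import dask
from dask.utils import parse_timedelta
from distributed.scheduler import heartbeat_interval

dask.config.set({"distributed.scheduler.worker-ttl": "5 minutes"})  # illustrative value

worker_ttl = parse_timedelta(dask.config.get("distributed.scheduler.worker-ttl"))
n_workers = 100  # illustrative cluster size
# Mirrors the diff: ttl = max(worker_ttl, 10 * heartbeat_interval(n_workers))
effective_ttl = max(worker_ttl, 10 * heartbeat_interval(n_workers))
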
1 change: 0 additions & 1 deletion distributed/tests/test_active_memory_manager.py
@@ -1084,7 +1084,6 @@ async def test_RetireWorker_new_keys_arrive_after_all_keys_moved_away(c, s, a, b
@gen_cluster(
client=True,
config={
"distributed.scheduler.worker-ttl": "500ms",
"distributed.scheduler.active-memory-manager.start": True,
"distributed.scheduler.active-memory-manager.interval": 0.05,
"distributed.scheduler.active-memory-manager.measure": "managed",
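The removed override follows the usual pattern of pinning scheduler config per test via gen_cluster; a minimal sketch of an equivalent test setup with the short worker-ttl no longer pinned (the test name and body here are hypothetical):

from distributed.utils_test import gen_cluster

@gen_cluster(
    client=True,
    config={
        "distributed.scheduler.active-memory-manager.start": True,
        "distributed.scheduler.active-memory-manager.interval": 0.05,
    },
)
async def test_amm_without_ttl_override(c, s, a, b):
    # Without the 500ms worker-ttl override, the scheduler falls back to its
    # default TTL for these workers.
    assert s.workers
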
5 changes: 5 additions & 0 deletions distributed/worker.py
@@ -182,6 +182,11 @@
Status.paused,
Status.closing_gracefully,
}
WORKER_ANY_STARTING = {
Status.created,
Status.init,
Status.starting,
}


class GetDataBusy(TypedDict):
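Together with the existing WORKER_ANY_RUNNING set just above it, the new WORKER_ANY_STARTING set lets the scheduler classify a reported status with plain set membership; a small illustrative check (all statuses named are real members of the Status enum, and the set contents match this diff):

from distributed.core import Status
from distributed.worker import WORKER_ANY_RUNNING, WORKER_ANY_STARTING

assert Status.init in WORKER_ANY_STARTING
assert Status.paused in WORKER_ANY_RUNNING
# A failed worker is in neither set, which is what handle_worker_status_change
# relies on to reject a failed -> running report.
assert Status.failed not in WORKER_ANY_STARTING | WORKER_ANY_RUNNING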
