Skip to content

Commit

Permalink
Do not rely on logging for subprocess cluster (#8398)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait authored Dec 13, 2023
1 parent 67b8e7f commit f2e1e51
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 11 deletions.
44 changes: 34 additions & 10 deletions distributed/deploy/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
import json
import logging
import math
import os
import tempfile
import uuid
from pathlib import Path
from typing import Any

import psutil
Expand All @@ -16,6 +20,7 @@
from distributed.compatibility import WINDOWS
from distributed.deploy.spec import ProcessInterface, SpecCluster
from distributed.deploy.utils import nprocesses_nthreads
from distributed.utils import Deadline
from distributed.worker_memory import parse_memory_limit

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -59,13 +64,20 @@ class SubprocessScheduler(Subprocess):
"""

scheduler_kwargs: dict
timeout: int
address: str | None

def __init__(
self,
scheduler_kwargs: dict | None = None,
timeout: int = 30,
):
self.scheduler_kwargs = scheduler_kwargs or {}
self.scheduler_kwargs = {
"scheduler_file": os.path.join(tempfile.gettempdir(), str(uuid.uuid4()))
}
if scheduler_kwargs:
self.scheduler_kwargs.update(scheduler_kwargs)
self.timeout = timeout
super().__init__()

async def _start(self):
Expand All @@ -78,20 +90,32 @@ async def _start(self):
),
]
logger.info(" ".join(cmd))
deadline = Deadline.after(self.timeout)
self.process = await asyncio.create_subprocess_exec(
*cmd,
stderr=asyncio.subprocess.PIPE,
)

while True:
line = (await self.process.stderr.readline()).decode()
if not line.strip():
raise RuntimeError("Scheduler failed to start")
logger.info(line.strip())
if "Scheduler at" in line:
self.address = line.split("Scheduler at:")[1].strip()
break
logger.debug(line)
scheduler_file = Path(self.scheduler_kwargs["scheduler_file"])
while not (
deadline.expired
or scheduler_file.exists()
or self.process.returncode is not None
):
await asyncio.sleep(0.1)
if deadline.expired or self.process.returncode is not None:
assert self.process.stderr
logger.error((await self.process.stderr.read()).decode())
if deadline.expired:
raise RuntimeError(f"Scheduler failed to start within {self.timeout}s")
raise RuntimeError(
f"Scheduler failed to start and exited with code {self.process.returncode}"
)

with scheduler_file.open(mode="r") as f:
identity = json.load(f)
self.address = identity["address"]
logger.info("Scheduler at %r", self.address)


class SubprocessWorker(Subprocess):
Expand Down
23 changes: 22 additions & 1 deletion distributed/deploy/tests/test_subprocess.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from __future__ import annotations

import asyncio
import logging

import pytest

from distributed import Client
Expand All @@ -9,7 +12,7 @@
SubprocessScheduler,
SubprocessWorker,
)
from distributed.utils_test import gen_test
from distributed.utils_test import gen_test, new_config_file


@pytest.mark.skipif(WINDOWS, reason="distributed#7434")
Expand Down Expand Up @@ -72,6 +75,24 @@ async def test_raise_if_scheduler_fails_to_start():
pass


@pytest.mark.skipif(WINDOWS, reason="distributed#7434")
@gen_test()
async def test_subprocess_cluster_does_not_depend_on_logging():
async def _start():
async with SubprocessCluster(
asynchronous=True,
dashboard_address=":0",
scheduler_kwargs={"idle_timeout": "5s"},
worker_kwargs={"death_timeout": "5s"},
):
pass

with new_config_file(
{"distributed": {"logging": {"distributed": logging.CRITICAL + 1}}}
):
await asyncio.wait_for(_start(), timeout=2)


@pytest.mark.skipif(
not WINDOWS, reason="Windows-specific error testing (distributed#7434)"
)
Expand Down

0 comments on commit f2e1e51

Please sign in to comment.