Skip to content

Commit

Permalink
Remove dump cluster from gen_cluster (dask#8823)
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter authored Aug 23, 2024
1 parent 5bbceb7 commit c073797
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 154 deletions.
95 changes: 0 additions & 95 deletions distributed/tests/test_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import asyncio
import logging
import os
import pathlib
import signal
import socket
Expand All @@ -16,7 +15,6 @@
from unittest import mock

import pytest
import yaml
from tornado import gen

import dask.config
Expand All @@ -41,7 +39,6 @@
check_process_leak,
check_thread_leak,
cluster,
dump_cluster_state,
ensure_no_new_clients,
freeze_batched_send,
gen_cluster,
Expand Down Expand Up @@ -439,40 +436,6 @@ async def ping_pong():
assert await fut == "pong"


@pytest.mark.slow()
def test_dump_cluster_state_timeout(tmp_path):
sleep_time = 30

async def inner_test(c, s, a, b):
await asyncio.sleep(sleep_time)

# This timeout includes cluster startup and teardown which sometimes can
# take a significant amount of time. For this particular test we would like
# to keep the _test timeout_ small because we intend to trigger it but the
# overall timeout large.
test = gen_cluster(client=True, timeout=5, cluster_dump_directory=tmp_path)(
inner_test
)
try:
with pytest.raises(asyncio.TimeoutError) as exc:
test()
assert "inner_test" in str(exc)
assert "await asyncio.sleep(sleep_time)" in str(exc)
except gen.TimeoutError:
pytest.xfail("Cluster startup or teardown took too long")

_, dirs, files = next(os.walk(tmp_path))
assert not dirs
assert files == [inner_test.__name__ + ".yaml"]
import yaml

with open(tmp_path / files[0], "rb") as fd:
state = yaml.load(fd, Loader=yaml.Loader)

assert "scheduler" in state
assert "workers" in state


def test_assert_story():
now = time()
story = [
Expand Down Expand Up @@ -558,64 +521,6 @@ async def test_assert_story_identity(c, s, a, strict):
assert_story(worker_story, scheduler_story, strict=strict)


@gen_cluster()
async def test_dump_cluster_state(s, a, b, tmp_path):
await dump_cluster_state(s, [a, b], str(tmp_path), "dump")
with open(f"{tmp_path}/dump.yaml") as fh:
out = yaml.safe_load(fh)

assert out.keys() == {"scheduler", "workers", "versions"}
assert out["workers"].keys() == {a.address, b.address}


@gen_cluster(nthreads=[])
async def test_dump_cluster_state_no_workers(s, tmp_path):
await dump_cluster_state(s, [], str(tmp_path), "dump")
with open(f"{tmp_path}/dump.yaml") as fh:
out = yaml.safe_load(fh)

assert out.keys() == {"scheduler", "workers", "versions"}
assert out["workers"] == {}


@gen_cluster(Worker=Nanny)
async def test_dump_cluster_state_nannies(s, a, b, tmp_path):
await dump_cluster_state(s, [a, b], str(tmp_path), "dump")
with open(f"{tmp_path}/dump.yaml") as fh:
out = yaml.safe_load(fh)

assert out.keys() == {"scheduler", "workers", "versions"}
assert out["workers"].keys() == s.workers.keys()


@gen_cluster()
async def test_dump_cluster_state_unresponsive_local_worker(s, a, b, tmp_path):
a.stop()
await dump_cluster_state(s, [a, b], str(tmp_path), "dump")
with open(f"{tmp_path}/dump.yaml") as fh:
out = yaml.safe_load(fh)

assert out.keys() == {"scheduler", "workers", "versions"}
assert isinstance(out["workers"][a.address], dict)
assert isinstance(out["workers"][b.address], dict)


@pytest.mark.slow
@gen_cluster(client=True, Worker=Nanny)
async def test_dump_cluster_unresponsive_remote_worker(c, s, a, b, tmp_path):
await c.run(lambda dask_worker: dask_worker.stop(), workers=[a.worker_address])

await dump_cluster_state(s, [a, b], str(tmp_path), "dump")
with open(f"{tmp_path}/dump.yaml") as fh:
out = yaml.safe_load(fh)

assert out.keys() == {"scheduler", "workers", "versions"}
assert isinstance(out["workers"][b.worker_address], dict)
assert out["workers"][a.worker_address].startswith(
"OSError('Timed out trying to connect to"
)


# Note: WINDOWS constant doesn't work with `mypy --platform win32`
if sys.platform == "win32":
TERM_SIGNALS = (signal.SIGTERM, signal.SIGINT)
Expand Down
65 changes: 6 additions & 59 deletions distributed/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
from dask.typing import Key

from distributed import Event, Scheduler, system
from distributed import versions as version_module
from distributed.batched import BatchedSend
from distributed.client import Client, _global_clients, default_client
from distributed.comm import Comm
Expand Down Expand Up @@ -878,7 +877,7 @@ def gen_cluster(
clean_kwargs: dict[str, Any] | None = None,
# FIXME: distributed#8054
allow_unclosed: bool = True,
cluster_dump_directory: str | Literal[False] = "test_cluster_dump",
cluster_dump_directory: str | Literal[False] = False,
) -> Callable[[Callable], Callable]:
from distributed import Client

Expand All @@ -901,6 +900,11 @@ async def test_foo(scheduler, worker1, worker2, pytest_fixture_a, pytest_fixture
start
end
"""
if cluster_dump_directory:
warnings.warn(
"The `cluster_dump_directory` argument is being ignored and will be removed in a future version.",
DeprecationWarning,
)
if nthreads is None:
nthreads = [
("127.0.0.1", 1),
Expand Down Expand Up @@ -1019,14 +1023,6 @@ async def async_fn():
# This stack indicates where the coro/test is suspended
task.print_stack(file=buffer)

if cluster_dump_directory:
await dump_cluster_state(
s=s,
ws=workers,
output_dir=cluster_dump_directory,
func_name=func.__name__,
)

task.cancel()
while not task.cancelled():
await asyncio.sleep(0.01)
Expand All @@ -1048,18 +1044,6 @@ async def async_fn():
except pytest.xfail.Exception:
raise

except Exception:
if cluster_dump_directory and not has_pytestmark(
test_func, "xfail"
):
await dump_cluster_state(
s=s,
ws=workers,
output_dir=cluster_dump_directory,
func_name=func.__name__,
)
raise

try:
c = default_client()
except ValueError:
Expand Down Expand Up @@ -1122,41 +1106,6 @@ async def async_fn_outer():
return _


async def dump_cluster_state(
s: Scheduler, ws: list[ServerNode], output_dir: str, func_name: str
) -> None:
"""A variant of Client.dump_cluster_state, which does not rely on any of the below
to work:
- Having a client at all
- Client->Scheduler comms
- Scheduler->Worker comms (unless using Nannies)
"""
scheduler_info = s._to_dict()
workers_info: dict[str, Any]
versions_info = version_module.get_versions()

if not ws or isinstance(ws[0], Worker):
workers_info = {w.address: w._to_dict() for w in ws}
else:
workers_info = await s.broadcast(msg={"op": "dump_state"}, on_error="return")
workers_info = {
k: repr(v) if isinstance(v, Exception) else v
for k, v in workers_info.items()
}

state = {
"scheduler": scheduler_info,
"workers": workers_info,
"versions": versions_info,
}
os.makedirs(output_dir, exist_ok=True)
fname = os.path.join(output_dir, func_name) + ".yaml"
with open(fname, "w") as fh:
yaml.safe_dump(state, fh) # Automatically convert tuples to lists
print(f"Dumped cluster state to {fname}")


def validate_state(*servers: Scheduler | Worker | Nanny) -> None:
"""Run validate_state() on the Scheduler and all the Workers of the cluster.
Excludes workers wrapped by Nannies and workers manually started by the test.
Expand Down Expand Up @@ -1505,8 +1454,6 @@ def new_config_file(c: dict[str, Any]) -> Iterator[None]:
"""
Temporarily change configuration file to match dictionary *c*.
"""
import yaml

old_file = os.environ.get("DASK_CONFIG")
fd, path = tempfile.mkstemp(prefix="dask-config")
with os.fdopen(fd, "w") as f:
Expand Down

0 comments on commit c073797

Please sign in to comment.