Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into more_fixes_for_test_s…
Browse files Browse the repository at this point in the history
…catter_death
  • Loading branch information
fjetter committed Jun 1, 2022
2 parents 15a56e6 + c2b28cf commit 0144bb2
Show file tree
Hide file tree
Showing 52 changed files with 1,330 additions and 748 deletions.
2 changes: 1 addition & 1 deletion continuous_integration/environment-3.10.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies:
- bokeh
- click
- cloudpickle
- coverage<6.3 # https://github.com/nedbat/coveragepy/issues/1310
- coverage
- dask # overridden by git tip below
- filesystem-spec # overridden by git tip below
- h5py
Expand Down
2 changes: 1 addition & 1 deletion continuous_integration/gpuci/axis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ LINUX_VER:
- ubuntu18.04

RAPIDS_VER:
- "22.06"
- "22.08"

excludes:
6 changes: 5 additions & 1 deletion distributed/_signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,8 @@ def handle_signal(signum, frame):
for sig in signals:
old_handlers[sig] = signal.signal(sig, handle_signal)

await event.wait()
try:
await event.wait()
finally:
for sig in signals:
signal.signal(sig, old_handlers[sig])
4 changes: 1 addition & 3 deletions distributed/chaos.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ async def setup(self, worker):
)

def graceful(self):
asyncio.create_task(
self.worker.close(report=False, nanny=False, executor_wait=False)
)
asyncio.create_task(self.worker.close(nanny=False, executor_wait=False))

def sys_exit(self):
sys.exit(0)
Expand Down
3 changes: 0 additions & 3 deletions distributed/cli/dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import warnings

import click
from tornado.ioloop import IOLoop

from distributed import Scheduler
from distributed._signals import wait_for_signals
Expand Down Expand Up @@ -186,11 +185,9 @@ def del_pid_file():
resource.setrlimit(resource.RLIMIT_NOFILE, (limit, hard))

async def run():
loop = IOLoop.current()
logger.info("-" * 47)

scheduler = Scheduler(
loop=loop,
security=sec,
host=host,
port=port,
Expand Down
38 changes: 24 additions & 14 deletions distributed/cli/tests/test_dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
assert_can_connect_from_everywhere_4_6,
assert_can_connect_locally_4,
popen,
wait_for_log_line,
)


Expand Down Expand Up @@ -66,12 +67,8 @@ def test_dashboard(loop):
pytest.importorskip("bokeh")

with popen(["dask-scheduler"], flush_output=False) as proc:
for line in proc.stdout:
if b"dashboard at" in line:
dashboard_port = int(line.decode().split(":")[-1].strip())
break
else:
assert False # pragma: nocover
line = wait_for_log_line(b"dashboard at", proc.stdout)
dashboard_port = int(line.decode().split(":")[-1].strip())

with Client(f"127.0.0.1:{Scheduler.default_port}", loop=loop):
pass
Expand Down Expand Up @@ -223,13 +220,9 @@ def test_dashboard_port_zero(loop):
["dask-scheduler", "--dashboard-address", ":0"],
flush_output=False,
) as proc:
for line in proc.stdout:
if b"dashboard at" in line:
dashboard_port = int(line.decode().split(":")[-1].strip())
assert dashboard_port != 0
break
else:
assert False # pragma: nocover
line = wait_for_log_line(b"dashboard at", proc.stdout)
dashboard_port = int(line.decode().split(":")[-1].strip())
assert dashboard_port != 0


PRELOAD_TEXT = """
Expand Down Expand Up @@ -413,7 +406,7 @@ def test_version_option():


@pytest.mark.slow
def test_idle_timeout(loop):
def test_idle_timeout():
start = time()
runner = CliRunner()
result = runner.invoke(
Expand All @@ -424,6 +417,23 @@ def test_idle_timeout(loop):
assert result.exit_code == 0


@pytest.mark.slow
def test_restores_signal_handler():
# another test could have altered the signal handler, so use a new function
# that both has sensible sigint behaviour *and* can be used as a sentinel
def raise_ki():
raise KeyboardInterrupt

original_handler = signal.signal(signal.SIGINT, raise_ki)
try:
CliRunner().invoke(
distributed.cli.dask_scheduler.main, ["--idle-timeout", "1s"]
)
assert signal.getsignal(signal.SIGINT) is raise_ki
finally:
signal.signal(signal.SIGINT, original_handler)


def test_multiple_workers_2(loop):
text = """
def dask_setup(worker):
Expand Down
10 changes: 3 additions & 7 deletions distributed/cli/tests/test_dask_ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from distributed import Client
from distributed.cli.dask_ssh import main
from distributed.compatibility import MACOS, WINDOWS
from distributed.utils_test import popen
from distributed.utils_test import popen, wait_for_log_line

pytest.importorskip("paramiko")
pytestmark = [
Expand All @@ -30,16 +30,12 @@ def test_ssh_cli_nprocs_renamed_to_nworkers(loop):
# This interrupt is necessary for the cluster to place output into the stdout
# and stderr pipes
proc.send_signal(2)
assert any(
b"renamed to --nworkers" in proc.stdout.readline() for _ in range(15)
)
wait_for_log_line(b"renamed to --nworkers", proc.stdout, max_lines=15)


def test_ssh_cli_nworkers_with_nprocs_is_an_error():
with popen(
["dask-ssh", "localhost", "--nprocs=2", "--nworkers=2"],
flush_output=False,
) as proc:
assert any(
b"Both --nprocs and --nworkers" in proc.stdout.readline() for _ in range(15)
)
wait_for_log_line(b"Both --nprocs and --nworkers", proc.stdout, max_lines=15)
70 changes: 42 additions & 28 deletions distributed/cli/tests/test_dask_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
from distributed.compatibility import LINUX, WINDOWS
from distributed.deploy.utils import nprocesses_nthreads
from distributed.metrics import time
from distributed.utils_test import gen_cluster, popen, requires_ipv6
from distributed.utils import open_port
from distributed.utils_test import gen_cluster, popen, requires_ipv6, wait_for_log_line


@pytest.mark.parametrize(
Expand Down Expand Up @@ -245,9 +246,7 @@ async def test_nanny_worker_port_range_too_many_workers_raises(s):
],
flush_output=False,
) as worker:
assert any(
b"Not enough ports in range" in worker.stdout.readline() for _ in range(100)
)
wait_for_log_line(b"Not enough ports in range", worker.stdout, max_lines=100)


@pytest.mark.slow
Expand Down Expand Up @@ -281,26 +280,14 @@ async def test_reconnect_deprecated(c, s):
["dask-worker", s.address, "--reconnect"],
flush_output=False,
) as worker:
for _ in range(10):
line = worker.stdout.readline()
print(line)
if b"`--reconnect` option has been removed" in line:
break
else:
raise AssertionError("Message not printed, see stdout")
wait_for_log_line(b"`--reconnect` option has been removed", worker.stdout)
assert worker.wait() == 1

with popen(
["dask-worker", s.address, "--no-reconnect"],
flush_output=False,
) as worker:
for _ in range(10):
line = worker.stdout.readline()
print(line)
if b"flag is deprecated, and will be removed" in line:
break
else:
raise AssertionError("Message not printed, see stdout")
wait_for_log_line(b"flag is deprecated, and will be removed", worker.stdout)
await c.wait_for_workers(1)
await c.shutdown()

Expand Down Expand Up @@ -376,9 +363,7 @@ async def test_nworkers_requires_nanny(s):
["dask-worker", s.address, "--nworkers=2", "--no-nanny"],
flush_output=False,
) as worker:
assert any(
b"Failed to launch worker" in worker.stdout.readline() for _ in range(15)
)
wait_for_log_line(b"Failed to launch worker", worker.stdout, max_lines=15)


@pytest.mark.slow
Expand Down Expand Up @@ -418,9 +403,7 @@ async def test_worker_cli_nprocs_renamed_to_nworkers(c, s):
flush_output=False,
) as worker:
await c.wait_for_workers(2)
assert any(
b"renamed to --nworkers" in worker.stdout.readline() for _ in range(15)
)
wait_for_log_line(b"renamed to --nworkers", worker.stdout, max_lines=15)


@gen_cluster(nthreads=[])
Expand All @@ -429,10 +412,7 @@ async def test_worker_cli_nworkers_with_nprocs_is_an_error(s):
["dask-worker", s.address, "--nprocs=2", "--nworkers=2"],
flush_output=False,
) as worker:
assert any(
b"Both --nprocs and --nworkers" in worker.stdout.readline()
for _ in range(15)
)
wait_for_log_line(b"Both --nprocs and --nworkers", worker.stdout, max_lines=15)


@pytest.mark.slow
Expand Down Expand Up @@ -713,3 +693,37 @@ async def test_signal_handling(c, s, nanny, sig):
assert "timed out" not in logs
assert "error" not in logs
assert "exception" not in logs


@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"])
def test_error_during_startup(monkeypatch, nanny):
# see https://github.com/dask/distributed/issues/6320
scheduler_port = str(open_port())
scheduler_addr = f"tcp://127.0.0.1:{scheduler_port}"

monkeypatch.setenv("DASK_SCHEDULER_ADDRESS", scheduler_addr)
with popen(
[
"dask-scheduler",
"--port",
scheduler_port,
],
flush_output=False,
) as scheduler:
start = time()
# Wait for the scheduler to be up
wait_for_log_line(b"Scheduler at", scheduler.stdout)
# Ensure this is not killed by pytest-timeout
if time() - start > 5:
raise TimeoutError("Scheduler failed to start in time.")

with popen(
[
"dask-worker",
scheduler_addr,
nanny,
"--worker-port",
scheduler_port,
],
) as worker:
assert worker.wait(5) == 1
2 changes: 1 addition & 1 deletion distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1630,7 +1630,7 @@ async def _shutdown(self):
else:
with suppress(CommClosedError):
self.status = "closing"
await self.scheduler.terminate(close_workers=True)
await self.scheduler.terminate()

def shutdown(self):
"""Shut down the connected scheduler and workers
Expand Down
2 changes: 1 addition & 1 deletion distributed/comm/tests/test_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ async def client_communicate(key, delay=0):

@pytest.mark.gpu
@gen_test()
async def test_ucx_client_server():
async def test_ucx_client_server(ucx_loop):
pytest.importorskip("distributed.comm.ucx")
ucp = pytest.importorskip("ucp")

Expand Down
Loading

0 comments on commit 0144bb2

Please sign in to comment.