diff --git a/distributed/cli/tests/test_dask_scheduler.py b/distributed/cli/tests/test_dask_scheduler.py index 1eb2e074c42..9741d9c506e 100644 --- a/distributed/cli/tests/test_dask_scheduler.py +++ b/distributed/cli/tests/test_dask_scheduler.py @@ -554,16 +554,16 @@ def dask_setup(worker): def test_multiple_workers(loop): scheduler_address = f"127.0.0.1:{open_port()}" - with popen( - ["dask", "scheduler", "--no-dashboard", "--host", scheduler_address] - ) as s: - with popen(["dask", "worker", scheduler_address, "--no-dashboard"]) as a: - with popen(["dask", "worker", scheduler_address, "--no-dashboard"]) as b: - with Client(scheduler_address, loop=loop) as c: - start = time() - while len(c.nthreads()) < 2: - sleep(0.1) - assert time() < start + 10 + with ( + popen(["dask", "scheduler", "--no-dashboard", "--host", scheduler_address]), + popen(["dask", "worker", scheduler_address, "--no-dashboard"]), + popen(["dask", "worker", scheduler_address, "--no-dashboard"]), + Client(scheduler_address, loop=loop) as c, + ): + start = time() + while len(c.nthreads()) < 2: + sleep(0.1) + assert time() < start + 10 @pytest.mark.slow diff --git a/distributed/cli/tests/test_dask_worker.py b/distributed/cli/tests/test_dask_worker.py index 2b342a1403b..34023e4c4dd 100644 --- a/distributed/cli/tests/test_dask_worker.py +++ b/distributed/cli/tests/test_dask_worker.py @@ -330,16 +330,16 @@ async def test_local_directory(c, s, nanny, tmp_path): @pytest.mark.slow @pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) def test_scheduler_file(loop, nanny): - with tmpfile() as fn: - with popen(["dask", "scheduler", "--no-dashboard", "--scheduler-file", fn]): - with popen( - ["dask", "worker", "--scheduler-file", fn, nanny, "--no-dashboard"] - ): - with Client(scheduler_file=fn, loop=loop) as c: - start = time() - while not c.scheduler_info()["workers"]: - sleep(0.1) - assert time() < start + 10 + with ( + tmpfile() as fn, + popen(["dask", "scheduler", "--no-dashboard", "--scheduler-file", fn]), + popen(["dask", "worker", "--scheduler-file", fn, nanny, "--no-dashboard"]), + Client(scheduler_file=fn, loop=loop) as c, + ): + start = time() + while not c.scheduler_info()["workers"]: + sleep(0.1) + assert time() < start + 10 @pytest.mark.slow diff --git a/distributed/cli/tests/test_tls_cli.py b/distributed/cli/tests/test_tls_cli.py index e6fe203be56..60be6a95287 100644 --- a/distributed/cli/tests/test_tls_cli.py +++ b/distributed/cli/tests/test_tls_cli.py @@ -31,14 +31,12 @@ def wait_for_cores(c, nthreads=1): def test_basic(loop, requires_default_ports): - with popen(["dask", "scheduler", "--no-dashboard"] + tls_args) as s: - with popen( - ["dask", "worker", "--no-dashboard", "tls://127.0.0.1:8786"] + tls_args - ) as w: - with Client( - "tls://127.0.0.1:8786", loop=loop, security=tls_security() - ) as c: - wait_for_cores(c) + with ( + popen(["dask", "scheduler", "--no-dashboard"] + tls_args), + popen(["dask", "worker", "--no-dashboard", "tls://127.0.0.1:8786"] + tls_args), + Client("tls://127.0.0.1:8786", loop=loop, security=tls_security()) as c, + ): + wait_for_cores(c) def test_sni(loop): diff --git a/distributed/comm/tcp.py b/distributed/comm/tcp.py index 76e8c958cc6..d2da61aa287 100644 --- a/distributed/comm/tcp.py +++ b/distributed/comm/tcp.py @@ -239,7 +239,7 @@ async def read(self, deserializers=None): if not sys.is_finalizing(): convert_stream_closed_error(self, e) except BaseException: - # Some OSError, CancelledError or a another "low-level" exception. + # Some OSError, CancelledError or another "low-level" exception. # We do not really know what was already read from the underlying # socket, so it is not even safe to retry here using the same stream. # The only safe thing to do is to abort. diff --git a/distributed/utils_test.py b/distributed/utils_test.py index fda004ed27e..740bc8c99c3 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -995,9 +995,10 @@ async def _cluster_factory(): async def async_fn(): result = None with dask.config.set(config): - async with _cluster_factory() as (s, workers), _client_factory( - s - ) as c: + async with ( + _cluster_factory() as (s, workers), + _client_factory(s) as c, + ): args = [s] + workers if c is not None: args = [c] + args