From 21c7e40ca64464884371bec8fade07a8fb2148ad Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Wed, 11 Jan 2023 11:52:49 +0000 Subject: [PATCH 01/10] avoid calls to make_current() and make_clear() by using asyncio.run in LoopRunner Closes #6784 --- distributed/actor.py | 39 +--- distributed/deploy/cluster.py | 13 +- distributed/deploy/spec.py | 11 +- distributed/deploy/tests/test_adaptive.py | 5 +- distributed/deploy/tests/test_spec_cluster.py | 2 - distributed/tests/test_actor.py | 6 +- distributed/tests/test_client.py | 55 +---- distributed/tests/test_client_loop.py | 6 - distributed/tests/test_utils.py | 136 ++++-------- distributed/utils.py | 201 +++++++++++------- 10 files changed, 198 insertions(+), 276 deletions(-) diff --git a/distributed/actor.py b/distributed/actor.py index b5d1b32a0fb..c9db58c689b 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -1,10 +1,10 @@ from __future__ import annotations import abc -import asyncio import functools import sys import threading +from collections.abc import Generator from dataclasses import dataclass from datetime import timedelta from typing import Generic, Literal, NoReturn, TypeVar @@ -13,43 +13,16 @@ from distributed.client import Future from distributed.protocol import to_serialize -from distributed.utils import iscoroutinefunction, sync, thread_state +from distributed.utils import LateLoopEvent, iscoroutinefunction, sync, thread_state from distributed.utils_comm import WrappedKey from distributed.worker import get_client, get_worker -_T = TypeVar("_T") - if sys.version_info >= (3, 9): - from collections.abc import Awaitable, Generator + from collections.abc import Awaitable else: - from typing import Awaitable, Generator - -if sys.version_info >= (3, 10): - from asyncio import Event as _LateLoopEvent -else: - # In python 3.10 asyncio.Lock and other primitives no longer support - # passing a loop kwarg to bind to a loop running in another thread - # e.g. calling from Client(asynchronous=False). Instead the loop is bound - # as late as possible: when calling any methods that wait on or wake - # Future instances. See: https://bugs.python.org/issue42392 - class _LateLoopEvent: - def __init__(self) -> None: - self._event: asyncio.Event | None = None - - def set(self) -> None: - if self._event is None: - self._event = asyncio.Event() + from typing import Awaitable - self._event.set() - - def is_set(self) -> bool: - return self._event is not None and self._event.is_set() - - async def wait(self) -> bool: - if self._event is None: - self._event = asyncio.Event() - - return await self._event.wait() +_T = TypeVar("_T") class Actor(WrappedKey): @@ -317,7 +290,7 @@ def unwrap(self) -> NoReturn: class ActorFuture(BaseActorFuture[_T]): def __init__(self, io_loop: IOLoop): self._io_loop = io_loop - self._event = _LateLoopEvent() + self._event = LateLoopEvent() self._out: _Error | _OK[_T] | None = None def __await__(self) -> Generator[object, None, _T]: diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 21b8f9d9c47..08006f550ee 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -5,6 +5,7 @@ import logging import uuid import warnings +from collections.abc import Awaitable from contextlib import suppress from inspect import isawaitable from typing import Any @@ -205,16 +206,17 @@ async def _close(self): self.status = Status.closed - def close(self, timeout=None): + def close(self, timeout: float | None = None) -> Awaitable[None] | None: # If the cluster is already closed, we're already done if self.status == Status.closed: if self.asynchronous: return NoOpAwaitable() - else: - return + return None - with suppress(RuntimeError): # loop closed during process shutdown + try: return self.sync(self._close, callback_timeout=timeout) + except RuntimeError: # loop closed during process shutdown + return None def __del__(self, _warn=warnings.warn): if getattr(self, "status", Status.closed) != Status.closed: @@ -521,7 +523,8 @@ def __enter__(self): return self.sync(self.__aenter__) def __exit__(self, exc_type, exc_value, traceback): - return self.sync(self.__aexit__, exc_type, exc_value, traceback) + aw = self.close() + assert aw is None def __await__(self): return self diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index c35ec764aff..e4d9c582b91 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -288,8 +288,15 @@ def __init__( self.sync(self._correct_state) except Exception: self.sync(self.close) + self._loop_runner.stop() raise + def close(self, timeout: float | None = None) -> Awaitable[None] | None: + aw = super().close(timeout) + if not self.asynchronous: + self._loop_runner.stop() + return aw + async def _start(self): while self.status == Status.starting: await asyncio.sleep(0.01) @@ -471,10 +478,6 @@ async def __aenter__(self): assert self.status == Status.running return self - def __exit__(self, exc_type, exc_value, traceback): - super().__exit__(exc_type, exc_value, traceback) - self._loop_runner.stop() - def _threads_per_worker(self) -> int: """Return the number of threads per worker for new workers""" if not self.new_spec: # pragma: no cover diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index e8a43dd1160..49ebcb2bc6a 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -279,8 +279,6 @@ async def test_no_more_workers_than_tasks(): assert len(cluster.scheduler.workers) <= 1 -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_basic_no_loop(cleanup): loop = None try: @@ -293,8 +291,7 @@ def test_basic_no_loop(cleanup): assert future.result() == 2 loop = cluster.loop finally: - if loop is not None: - loop.add_callback(loop.stop) + assert loop is None or not loop.asyncio_loop.is_running() @pytest.mark.flaky(condition=LINUX, reruns=10, reruns_delay=5) diff --git a/distributed/deploy/tests/test_spec_cluster.py b/distributed/deploy/tests/test_spec_cluster.py index 32e13a6db80..f875db0c3ed 100644 --- a/distributed/deploy/tests/test_spec_cluster.py +++ b/distributed/deploy/tests/test_spec_cluster.py @@ -82,8 +82,6 @@ def test_spec_sync(loop): assert result == 11 -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_loop_started_in_constructor(cleanup): # test that SpecCluster.__init__ starts a loop in another thread cluster = SpecCluster(worker_spec, scheduler=scheduler, loop=None) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 9c8163af6a9..32ee7a28fe0 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -18,8 +18,8 @@ get_client, wait, ) -from distributed.actor import _LateLoopEvent from distributed.metrics import time +from distributed.utils import LateLoopEvent from distributed.utils_test import cluster, gen_cluster @@ -261,7 +261,7 @@ def test_sync(client): def test_timeout(client): class Waiter: def __init__(self): - self.event = _LateLoopEvent() + self.event = LateLoopEvent() async def set(self): self.event.set() @@ -553,7 +553,7 @@ def sleep(self, time): async def test_waiter(c, s, a, b): class Waiter: def __init__(self): - self.event = _LateLoopEvent() + self.event = LateLoopEvent() async def set(self): self.event.set() diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index b244ced842b..e4192cc55ec 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -82,14 +82,7 @@ from distributed.metrics import time from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler from distributed.sizeof import sizeof -from distributed.utils import ( - NoOpAwaitable, - get_mp_context, - is_valid_xml, - open_port, - sync, - tmp_text, -) +from distributed.utils import get_mp_context, is_valid_xml, open_port, sync, tmp_text from distributed.utils_test import ( NO_AMM, BlockedGatherDep, @@ -2208,27 +2201,8 @@ async def test_multi_client(s, a, b): await asyncio.sleep(0.01) -@contextmanager -def _pristine_loop(): - IOLoop.clear_instance() - IOLoop.clear_current() - loop = IOLoop() - loop.make_current() - assert IOLoop.current() is loop - try: - yield loop - finally: - try: - loop.close(all_fds=True) - except (KeyError, ValueError): - pass - IOLoop.clear_instance() - IOLoop.clear_current() - - def long_running_client_connection(address): - with _pristine_loop(): - c = Client(address) + with Client(address, loop=None) as c: x = c.submit(lambda x: x + 1, 10) x.result() sleep(100) @@ -2889,8 +2863,6 @@ async def test_startup_close_startup(s, a, b): pass -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_startup_close_startup_sync(loop): with cluster() as (s, [a, b]): with Client(s["address"], loop=loop) as c: @@ -5613,23 +5585,12 @@ async def test_future_auto_inform(c, s, a, b): await asyncio.sleep(0.01) -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:clear_current is deprecated:DeprecationWarning") def test_client_async_before_loop_starts(cleanup): - async def close(): - async with client: - pass - - with _pristine_loop() as loop: - with pytest.warns( - DeprecationWarning, - match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated", - ): - client = Client(asynchronous=True, loop=loop) - assert client.asynchronous - assert isinstance(client.close(), NoOpAwaitable) - loop.run_sync(close) # TODO: client.close() does not unset global client + with pytest.raises( + RuntimeError, + match=r"Constructing LoopRunner\(asynchronous=True\) without a running loop is not supported", + ): + client = Client(asynchronous=True, loop=None) @pytest.mark.slow @@ -7047,8 +7008,6 @@ async def test_workers_collection_restriction(c, s, a, b): assert a.data and not b.data -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) async def test_get_client_functions_spawn_clusters(c, s, a): # see gh4565 diff --git a/distributed/tests/test_client_loop.py b/distributed/tests/test_client_loop.py index 9d8316cdd09..723031145cc 100644 --- a/distributed/tests/test_client_loop.py +++ b/distributed/tests/test_client_loop.py @@ -2,8 +2,6 @@ import contextlib -import pytest - from distributed import Client, LocalCluster from distributed.utils import LoopRunner @@ -29,16 +27,12 @@ def _check_cluster_and_client_loop(loop): # Test if Client stops LoopRunner on close. -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_close_loop_sync_start_new_loop(cleanup): with _check_loop_runner(): _check_cluster_and_client_loop(loop=None) # Test if Client stops LoopRunner on close. -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_close_loop_sync_use_running_loop(cleanup): with _check_loop_runner(): # Start own loop or use current thread's one. diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 25ff3026999..ea9cf7e06c5 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -391,57 +391,41 @@ def assert_not_running(loop): q.get(timeout=0.02) -_loop_not_running_property_warning = functools.partial( - pytest.warns, - DeprecationWarning, - match=r"Accessing the loop property while the loop is not running is deprecated", -) -_explicit_loop_is_not_running_warning = functools.partial( - pytest.warns, - DeprecationWarning, - match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated", -) -_implicit_loop_is_not_running_warning = functools.partial( - pytest.warns, - DeprecationWarning, - match=r"Constructing a LoopRunner\(asynchronous=True\) without a running loop is deprecated", -) - - -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_loop_runner(loop_in_thread): # Implicit loop - loop = IOLoop() - loop.make_current() - runner = LoopRunner() - with _loop_not_running_property_warning(): - assert runner.loop not in (loop, loop_in_thread) + async def make_looprunner_in_async_context(): + return IOLoop.current(), LoopRunner() + + loop, runner = asyncio.run(make_looprunner_in_async_context()) + with pytest.raises( + RuntimeError, + match=r"Accessing the loop property while the loop is not running is not supported", + ): + runner.loop assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) runner.start() assert runner.is_started() assert_running(runner.loop) + assert runner.loop is not loop runner.stop() assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) + with pytest.raises( + RuntimeError, + match=r"Accessing the loop property while the loop is not running is not supported", + ): + runner.loop + + async def make_io_loop_in_async_context(): + # calling IOLoop() raises DeprecationWarning: There is no current event loop + return IOLoop.current() # Explicit loop - loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - runner = LoopRunner(loop=loop) - with _loop_not_running_property_warning(): - assert runner.loop is loop - assert not runner.is_started() - assert_not_running(loop) - runner.start() - assert runner.is_started() - assert_running(loop) - runner.stop() - assert not runner.is_started() - assert_not_running(loop) + loop = asyncio.run(make_io_loop_in_async_context()) + with pytest.raises( + RuntimeError, + match=r"Constructing LoopRunner\(loop=loop\) without a running loop is not supported", + ): + LoopRunner(loop=loop) # Explicit loop, already started runner = LoopRunner(loop=loop_in_thread) @@ -455,57 +439,30 @@ def test_loop_runner(loop_in_thread): assert_running(loop_in_thread) # Implicit loop, asynchronous=True - loop = IOLoop() - loop.make_current() - with _implicit_loop_is_not_running_warning(): - runner = LoopRunner(asynchronous=True) - with _loop_not_running_property_warning(): - assert runner.loop is loop - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.start() - assert runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.stop() - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - - # Explicit loop, asynchronous=True - loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - runner = LoopRunner(loop=loop, asynchronous=True) - with _loop_not_running_property_warning(): - assert runner.loop is loop - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.start() - assert runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.stop() - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) + with pytest.raises( + RuntimeError, + match=r"Constructing LoopRunner\(asynchronous=True\) without a running loop is not supported", + ): + LoopRunner(asynchronous=True) + + # Explicit loop + loop = asyncio.run(make_io_loop_in_async_context()) + with pytest.raises( + RuntimeError, + match=r"Constructing LoopRunner\(loop=loop\) without a running loop is not supported", + ): + LoopRunner(loop=loop, asynchronous=True) -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_two_loop_runners(loop_in_thread): # Loop runners tied to the same loop should cooperate # ABCCBA - loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - a = LoopRunner(loop=loop) - with _explicit_loop_is_not_running_warning(): - b = LoopRunner(loop=loop) - assert_not_running(loop) + a = LoopRunner() a.start() + loop = a.loop assert_running(loop) + b = LoopRunner(loop=loop) c = LoopRunner(loop=loop) b.start() assert_running(loop) @@ -519,13 +476,12 @@ def test_two_loop_runners(loop_in_thread): assert_not_running(loop) # ABCABC - loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - a = LoopRunner(loop=loop) - with _explicit_loop_is_not_running_warning(): - b = LoopRunner(loop=loop) - assert_not_running(loop) + a = LoopRunner() a.start() + loop = a.loop + assert_running(loop) + b = LoopRunner(loop=loop) + c = LoopRunner(loop=loop) assert_running(loop) b.start() assert_running(loop) diff --git a/distributed/utils.py b/distributed/utils.py index b4cafc14cb7..31da658ed68 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -407,6 +407,61 @@ def wait(timeout): return result +if sys.version_info >= (3, 10): + from asyncio import Event as LateLoopEvent +else: + # In python 3.10 asyncio.Lock and other primitives no longer support + # passing a loop kwarg to bind to a loop running in another thread + # e.g. calling from Client(asynchronous=False). Instead the loop is bound + # as late as possible: when calling any methods that wait on or wake + # Future instances. See: https://bugs.python.org/issue42392 + class LateLoopEvent: + def __init__(self) -> None: + self._event: asyncio.Event | None = None + + def set(self) -> None: + if self._event is None: + self._event = asyncio.Event() + + self._event.set() + + def is_set(self) -> bool: + return self._event is not None and self._event.is_set() + + async def wait(self) -> bool: + if self._event is None: + self._event = asyncio.Event() + + return await self._event.wait() + + +class _CollectErrorThread: + def __init__(self, target: Callable[[], None], daemon: bool, name: str): + self._exception: BaseException | None = None + + def wrapper() -> None: + try: + target() + except BaseException as e: + self._exception = e + + self._thread = thread = threading.Thread( + target=wrapper, daemon=daemon, name=name + ) + thread.start() + + def join(self, timeout: float | None = None) -> None: + thread = self._thread + thread.join(timeout=timeout) + if thread.is_alive(): + raise TimeoutError("join timed out") + if self._exception is not None: + try: + raise self._exception + finally: # remove a reference cycle + del self._exception + + class LoopRunner: """ A helper to start and stop an IO loop in a controlled way. @@ -430,35 +485,28 @@ class LoopRunner: ] = weakref.WeakKeyDictionary() _lock = threading.Lock() - def __init__(self, loop=None, asynchronous=False): + def __init__(self, loop: IOLoop | None = None, asynchronous: bool = False): if loop is None: if asynchronous: + # raises RuntimeError if there's no running loop try: asyncio.get_running_loop() - except RuntimeError: - warnings.warn( - "Constructing a LoopRunner(asynchronous=True) without a running loop is deprecated", - DeprecationWarning, - stacklevel=2, - ) - self._loop = IOLoop.current() - else: - # We're expecting the loop to run in another thread, - # avoid re-using this thread's assigned loop - self._loop = IOLoop() - else: - if not loop.asyncio_loop.is_running(): - warnings.warn( - "Constructing LoopRunner(loop=loop) without a running loop is deprecated", - DeprecationWarning, - stacklevel=2, - ) - self._loop = loop + except RuntimeError as e: + raise RuntimeError( + "Constructing LoopRunner(asynchronous=True) without a running loop is not supported" + ) from e + loop = IOLoop.current() + elif not loop.asyncio_loop.is_running(): # type: ignore[attr-defined] + # LoopRunner is not responsible for starting a foriegn IOLoop + raise RuntimeError( + "Constructing LoopRunner(loop=loop) without a running loop is not supported" + ) + + self._loop = loop self._asynchronous = asynchronous - self._loop_thread = None + self._loop_thread: _CollectErrorThread | None = None self._started = False - with self._lock: - self._all_loops.setdefault(self._loop, (0, None)) + self._stop_event = LateLoopEvent() def start(self): """ @@ -470,62 +518,53 @@ def start(self): with self._lock: self._start_unlocked() - def _start_unlocked(self): + def _start_unlocked(self) -> None: assert not self._started - count, real_runner = self._all_loops[self._loop] - if self._asynchronous or real_runner is not None or count > 0: + if self._loop is not None: + try: + count, real_runner = self._all_loops[self._loop] + except KeyError: + assert self._loop.asyncio_loop.is_running() # type: ignore[attr-defined] + self._started = True + return + self._all_loops[self._loop] = count + 1, real_runner self._started = True return assert self._loop_thread is None - assert count == 0 - loop_evt = threading.Event() - done_evt = threading.Event() - in_thread = [None] - start_exc = [None] + start_evt = threading.Event() + start_exc = None + loop = None - def loop_cb(): - in_thread[0] = threading.current_thread() - loop_evt.set() + async def amain() -> None: + nonlocal loop + loop = IOLoop.current() + start_evt.set() + await self._stop_event.wait() - def run_loop(loop=self._loop): - loop.add_callback(loop_cb) - # run loop forever if it's not running already + def run_loop() -> None: + nonlocal start_exc try: - if not loop.asyncio_loop.is_running(): - loop.start() - except Exception as e: - start_exc[0] = e - finally: - done_evt.set() - - thread = threading.Thread(target=run_loop, name="IO loop") - thread.daemon = True - thread.start() - - loop_evt.wait(timeout=10) + asyncio.run(amain()) + except BaseException as e: + if start_evt.is_set(): + raise + start_exc = e + start_evt.set() + + self._loop_thread = _CollectErrorThread( + target=run_loop, daemon=True, name="IO loop" + ) + start_evt.wait(timeout=10) + if start_exc is not None: + raise start_exc + assert loop is not None + self._loop = loop self._started = True - - actual_thread = in_thread[0] - if actual_thread is not thread: - # Loop already running in other thread (user-launched) - done_evt.wait(5) - if start_exc[0] is not None and not isinstance(start_exc[0], RuntimeError): - if not isinstance( - start_exc[0], Exception - ): # track down infrequent error - raise TypeError( - f"not an exception: {start_exc[0]!r}", - ) - raise start_exc[0] - self._all_loops[self._loop] = count + 1, None - else: - assert start_exc[0] is None, start_exc - self._loop_thread = thread - self._all_loops[self._loop] = count + 1, self + self._all_loops[loop] = (1, self) def stop(self, timeout=10): """ @@ -541,23 +580,25 @@ def _stop_unlocked(self, timeout): self._started = False - count, real_runner = self._all_loops[self._loop] + try: + count, real_runner = self._all_loops[self._loop] + except KeyError: + return + if count > 1: self._all_loops[self._loop] = count - 1, real_runner - else: - assert count == 1 - del self._all_loops[self._loop] - if real_runner is not None: - real_runner._real_stop(timeout) + return + + assert count == 1 + del self._all_loops[self._loop] + real_runner._real_stop(timeout) def _real_stop(self, timeout): assert self._loop_thread is not None if self._loop_thread is not None: try: - self._loop.add_callback(self._loop.stop) + self._loop.add_callback(self._stop_event.set) self._loop_thread.join(timeout=timeout) - with suppress(KeyError): # IOLoop can be missing - self._loop.close() finally: self._loop_thread = None @@ -584,11 +625,9 @@ def run_sync(self, func, *args, **kwargs): @property def loop(self): loop = self._loop - if not loop.asyncio_loop.is_running(): - warnings.warn( - "Accessing the loop property while the loop is not running is deprecated", - DeprecationWarning, - stacklevel=2, + if loop is None or not loop.asyncio_loop.is_running(): + raise RuntimeError( + "Accessing the loop property while the loop is not running is not supported" ) return self._loop From de7798a111a07d16f601c0efb8cc90a89789bf58 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 16 Feb 2023 12:10:30 +0000 Subject: [PATCH 02/10] Update distributed/deploy/cluster.py Co-authored-by: Gabe Joseph --- distributed/deploy/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 08006f550ee..74b40cfb717 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -524,7 +524,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_value, traceback): aw = self.close() - assert aw is None + assert aw is None, aw def __await__(self): return self From 122cb55e5a8aade22e2df4134b4fc307255c81db Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sun, 4 Jun 2023 09:34:43 +0100 Subject: [PATCH 03/10] avoid redundant conditional guarded by an assertion --- distributed/utils.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/distributed/utils.py b/distributed/utils.py index 59246b49db8..b5d6f9cd24d 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -608,12 +608,11 @@ def _stop_unlocked(self, timeout): def _real_stop(self, timeout): assert self._loop_thread is not None - if self._loop_thread is not None: - try: - self._loop.add_callback(self._stop_event.set) - self._loop_thread.join(timeout=timeout) - finally: - self._loop_thread = None + try: + self._loop.add_callback(self._stop_event.set) + self._loop_thread.join(timeout=timeout) + finally: + self._loop_thread = None def is_started(self): """ From fed38814716e989393869e6cec06dad8cf6a98dc Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sun, 4 Jun 2023 09:44:22 +0100 Subject: [PATCH 04/10] test loop startup exceptions are propagated from LoopRunner thread --- distributed/tests/test_utils.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 606f4e319cb..7b07d9b10b9 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -16,6 +16,7 @@ from collections import deque from concurrent.futures import Executor, Future, ThreadPoolExecutor from time import sleep +from unittest import mock import pytest from tornado.ioloop import IOLoop @@ -525,6 +526,17 @@ async def test_loop_runner_gen(): await asyncio.sleep(0.01) +def test_loop_runner_exception_in_start(cleanup): + class MyException(Exception): + pass + + with ( + mock.patch("tornado.ioloop.IOLoop.current", side_effect=MyException), + pytest.raises(MyException), + ): + LoopRunner().start() + + @gen_test() async def test_all_quiet_exceptions(): class CustomError(Exception): From 43841b52014fd87152fed7a8741f28f8cf602858 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sun, 4 Jun 2023 09:54:30 +0100 Subject: [PATCH 05/10] test loop teardown exceptions are propagated from LoopRunner thread --- distributed/tests/test_utils.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 7b07d9b10b9..73329a81b33 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -537,6 +537,21 @@ class MyException(Exception): LoopRunner().start() +def test_loop_runner_exception_in_teardown(cleanup): + runner = LoopRunner() + runner.start() + + async def cancel_all_tasks(): + current_task = asyncio.current_task() + for task in asyncio.all_tasks(): + if task is not current_task: + task.cancel() + + runner.run_sync(cancel_all_tasks) + with pytest.raises(asyncio.CancelledError): + runner.stop() + + @gen_test() async def test_all_quiet_exceptions(): class CustomError(Exception): From 0faad4fb5e3b149b98b395cf08c69931cd194f53 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 13 Jun 2023 17:51:29 +0100 Subject: [PATCH 06/10] fix typo in distributed/utils.py Co-authored-by: crusaderky --- distributed/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/distributed/utils.py b/distributed/utils.py index b5d6f9cd24d..658dabe40f7 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -510,7 +510,7 @@ def __init__(self, loop: IOLoop | None = None, asynchronous: bool = False): ) from e loop = IOLoop.current() elif not loop.asyncio_loop.is_running(): # type: ignore[attr-defined] - # LoopRunner is not responsible for starting a foriegn IOLoop + # LoopRunner is not responsible for starting a foreign IOLoop raise RuntimeError( "Constructing LoopRunner(loop=loop) without a running loop is not supported" ) From 7841923d879e170b0f7880cfbdf0f55cc0f22765 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 13 Jun 2023 17:53:04 +0100 Subject: [PATCH 07/10] move type annotations from constructor to class definition Co-authored-by: crusaderky --- distributed/utils.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/distributed/utils.py b/distributed/utils.py index 658dabe40f7..53484577688 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -429,8 +429,10 @@ def wait(timeout): # as late as possible: when calling any methods that wait on or wake # Future instances. See: https://bugs.python.org/issue42392 class LateLoopEvent: + _event: asyncio.Event | None + def __init__(self) -> None: - self._event: asyncio.Event | None = None + self._event = None def set(self) -> None: if self._event is None: @@ -497,6 +499,7 @@ class LoopRunner: weakref.WeakKeyDictionary[IOLoop, tuple[int, LoopRunner | None]] ] = weakref.WeakKeyDictionary() _lock = threading.Lock() + _loop_thread: _CollectErrorThread | None def __init__(self, loop: IOLoop | None = None, asynchronous: bool = False): if loop is None: @@ -517,7 +520,7 @@ def __init__(self, loop: IOLoop | None = None, asynchronous: bool = False): self._loop = loop self._asynchronous = asynchronous - self._loop_thread: _CollectErrorThread | None = None + self._loop_thread = None self._started = False self._stop_event = LateLoopEvent() From 42f8bade85d30aabb94665f9b4a69b3c44374fc0 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Tue, 13 Jun 2023 17:54:32 +0100 Subject: [PATCH 08/10] change Cluster.close() return type annotation to Any Co-authored-by: crusaderky --- distributed/deploy/cluster.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 826da4d4c5f..f809a9ab8f9 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -5,7 +5,6 @@ import logging import uuid import warnings -from collections.abc import Awaitable from contextlib import suppress from inspect import isawaitable from typing import Any @@ -207,7 +206,7 @@ async def _close(self): self.status = Status.closed - def close(self, timeout: float | None = None) -> Awaitable[None] | None: + def close(self, timeout: float | None = None) -> Any: # If the cluster is already closed, we're already done if self.status == Status.closed: if self.asynchronous: From 1a918e0f4d70393299c2e9ffeba71453ce1bd596 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 15 Jun 2023 11:28:02 +0100 Subject: [PATCH 09/10] improve error message when using async Cluster as sync cmgr --- distributed/deploy/cluster.py | 5 +++++ distributed/deploy/tests/test_cluster.py | 10 ++++++++++ 2 files changed, 15 insertions(+) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index f809a9ab8f9..9c15400f2fa 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -520,6 +520,11 @@ def _ipython_display_(self, **kwargs): display(mimebundle, raw=True) def __enter__(self): + if self.asynchronous: + raise TypeError( + "Used 'with' with asynchronous class; please use 'async with'" + ) + return self.sync(self.__aenter__) def __exit__(self, exc_type, exc_value, traceback): diff --git a/distributed/deploy/tests/test_cluster.py b/distributed/deploy/tests/test_cluster.py index 5533916072e..bb4682099fb 100644 --- a/distributed/deploy/tests/test_cluster.py +++ b/distributed/deploy/tests/test_cluster.py @@ -77,3 +77,13 @@ def test_exponential_backoff(): assert _exponential_backoff(5, 1.5, 3, 20) == 20 # avoid overflow assert _exponential_backoff(1000, 1.5, 3, 20) == 20 + + +@gen_test() +async def test_sync_context_manager_used_with_async_cluster(): + async with Cluster(asynchronous=True, name="A") as cluster: + with pytest.raises( + TypeError, + match=r"Used 'with' with asynchronous class; please use 'async with'", + ), cluster: + pass From f18f9e7aeccded91dc574140b459071bba98252f Mon Sep 17 00:00:00 2001 From: crusaderky Date: Fri, 16 Jun 2023 10:43:48 +0100 Subject: [PATCH 10/10] Remove unnecessary code path --- distributed/deploy/cluster.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 9c15400f2fa..f0e5bd595c1 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -6,7 +6,6 @@ import uuid import warnings from contextlib import suppress -from inspect import isawaitable from typing import Any from packaging.version import parse as parse_version @@ -540,9 +539,7 @@ async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_value, traceback): - f = self.close() - if isawaitable(f): - await f + await self.close() @property def scheduler_address(self) -> str: