diff --git a/distributed/actor.py b/distributed/actor.py index 351ef5a1dc..e07f6b2cee 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -1,10 +1,10 @@ from __future__ import annotations import abc -import asyncio import functools import sys import threading +from collections.abc import Generator from dataclasses import dataclass from datetime import timedelta from typing import Generic, Literal, NoReturn, TypeVar @@ -13,43 +13,16 @@ from distributed.client import Future from distributed.protocol import to_serialize -from distributed.utils import iscoroutinefunction, sync, thread_state +from distributed.utils import LateLoopEvent, iscoroutinefunction, sync, thread_state from distributed.utils_comm import WrappedKey from distributed.worker import get_client, get_worker -_T = TypeVar("_T") - if sys.version_info >= (3, 9): - from collections.abc import Awaitable, Generator + from collections.abc import Awaitable else: - from typing import Awaitable, Generator - -if sys.version_info >= (3, 10): - from asyncio import Event as _LateLoopEvent -else: - # In python 3.10 asyncio.Lock and other primitives no longer support - # passing a loop kwarg to bind to a loop running in another thread - # e.g. calling from Client(asynchronous=False). Instead the loop is bound - # as late as possible: when calling any methods that wait on or wake - # Future instances. 
See: https://bugs.python.org/issue42392 - class _LateLoopEvent: - def __init__(self) -> None: - self._event: asyncio.Event | None = None - - def set(self) -> None: - if self._event is None: - self._event = asyncio.Event() + from typing import Awaitable - self._event.set() - - def is_set(self) -> bool: - return self._event is not None and self._event.is_set() - - async def wait(self) -> bool: - if self._event is None: - self._event = asyncio.Event() - - return await self._event.wait() +_T = TypeVar("_T") class Actor(WrappedKey): @@ -318,7 +291,7 @@ def unwrap(self) -> NoReturn: class ActorFuture(BaseActorFuture[_T]): def __init__(self, io_loop: IOLoop): self._io_loop = io_loop - self._event = _LateLoopEvent() + self._event = LateLoopEvent() self._out: _Error | _OK[_T] | None = None def __await__(self) -> Generator[object, None, _T]: diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index ec46793e20..986c3eb456 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -5,6 +5,7 @@ import logging import uuid import warnings +from collections.abc import Awaitable from contextlib import suppress from inspect import isawaitable from typing import Any @@ -205,16 +206,17 @@ async def _close(self): self.status = Status.closed - def close(self, timeout=None): + def close(self, timeout: float | None = None) -> Awaitable[None] | None: # If the cluster is already closed, we're already done if self.status == Status.closed: if self.asynchronous: return NoOpAwaitable() - else: - return + return None - with suppress(RuntimeError): # loop closed during process shutdown + try: return self.sync(self._close, callback_timeout=timeout) + except RuntimeError: # loop closed during process shutdown + return None def __del__(self, _warn=warnings.warn): if getattr(self, "status", Status.closed) != Status.closed: @@ -522,7 +524,8 @@ def __enter__(self): return self.sync(self.__aenter__) def __exit__(self, exc_type, exc_value, traceback): - 
return self.sync(self.__aexit__, exc_type, exc_value, traceback) + aw = self.close() + assert aw is None def __await__(self): return self diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index ac5a8f366c..f5a349090f 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -288,8 +288,15 @@ def __init__( self.sync(self._correct_state) except Exception: self.sync(self.close) + self._loop_runner.stop() raise + def close(self, timeout: float | None = None) -> Awaitable[None] | None: + aw = super().close(timeout) + if not self.asynchronous: + self._loop_runner.stop() + return aw + async def _start(self): while self.status == Status.starting: await asyncio.sleep(0.01) @@ -471,10 +478,6 @@ async def __aenter__(self): assert self.status == Status.running return self - def __exit__(self, exc_type, exc_value, traceback): - super().__exit__(exc_type, exc_value, traceback) - self._loop_runner.stop() - def _threads_per_worker(self) -> int: """Return the number of threads per worker for new workers""" if not self.new_spec: # pragma: no cover diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 59ec50eaab..1cfa087cd8 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -280,8 +280,6 @@ async def test_no_more_workers_than_tasks(): assert len(cluster.scheduler.workers) <= 1 -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_basic_no_loop(cleanup): loop = None try: @@ -294,8 +292,7 @@ def test_basic_no_loop(cleanup): assert future.result() == 2 loop = cluster.loop finally: - if loop is not None: - loop.add_callback(loop.stop) + assert loop is None or not loop.asyncio_loop.is_running() @pytest.mark.flaky(condition=LINUX, reruns=10, reruns_delay=5) diff --git a/distributed/deploy/tests/test_spec_cluster.py 
b/distributed/deploy/tests/test_spec_cluster.py index e0e7301fcf..afb8c62c7a 100644 --- a/distributed/deploy/tests/test_spec_cluster.py +++ b/distributed/deploy/tests/test_spec_cluster.py @@ -82,8 +82,6 @@ def test_spec_sync(loop): assert result == 11 -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_loop_started_in_constructor(cleanup): # test that SpecCluster.__init__ starts a loop in another thread cluster = SpecCluster(worker_spec, scheduler=scheduler, loop=None) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 9c8163af6a..32ee7a28fe 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -18,8 +18,8 @@ get_client, wait, ) -from distributed.actor import _LateLoopEvent from distributed.metrics import time +from distributed.utils import LateLoopEvent from distributed.utils_test import cluster, gen_cluster @@ -261,7 +261,7 @@ def test_sync(client): def test_timeout(client): class Waiter: def __init__(self): - self.event = _LateLoopEvent() + self.event = LateLoopEvent() async def set(self): self.event.set() @@ -553,7 +553,7 @@ def sleep(self, time): async def test_waiter(c, s, a, b): class Waiter: def __init__(self): - self.event = _LateLoopEvent() + self.event = LateLoopEvent() async def set(self): self.event.set() diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 57b070a1e2..9824409318 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -77,14 +77,7 @@ from distributed.metrics import time from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler from distributed.sizeof import sizeof -from distributed.utils import ( - NoOpAwaitable, - get_mp_context, - is_valid_xml, - open_port, - sync, - tmp_text, -) +from distributed.utils import get_mp_context, is_valid_xml, 
open_port, sync, tmp_text from distributed.utils_test import ( NO_AMM, BlockedGatherDep, @@ -2205,27 +2198,8 @@ async def test_multi_client(s, a, b): await asyncio.sleep(0.01) -@contextmanager -def _pristine_loop(): - IOLoop.clear_instance() - IOLoop.clear_current() - loop = IOLoop() - loop.make_current() - assert IOLoop.current() is loop - try: - yield loop - finally: - try: - loop.close(all_fds=True) - except (KeyError, ValueError): - pass - IOLoop.clear_instance() - IOLoop.clear_current() - - def long_running_client_connection(address): - with _pristine_loop(): - c = Client(address) + with Client(address, loop=None) as c: x = c.submit(lambda x: x + 1, 10) x.result() sleep(100) @@ -2888,8 +2862,6 @@ async def test_startup_close_startup(s, a, b): pass -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_startup_close_startup_sync(loop): with cluster() as (s, [a, b]): with Client(s["address"], loop=loop) as c: @@ -5614,23 +5586,9 @@ async def test_future_auto_inform(c, s, a, b): await asyncio.sleep(0.01) -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:clear_current is deprecated:DeprecationWarning") def test_client_async_before_loop_starts(cleanup): - async def close(): - async with client: - pass - - with _pristine_loop() as loop: - with pytest.warns( - DeprecationWarning, - match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated", - ): - client = Client(asynchronous=True, loop=loop) - assert client.asynchronous - assert isinstance(client.close(), NoOpAwaitable) - loop.run_sync(close) # TODO: client.close() does not unset global client + with pytest.raises(RuntimeError, "no running event loop"): + client = Client(asynchronous=True, loop=None) 
@pytest.mark.slow @@ -7021,8 +6979,6 @@ async def test_workers_collection_restriction(c, s, a, b): assert a.data and not b.data -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) async def test_get_client_functions_spawn_clusters(c, s, a): # see gh4565 diff --git a/distributed/tests/test_client_loop.py b/distributed/tests/test_client_loop.py index 9d8316cdd0..723031145c 100644 --- a/distributed/tests/test_client_loop.py +++ b/distributed/tests/test_client_loop.py @@ -2,8 +2,6 @@ import contextlib -import pytest - from distributed import Client, LocalCluster from distributed.utils import LoopRunner @@ -29,16 +27,12 @@ def _check_cluster_and_client_loop(loop): # Test if Client stops LoopRunner on close. -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_close_loop_sync_start_new_loop(cleanup): with _check_loop_runner(): _check_cluster_and_client_loop(loop=None) # Test if Client stops LoopRunner on close. -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_close_loop_sync_use_running_loop(cleanup): with _check_loop_runner(): # Start own loop or use current thread's one. 
diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 685a0f1ed3..6a27df23cc 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -389,57 +389,37 @@ def assert_not_running(loop): q.get(timeout=0.02) -_loop_not_running_property_warning = functools.partial( - pytest.warns, - DeprecationWarning, - match=r"Accessing the loop property while the loop is not running is deprecated", -) -_explicit_loop_is_not_running_warning = functools.partial( - pytest.warns, - DeprecationWarning, - match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated", -) -_implicit_loop_is_not_running_warning = functools.partial( - pytest.warns, - DeprecationWarning, - match=r"Constructing a LoopRunner\(asynchronous=True\) without a running loop is deprecated", -) - - -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_loop_runner(loop_in_thread): # Implicit loop - loop = IOLoop() - loop.make_current() - runner = LoopRunner() - with _loop_not_running_property_warning(): - assert runner.loop not in (loop, loop_in_thread) + async def make_looprunner_in_async_context(): + return IOLoop.current(), LoopRunner() + + loop, runner = asyncio.run(make_looprunner_in_async_context()) + with pytest.raises( + RuntimeError, + match="accessing the loop while the loop is not running is not possible", + ): + runner.loop assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) runner.start() assert runner.is_started() assert_running(runner.loop) + assert runner.loop is not loop runner.stop() assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) + with pytest.raises( + RuntimeError, + match="accessing the loop while the loop is not running is not possible", + ): + runner.loop # Explicit loop loop 
= IOLoop() - with _explicit_loop_is_not_running_warning(): - runner = LoopRunner(loop=loop) - with _loop_not_running_property_warning(): - assert runner.loop is loop - assert not runner.is_started() - assert_not_running(loop) - runner.start() - assert runner.is_started() - assert_running(loop) - runner.stop() - assert not runner.is_started() - assert_not_running(loop) + with pytest.raises( + RuntimeError, + match=r"Constructing LoopRunner\(loop=loop\) without a running loop is not supported", + ): + LoopRunner(loop=loop) # Explicit loop, already started runner = LoopRunner(loop=loop_in_thread) @@ -453,57 +433,30 @@ def test_loop_runner(loop_in_thread): assert_running(loop_in_thread) # Implicit loop, asynchronous=True - loop = IOLoop() - loop.make_current() - with _implicit_loop_is_not_running_warning(): - runner = LoopRunner(asynchronous=True) - with _loop_not_running_property_warning(): - assert runner.loop is loop - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.start() - assert runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.stop() - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) + with pytest.raises( + RuntimeError, + r"Constructing a LoopRunner\(asynchronous=True\) without a running loop is not supported", + ): + LoopRunner(asynchronous=True) - # Explicit loop, asynchronous=True + # Explicit loop loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - runner = LoopRunner(loop=loop, asynchronous=True) - with _loop_not_running_property_warning(): - assert runner.loop is loop - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.start() - assert runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.stop() - assert not runner.is_started() - 
with _loop_not_running_property_warning(): - assert_not_running(runner.loop) + with pytest.raises( + RuntimeError, + r"Constructing a LoopRunner\(asynchronous=True\) without a running loop is not supported", + ): + LoopRunner(loop=loop, asynchronous=True) -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_two_loop_runners(loop_in_thread): # Loop runners tied to the same loop should cooperate # ABCCBA - loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - a = LoopRunner(loop=loop) - with _explicit_loop_is_not_running_warning(): - b = LoopRunner(loop=loop) - assert_not_running(loop) + a = LoopRunner() a.start() + loop = a.loop assert_running(loop) + b = LoopRunner(loop=loop) c = LoopRunner(loop=loop) b.start() assert_running(loop) @@ -517,13 +470,12 @@ def test_two_loop_runners(loop_in_thread): assert_not_running(loop) # ABCABC - loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - a = LoopRunner(loop=loop) - with _explicit_loop_is_not_running_warning(): - b = LoopRunner(loop=loop) - assert_not_running(loop) + a = LoopRunner() a.start() + loop = a.loop + assert_running(loop) + b = LoopRunner(loop=loop) + c = LoopRunner(loop=loop) assert_running(loop) b.start() assert_running(loop) diff --git a/distributed/utils.py b/distributed/utils.py index ba2f938340..33827f9f9e 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -407,6 +407,61 @@ def wait(timeout): return result +if sys.version_info >= (3, 10): + from asyncio import Event as LateLoopEvent +else: + # In python 3.10 asyncio.Lock and other primitives no longer support + # passing a loop kwarg to bind to a loop running in another thread + # e.g. calling from Client(asynchronous=False). Instead the loop is bound + # as late as possible: when calling any methods that wait on or wake + # Future instances. 
See: https://bugs.python.org/issue42392 + class LateLoopEvent: + def __init__(self) -> None: + self._event: asyncio.Event | None = None + + def set(self) -> None: + if self._event is None: + self._event = asyncio.Event() + + self._event.set() + + def is_set(self) -> bool: + return self._event is not None and self._event.is_set() + + async def wait(self) -> bool: + if self._event is None: + self._event = asyncio.Event() + + return await self._event.wait() + + +class _CollectErrorThread: + def __init__(self, target: Callable[[], None], daemon: bool, name: str): + self._exception: BaseException | None = None + + def wrapper() -> None: + try: + target() + except BaseException as e: + self._exception = e + + self._thread = thread = threading.Thread( + target=wrapper, daemon=daemon, name=name + ) + thread.start() + + def join(self, timeout: float | None = None) -> None: + thread = self._thread + thread.join(timeout=timeout) + if thread.is_alive(): + raise TimeoutError("join timed out") + if self._exception is not None: + try: + raise self._exception + finally: # remove a reference cycle + del self._exception + + class LoopRunner: """ A helper to start and stop an IO loop in a controlled way. 
@@ -430,35 +485,21 @@ class LoopRunner: ] = weakref.WeakKeyDictionary() _lock = threading.Lock() - def __init__(self, loop=None, asynchronous=False): + def __init__(self, loop: IOLoop | None = None, asynchronous: bool = False): if loop is None: if asynchronous: - try: - asyncio.get_running_loop() - except RuntimeError: - warnings.warn( - "Constructing a LoopRunner(asynchronous=True) without a running loop is deprecated", - DeprecationWarning, - stacklevel=2, - ) - self._loop = IOLoop.current() - else: - # We're expecting the loop to run in another thread, - # avoid re-using this thread's assigned loop - self._loop = IOLoop() + # raises RuntimeError if there's no running loop + asyncio.get_running_loop() + loop = IOLoop.current() else: - if not loop.asyncio_loop.is_running(): - warnings.warn( - "Constructing LoopRunner(loop=loop) without a running loop is deprecated", - DeprecationWarning, - stacklevel=2, - ) - self._loop = loop + # LoopRunner is not responsible for starting a foreign IOLoop + assert loop.asyncio_loop.is_running() # type: ignore[attr-defined] + + self._loop = loop self._asynchronous = asynchronous - self._loop_thread = None + self._loop_thread: _CollectErrorThread | None = None self._started = False - with self._lock: - self._all_loops.setdefault(self._loop, (0, None)) + self._stop_event = LateLoopEvent() def start(self): """ @@ -470,62 +511,53 @@ def start(self): with self._lock: self._start_unlocked() - def _start_unlocked(self): + def _start_unlocked(self) -> None: assert not self._started - count, real_runner = self._all_loops[self._loop] - if self._asynchronous or real_runner is not None or count > 0: + if self._loop is not None: + try: + count, real_runner = self._all_loops[self._loop] + except KeyError: + assert self._loop.asyncio_loop.is_running() # type: ignore[attr-defined] + self._started = True + return + self._all_loops[self._loop] = count + 1, real_runner self._started = True return assert self._loop_thread is None - assert count == 0 
- loop_evt = threading.Event() - done_evt = threading.Event() - in_thread = [None] - start_exc = [None] + start_evt = threading.Event() + start_exc = None + loop = None - def loop_cb(): - in_thread[0] = threading.current_thread() - loop_evt.set() + async def amain() -> None: + nonlocal loop + loop = IOLoop.current() + start_evt.set() + await self._stop_event.wait() - def run_loop(loop=self._loop): - loop.add_callback(loop_cb) - # run loop forever if it's not running already + def run_loop() -> None: + nonlocal start_exc try: - if not loop.asyncio_loop.is_running(): - loop.start() - except Exception as e: - start_exc[0] = e - finally: - done_evt.set() - - thread = threading.Thread(target=run_loop, name="IO loop") - thread.daemon = True - thread.start() - - loop_evt.wait(timeout=10) + asyncio.run(amain()) + except BaseException as e: + if start_evt.is_set(): + raise + start_exc = e + start_evt.set() + + self._loop_thread = _CollectErrorThread( + target=run_loop, daemon=True, name="IO loop" + ) + start_evt.wait(timeout=10) + if start_exc is not None: + raise start_exc + assert loop is not None + self._loop = loop self._started = True - - actual_thread = in_thread[0] - if actual_thread is not thread: - # Loop already running in other thread (user-launched) - done_evt.wait(5) - if start_exc[0] is not None and not isinstance(start_exc[0], RuntimeError): - if not isinstance( - start_exc[0], Exception - ): # track down infrequent error - raise TypeError( - f"not an exception: {start_exc[0]!r}", - ) - raise start_exc[0] - self._all_loops[self._loop] = count + 1, None - else: - assert start_exc[0] is None, start_exc - self._loop_thread = thread - self._all_loops[self._loop] = count + 1, self + self._all_loops[loop] = (1, self) def stop(self, timeout=10): """ @@ -541,23 +573,25 @@ def _stop_unlocked(self, timeout): self._started = False - count, real_runner = self._all_loops[self._loop] + try: + count, real_runner = self._all_loops[self._loop] + except KeyError: + return + if 
count > 1: self._all_loops[self._loop] = count - 1, real_runner - else: - assert count == 1 - del self._all_loops[self._loop] - if real_runner is not None: - real_runner._real_stop(timeout) + return + + assert count == 1 + del self._all_loops[self._loop] + real_runner._real_stop(timeout) def _real_stop(self, timeout): assert self._loop_thread is not None if self._loop_thread is not None: try: - self._loop.add_callback(self._loop.stop) + self._loop.add_callback(self._stop_event.set) self._loop_thread.join(timeout=timeout) - with suppress(KeyError): # IOLoop can be missing - self._loop.close() finally: self._loop_thread = None