diff --git a/distributed/actor.py b/distributed/actor.py index 351ef5a1dc0..9c838169bd5 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -1,10 +1,9 @@ from __future__ import annotations import abc -import asyncio import functools -import sys import threading +from collections.abc import Awaitable, Generator from dataclasses import dataclass from datetime import timedelta from typing import Generic, Literal, NoReturn, TypeVar @@ -13,44 +12,12 @@ from distributed.client import Future from distributed.protocol import to_serialize -from distributed.utils import iscoroutinefunction, sync, thread_state +from distributed.utils import LateLoopEvent, iscoroutinefunction, sync, thread_state from distributed.utils_comm import WrappedKey from distributed.worker import get_client, get_worker _T = TypeVar("_T") -if sys.version_info >= (3, 9): - from collections.abc import Awaitable, Generator -else: - from typing import Awaitable, Generator - -if sys.version_info >= (3, 10): - from asyncio import Event as _LateLoopEvent -else: - # In python 3.10 asyncio.Lock and other primitives no longer support - # passing a loop kwarg to bind to a loop running in another thread - # e.g. calling from Client(asynchronous=False). Instead the loop is bound - # as late as possible: when calling any methods that wait on or wake - # Future instances. See: https://bugs.python.org/issue42392 - class _LateLoopEvent: - def __init__(self) -> None: - self._event: asyncio.Event | None = None - - def set(self) -> None: - if self._event is None: - self._event = asyncio.Event() - - self._event.set() - - def is_set(self) -> bool: - return self._event is not None and self._event.is_set() - - async def wait(self) -> bool: - if self._event is None: - self._event = asyncio.Event() - - return await self._event.wait() - class Actor(WrappedKey): """Controls an object on a remote worker @@ -318,7 +285,7 @@ def unwrap(self) -> NoReturn: class ActorFuture(BaseActorFuture[_T]): def __init__(self, io_loop: IOLoop): self._io_loop = io_loop - self._event = _LateLoopEvent() + self._event = LateLoopEvent() self._out: _Error | _OK[_T] | None = None def __await__(self) -> Generator[object, None, _T]: diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index 59ec50eaab8..1cfa087cd80 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -280,8 +280,6 @@ async def test_no_more_workers_than_tasks(): assert len(cluster.scheduler.workers) <= 1 -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_basic_no_loop(cleanup): loop = None try: @@ -294,8 +292,7 @@ def test_basic_no_loop(cleanup): assert future.result() == 2 loop = cluster.loop finally: - if loop is not None: - loop.add_callback(loop.stop) + assert loop is None or not loop.asyncio_loop.is_running() @pytest.mark.flaky(condition=LINUX, reruns=10, reruns_delay=5) diff --git a/distributed/deploy/tests/test_spec_cluster.py b/distributed/deploy/tests/test_spec_cluster.py index e0e7301fcf8..afb8c62c7ab 100644 --- a/distributed/deploy/tests/test_spec_cluster.py +++ b/distributed/deploy/tests/test_spec_cluster.py @@ -82,8 +82,6 @@ def test_spec_sync(loop): assert result == 11 -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_loop_started_in_constructor(cleanup): # test that SpecCluster.__init__ starts a loop in another thread cluster = SpecCluster(worker_spec, scheduler=scheduler, loop=None) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index 9c8163af6a9..32ee7a28fe0 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -18,8 +18,8 @@ get_client, wait, ) -from distributed.actor import _LateLoopEvent from distributed.metrics import time +from distributed.utils import LateLoopEvent from distributed.utils_test import cluster, gen_cluster @@ -261,7 +261,7 @@ def test_sync(client): def test_timeout(client): class Waiter: def __init__(self): - self.event = _LateLoopEvent() + self.event = LateLoopEvent() async def set(self): self.event.set() @@ -553,7 +553,7 @@ def sleep(self, time): async def test_waiter(c, s, a, b): class Waiter: def __init__(self): - self.event = _LateLoopEvent() + self.event = LateLoopEvent() async def set(self): self.event.set() diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 57b070a1e2b..98244093187 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -77,14 +77,7 @@ from distributed.metrics import time from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler from distributed.sizeof import sizeof -from distributed.utils import ( - NoOpAwaitable, - get_mp_context, - is_valid_xml, - open_port, - sync, - tmp_text, -) +from distributed.utils import get_mp_context, is_valid_xml, open_port, sync, tmp_text from distributed.utils_test import ( NO_AMM, BlockedGatherDep, @@ -2205,27 +2198,8 @@ async def test_multi_client(s, a, b): await asyncio.sleep(0.01) -@contextmanager -def _pristine_loop(): - IOLoop.clear_instance() - IOLoop.clear_current() - loop = IOLoop() - loop.make_current() - assert IOLoop.current() is loop - try: - yield loop - finally: - try: - loop.close(all_fds=True) - except (KeyError, ValueError): - pass - IOLoop.clear_instance() - IOLoop.clear_current() - - def long_running_client_connection(address): - with _pristine_loop(): - c = Client(address) + with Client(address, loop=None) as c: x = c.submit(lambda x: x + 1, 10) x.result() sleep(100) @@ -2888,8 +2862,6 @@ async def test_startup_close_startup(s, a, b): pass -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_startup_close_startup_sync(loop): with cluster() as (s, [a, b]): with Client(s["address"], loop=loop) as c: @@ -5614,23 +5586,9 @@ async def test_future_auto_inform(c, s, a, b): await asyncio.sleep(0.01) -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:clear_current is deprecated:DeprecationWarning") def test_client_async_before_loop_starts(cleanup): - async def close(): - async with client: - pass - - with _pristine_loop() as loop: - with pytest.warns( - DeprecationWarning, - match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated", - ): - client = Client(asynchronous=True, loop=loop) - assert client.asynchronous - assert isinstance(client.close(), NoOpAwaitable) - loop.run_sync(close) # TODO: client.close() does not unset global client + with pytest.raises(RuntimeError, "no running event loop"): + client = Client(asynchronous=True, loop=None) @pytest.mark.slow @@ -7021,8 +6979,6 @@ async def test_workers_collection_restriction(c, s, a, b): assert a.data and not b.data -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) async def test_get_client_functions_spawn_clusters(c, s, a): # see gh4565 diff --git a/distributed/tests/test_client_loop.py b/distributed/tests/test_client_loop.py index 9d8316cdd09..723031145cc 100644 --- a/distributed/tests/test_client_loop.py +++ b/distributed/tests/test_client_loop.py @@ -2,8 +2,6 @@ import contextlib -import pytest - from distributed import Client, LocalCluster from distributed.utils import LoopRunner @@ -29,16 +27,12 @@ def _check_cluster_and_client_loop(loop): # Test if Client stops LoopRunner on close. -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_close_loop_sync_start_new_loop(cleanup): with _check_loop_runner(): _check_cluster_and_client_loop(loop=None) # Test if Client stops LoopRunner on close. -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_close_loop_sync_use_running_loop(cleanup): with _check_loop_runner(): # Start own loop or use current thread's one. diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 685a0f1ed39..6a27df23cc9 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -389,57 +389,37 @@ def assert_not_running(loop): q.get(timeout=0.02) -_loop_not_running_property_warning = functools.partial( - pytest.warns, - DeprecationWarning, - match=r"Accessing the loop property while the loop is not running is deprecated", -) -_explicit_loop_is_not_running_warning = functools.partial( - pytest.warns, - DeprecationWarning, - match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated", -) -_implicit_loop_is_not_running_warning = functools.partial( - pytest.warns, - DeprecationWarning, - match=r"Constructing a LoopRunner\(asynchronous=True\) without a running loop is deprecated", -) - - -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_loop_runner(loop_in_thread): # Implicit loop - loop = IOLoop() - loop.make_current() - runner = LoopRunner() - with _loop_not_running_property_warning(): - assert runner.loop not in (loop, loop_in_thread) + async def make_looprunner_in_async_context(): + return IOLoop.current(), LoopRunner() + + loop, runner = asyncio.run(make_looprunner_in_async_context()) + with pytest.raises( + RuntimeError, + match="accessing the loop while the loop is not running is not possible", + ): + runner.loop assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) runner.start() assert runner.is_started() assert_running(runner.loop) + assert runner.loop is not loop runner.stop() assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) + with pytest.raises( + RuntimeError, + match="accessing the loop while the loop is not running is not possible", + ): + runner.loop # Explicit loop loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - runner = LoopRunner(loop=loop) - with _loop_not_running_property_warning(): - assert runner.loop is loop - assert not runner.is_started() - assert_not_running(loop) - runner.start() - assert runner.is_started() - assert_running(loop) - runner.stop() - assert not runner.is_started() - assert_not_running(loop) + with pytest.raises( + RuntimeError, + match=r"Constructing LoopRunner\(loop=loop\) without a running loop is not supported", + ): + LoopRunner(loop=loop) # Explicit loop, already started runner = LoopRunner(loop=loop_in_thread) @@ -453,57 +433,30 @@ def test_loop_runner(loop_in_thread): assert_running(loop_in_thread) # Implicit loop, asynchronous=True - loop = IOLoop() - loop.make_current() - with _implicit_loop_is_not_running_warning(): - runner = LoopRunner(asynchronous=True) - with _loop_not_running_property_warning(): - assert runner.loop is loop - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.start() - assert runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.stop() - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) + with pytest.raises( + RuntimeError, + r"Constructing a LoopRunner\(asynchronous=True\) without a running loop is not supported", + ): + LoopRunner(asynchronous=True) - # Explicit loop, asynchronous=True + # Explicit loop loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - runner = LoopRunner(loop=loop, asynchronous=True) - with _loop_not_running_property_warning(): - assert runner.loop is loop - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.start() - assert runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.stop() - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) + with pytest.raises( + RuntimeError, + r"Constructing a LoopRunner\(asynchronous=True\) without a running loop is not supported", + ): + LoopRunner(loop=loop, asynchronous=True) -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_two_loop_runners(loop_in_thread): # Loop runners tied to the same loop should cooperate # ABCCBA - loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - a = LoopRunner(loop=loop) - with _explicit_loop_is_not_running_warning(): - b = LoopRunner(loop=loop) - assert_not_running(loop) + a = LoopRunner() a.start() + loop = a.loop assert_running(loop) + b = LoopRunner(loop=loop) c = LoopRunner(loop=loop) b.start() assert_running(loop) @@ -517,13 +470,12 @@ def test_two_loop_runners(loop_in_thread): assert_not_running(loop) # ABCABC - loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - a = LoopRunner(loop=loop) - with _explicit_loop_is_not_running_warning(): - b = LoopRunner(loop=loop) - assert_not_running(loop) + a = LoopRunner() a.start() + loop = a.loop + assert_running(loop) + b = LoopRunner(loop=loop) + c = LoopRunner(loop=loop) assert_running(loop) b.start() assert_running(loop) diff --git a/distributed/utils.py b/distributed/utils.py index ba2f938340e..b1475cad847 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -407,6 +407,54 @@ def wait(timeout): return result +if sys.version_info >= (3, 10): + from asyncio import Event as LateLoopEvent +else: + # In python 3.10 asyncio.Lock and other primitives no longer support + # passing a loop kwarg to bind to a loop running in another thread + # e.g. calling from Client(asynchronous=False). Instead the loop is bound + # as late as possible: when calling any methods that wait on or wake + # Future instances. See: https://bugs.python.org/issue42392 + class LateLoopEvent: + def __init__(self) -> None: + self._event: asyncio.Event | None = None + + def set(self) -> None: + if self._event is None: + self._event = asyncio.Event() + + self._event.set() + + def is_set(self) -> bool: + return self._event is not None and self._event.is_set() + + async def wait(self) -> bool: + if self._event is None: + self._event = asyncio.Event() + + return await self._event.wait() + + +class _CollectErrorThread: + def __init__(self, fn, daemon, name): + def target(): + try: + fn() + except BaseException as e: + self._exception = e + + self._thread = thread = threading.Thread(target=fn, daemon=daemon, name=name) + thread.start() + + def join(self, timeout=None): + thread = self._thread + thread.join(timeout=timeout) + if thread.is_alive(): + raise TimeoutError("join timed out") + if self._exception is not None: + raise self._exception + + class LoopRunner: """ A helper to start and stop an IO loop in a controlled way. @@ -433,32 +481,23 @@ class LoopRunner: def __init__(self, loop=None, asynchronous=False): if loop is None: if asynchronous: - try: - asyncio.get_running_loop() - except RuntimeError: - warnings.warn( - "Constructing a LoopRunner(asynchronous=True) without a running loop is deprecated", - DeprecationWarning, - stacklevel=2, - ) - self._loop = IOLoop.current() + # raises RuntimeError if there's no running loop + asyncio.get_running_loop() + loop = IOLoop.current() else: # We're expecting the loop to run in another thread, - # avoid re-using this thread's assigned loop - self._loop = IOLoop() + # avoid re-using this thread's assigned loop by creating a new + # loop in the thread that will run it + loop = None else: - if not loop.asyncio_loop.is_running(): - warnings.warn( - "Constructing LoopRunner(loop=loop) without a running loop is deprecated", - DeprecationWarning, - stacklevel=2, - ) - self._loop = loop + # LoopRunner is not responsible for starting a foriegn IOLoop + assert loop.asyncio_loop.is_running() + + self._loop = loop self._asynchronous = asynchronous self._loop_thread = None self._started = False - with self._lock: - self._all_loops.setdefault(self._loop, (0, None)) + self._stop_event = LateLoopEvent() def start(self): """ @@ -473,59 +512,45 @@ def start(self): def _start_unlocked(self): assert not self._started - count, real_runner = self._all_loops[self._loop] - if self._asynchronous or real_runner is not None or count > 0: - self._all_loops[self._loop] = count + 1, real_runner - self._started = True - return + if self._loop is not None: + count, real_runner = self._all_loops.setdefault(self._loop, (0, None)) + if self._asynchronous or real_runner is not None or count > 0: + self._all_loops[self._loop] = count + 1, real_runner + self._started = True + return + assert self._loop.asyncio_loop.is_running() + else: + count = 0 assert self._loop_thread is None assert count == 0 - loop_evt = threading.Event() - done_evt = threading.Event() - in_thread = [None] - start_exc = [None] + start_evt = threading.Event() + start_exc = None - def loop_cb(): - in_thread[0] = threading.current_thread() - loop_evt.set() + async def amain(): + self._loop = IOLoop.current() + start_evt.set() + await self._stop_event.wait() - def run_loop(loop=self._loop): - loop.add_callback(loop_cb) - # run loop forever if it's not running already + def run_loop(): + nonlocal start_exc try: - if not loop.asyncio_loop.is_running(): - loop.start() - except Exception as e: - start_exc[0] = e - finally: - done_evt.set() - - thread = threading.Thread(target=run_loop, name="IO loop") - thread.daemon = True - thread.start() - - loop_evt.wait(timeout=10) + asyncio.run(amain()) + except BaseException as e: + if start_evt.set(): + raise + start_exc = e + start_evt.set() + + self._loop_thread = _CollectErrorThread( + target=run_loop, daemon=True, name="IO loop" + ) + start_evt.wait(timeout=10) + if start_exc is None: + raise start_exc self._started = True - - actual_thread = in_thread[0] - if actual_thread is not thread: - # Loop already running in other thread (user-launched) - done_evt.wait(5) - if start_exc[0] is not None and not isinstance(start_exc[0], RuntimeError): - if not isinstance( - start_exc[0], Exception - ): # track down infrequent error - raise TypeError( - f"not an exception: {start_exc[0]!r}", - ) - raise start_exc[0] - self._all_loops[self._loop] = count + 1, None - else: - assert start_exc[0] is None, start_exc - self._loop_thread = thread - self._all_loops[self._loop] = count + 1, self + self._all_loops.setdefault(self._loop, (count + 1, self)) def stop(self, timeout=10): """ @@ -541,23 +566,25 @@ def _stop_unlocked(self, timeout): self._started = False - count, real_runner = self._all_loops[self._loop] + try: + count, real_runner = self._all_loops[self._loop] + except KeyError: + return + if count > 1: self._all_loops[self._loop] = count - 1, real_runner - else: - assert count == 1 - del self._all_loops[self._loop] - if real_runner is not None: - real_runner._real_stop(timeout) + return + + assert count == 1 + del self._all_loops[self._loop] + real_runner._real_stop(timeout) def _real_stop(self, timeout): assert self._loop_thread is not None if self._loop_thread is not None: try: - self._loop.add_callback(self._loop.stop) + self._loop.add_callback(self._stop_event.set) self._loop_thread.join(timeout=timeout) - with suppress(KeyError): # IOLoop can be missing - self._loop.close() finally: self._loop_thread = None