diff --git a/distributed/actor.py b/distributed/actor.py index eaf5bb63f19..88b7421e7d5 100644 --- a/distributed/actor.py +++ b/distributed/actor.py @@ -1,9 +1,7 @@ from __future__ import annotations import abc -import asyncio import functools -import sys import threading from collections.abc import Awaitable, Generator from dataclasses import dataclass @@ -14,41 +12,13 @@ from distributed.client import Future from distributed.protocol import to_serialize -from distributed.utils import iscoroutinefunction, sync, thread_state +from distributed.utils import LateLoopEvent, iscoroutinefunction, sync, thread_state from distributed.utils_comm import WrappedKey from distributed.worker import get_client, get_worker _T = TypeVar("_T") -if sys.version_info >= (3, 10): - from asyncio import Event as _LateLoopEvent -else: - # In python 3.10 asyncio.Lock and other primitives no longer support - # passing a loop kwarg to bind to a loop running in another thread - # e.g. calling from Client(asynchronous=False). Instead the loop is bound - # as late as possible: when calling any methods that wait on or wake - # Future instances. See: https://bugs.python.org/issue42392 - class _LateLoopEvent: - def __init__(self) -> None: - self._event: asyncio.Event | None = None - - def set(self) -> None: - if self._event is None: - self._event = asyncio.Event() - - self._event.set() - - def is_set(self) -> bool: - return self._event is not None and self._event.is_set() - - async def wait(self) -> bool: - if self._event is None: - self._event = asyncio.Event() - - return await self._event.wait() - - class Actor(WrappedKey): """Controls an object on a remote worker @@ -322,7 +292,7 @@ def unwrap(self) -> NoReturn: class ActorFuture(BaseActorFuture[_T]): def __init__(self, io_loop: IOLoop): self._io_loop = io_loop - self._event = _LateLoopEvent() + self._event = LateLoopEvent() self._out: _Error | _OK[_T] | None = None def __await__(self) -> Generator[object, None, _T]: diff --git a/distributed/deploy/cluster.py b/distributed/deploy/cluster.py index 87e5b818d0a..f0e5bd595c1 100644 --- a/distributed/deploy/cluster.py +++ b/distributed/deploy/cluster.py @@ -6,7 +6,6 @@ import uuid import warnings from contextlib import suppress -from inspect import isawaitable from typing import Any from packaging.version import parse as parse_version @@ -206,16 +205,17 @@ async def _close(self): self.status = Status.closed - def close(self, timeout=None): + def close(self, timeout: float | None = None) -> Any: # If the cluster is already closed, we're already done if self.status == Status.closed: if self.asynchronous: return NoOpAwaitable() - else: - return + return None - with suppress(RuntimeError): # loop closed during process shutdown + try: return self.sync(self._close, callback_timeout=timeout) + except RuntimeError: # loop closed during process shutdown + return None def __del__(self, _warn=warnings.warn): if getattr(self, "status", Status.closed) != Status.closed: @@ -519,10 +519,16 @@ def _ipython_display_(self, **kwargs): display(mimebundle, raw=True) def __enter__(self): + if self.asynchronous: + raise TypeError( + "Used 'with' with asynchronous class; please use 'async with'" + ) + return self.sync(self.__aenter__) def __exit__(self, exc_type, exc_value, traceback): - return self.sync(self.__aexit__, exc_type, exc_value, traceback) + aw = self.close() + assert aw is None, aw def __await__(self): return self @@ -533,9 +539,7 @@ async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_value, traceback): - f = self.close() - if isawaitable(f): - await f + await self.close() @property def scheduler_address(self) -> str: diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index 23bfa3431c4..fe51468a68a 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -293,8 +293,15 @@ def __init__( self.sync(self._correct_state) except Exception: self.sync(self.close) + self._loop_runner.stop() raise + def close(self, timeout: float | None = None) -> Awaitable[None] | None: + aw = super().close(timeout) + if not self.asynchronous: + self._loop_runner.stop() + return aw + async def _start(self): while self.status == Status.starting: await asyncio.sleep(0.01) @@ -472,10 +479,6 @@ async def __aenter__(self): assert self.status == Status.running return self - def __exit__(self, exc_type, exc_value, traceback): - super().__exit__(exc_type, exc_value, traceback) - self._loop_runner.stop() - def _threads_per_worker(self) -> int: """Return the number of threads per worker for new workers""" if not self.new_spec: # pragma: no cover diff --git a/distributed/deploy/tests/test_adaptive.py b/distributed/deploy/tests/test_adaptive.py index c3598897ac6..fcc15ee0e90 100644 --- a/distributed/deploy/tests/test_adaptive.py +++ b/distributed/deploy/tests/test_adaptive.py @@ -279,8 +279,6 @@ async def test_no_more_workers_than_tasks(): assert len(cluster.scheduler.workers) <= 1 -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_basic_no_loop(cleanup): loop = None try: @@ -293,8 +291,7 @@ def test_basic_no_loop(cleanup): assert future.result() == 2 loop = cluster.loop finally: - if loop is not None: - loop.add_callback(loop.stop) + assert loop is None or not loop.asyncio_loop.is_running() @pytest.mark.flaky(condition=LINUX, reruns=10, reruns_delay=5) diff --git a/distributed/deploy/tests/test_cluster.py b/distributed/deploy/tests/test_cluster.py index 5533916072e..bb4682099fb 100644 --- a/distributed/deploy/tests/test_cluster.py +++ b/distributed/deploy/tests/test_cluster.py @@ -77,3 +77,13 @@ def test_exponential_backoff(): assert _exponential_backoff(5, 1.5, 3, 20) == 20 # avoid overflow assert _exponential_backoff(1000, 1.5, 3, 20) == 20 + + +@gen_test() +async def test_sync_context_manager_used_with_async_cluster(): + async with Cluster(asynchronous=True, name="A") as cluster: + with pytest.raises( + TypeError, + match=r"Used 'with' with asynchronous class; please use 'async with'", + ), cluster: + pass diff --git a/distributed/deploy/tests/test_spec_cluster.py b/distributed/deploy/tests/test_spec_cluster.py index 32e13a6db80..f875db0c3ed 100644 --- a/distributed/deploy/tests/test_spec_cluster.py +++ b/distributed/deploy/tests/test_spec_cluster.py @@ -82,8 +82,6 @@ def test_spec_sync(loop): assert result == 11 -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_loop_started_in_constructor(cleanup): # test that SpecCluster.__init__ starts a loop in another thread cluster = SpecCluster(worker_spec, scheduler=scheduler, loop=None) diff --git a/distributed/tests/test_actor.py b/distributed/tests/test_actor.py index a3e6a84ad36..c44c0ec7f36 100644 --- a/distributed/tests/test_actor.py +++ b/distributed/tests/test_actor.py @@ -19,8 +19,8 @@ wait, worker_client, ) -from distributed.actor import _LateLoopEvent from distributed.metrics import time +from distributed.utils import LateLoopEvent from distributed.utils_test import cluster, double, gen_cluster, inc from distributed.worker import get_worker @@ -263,7 +263,7 @@ def test_sync(client): def test_timeout(client): class Waiter: def __init__(self): - self.event = _LateLoopEvent() + self.event = LateLoopEvent() async def set(self): self.event.set() @@ -555,7 +555,7 @@ def sleep(self, time): async def test_waiter(c, s, a, b): class Waiter: def __init__(self): - self.event = _LateLoopEvent() + self.event = LateLoopEvent() async def set(self): self.event.set() diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index 3cc40f1a0c9..0efbbe4bc3a 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -79,14 +79,7 @@ from distributed.scheduler import CollectTaskMetaDataPlugin, KilledWorker, Scheduler from distributed.shuffle import check_minimal_arrow_version from distributed.sizeof import sizeof -from distributed.utils import ( - NoOpAwaitable, - get_mp_context, - is_valid_xml, - open_port, - sync, - tmp_text, -) +from distributed.utils import get_mp_context, is_valid_xml, open_port, sync, tmp_text from distributed.utils_test import ( NO_AMM, BlockedGatherDep, @@ -2101,27 +2094,8 @@ async def test_multi_client(s, a, b): await asyncio.sleep(0.01) -@contextmanager -def _pristine_loop(): - IOLoop.clear_instance() - IOLoop.clear_current() - loop = IOLoop() - loop.make_current() - assert IOLoop.current() is loop - try: - yield loop - finally: - try: - loop.close(all_fds=True) - except (KeyError, ValueError): - pass - IOLoop.clear_instance() - IOLoop.clear_current() - - def long_running_client_connection(address): - with _pristine_loop(): - c = Client(address) + with Client(address, loop=None) as c: x = c.submit(lambda x: x + 1, 10) x.result() sleep(100) @@ -2774,8 +2748,6 @@ async def test_startup_close_startup(s, a, b): pass -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_startup_close_startup_sync(loop): with cluster() as (s, [a, b]): with Client(s["address"], loop=loop) as c: @@ -5647,23 +5619,12 @@ async def test_future_auto_inform(c, s, a, b): await asyncio.sleep(0.01) -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:clear_current is deprecated:DeprecationWarning") def test_client_async_before_loop_starts(cleanup): - async def close(): - async with client: - pass - - with _pristine_loop() as loop: - with pytest.warns( - DeprecationWarning, - match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated", - ): - client = Client(asynchronous=True, loop=loop) - assert client.asynchronous - assert isinstance(client.close(), NoOpAwaitable) - loop.run_sync(close) # TODO: client.close() does not unset global client + with pytest.raises( + RuntimeError, + match=r"Constructing LoopRunner\(asynchronous=True\) without a running loop is not supported", + ): + client = Client(asynchronous=True, loop=None) @pytest.mark.slow @@ -7105,8 +7066,6 @@ async def test_workers_collection_restriction(c, s, a, b): assert a.data and not b.data -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") @gen_cluster(client=True, nthreads=[("127.0.0.1", 1)]) async def test_get_client_functions_spawn_clusters(c, s, a): # see gh4565 diff --git a/distributed/tests/test_client_loop.py b/distributed/tests/test_client_loop.py index 9d8316cdd09..723031145cc 100644 --- a/distributed/tests/test_client_loop.py +++ b/distributed/tests/test_client_loop.py @@ -2,8 +2,6 @@ import contextlib -import pytest - from distributed import Client, LocalCluster from distributed.utils import LoopRunner @@ -29,16 +27,12 @@ def _check_cluster_and_client_loop(loop): # Test if Client stops LoopRunner on close. -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_close_loop_sync_start_new_loop(cleanup): with _check_loop_runner(): _check_cluster_and_client_loop(loop=None) # Test if Client stops LoopRunner on close. -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_close_loop_sync_use_running_loop(cleanup): with _check_loop_runner(): # Start own loop or use current thread's one. diff --git a/distributed/tests/test_utils.py b/distributed/tests/test_utils.py index 0157257c04a..73329a81b33 100644 --- a/distributed/tests/test_utils.py +++ b/distributed/tests/test_utils.py @@ -16,6 +16,7 @@ from collections import deque from concurrent.futures import Executor, Future, ThreadPoolExecutor from time import sleep +from unittest import mock import pytest from tornado.ioloop import IOLoop @@ -393,57 +394,41 @@ def assert_not_running(loop): q.get(timeout=0.02) -_loop_not_running_property_warning = functools.partial( - pytest.warns, - DeprecationWarning, - match=r"Accessing the loop property while the loop is not running is deprecated", -) -_explicit_loop_is_not_running_warning = functools.partial( - pytest.warns, - DeprecationWarning, - match=r"Constructing LoopRunner\(loop=loop\) without a running loop is deprecated", -) -_implicit_loop_is_not_running_warning = functools.partial( - pytest.warns, - DeprecationWarning, - match=r"Constructing a LoopRunner\(asynchronous=True\) without a running loop is deprecated", -) - - -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_loop_runner(loop_in_thread): # Implicit loop - loop = IOLoop() - loop.make_current() - runner = LoopRunner() - with _loop_not_running_property_warning(): - assert runner.loop not in (loop, loop_in_thread) + async def make_looprunner_in_async_context(): + return IOLoop.current(), LoopRunner() + + loop, runner = asyncio.run(make_looprunner_in_async_context()) + with pytest.raises( + RuntimeError, + match=r"Accessing the loop property while the loop is not running is not supported", + ): + runner.loop assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) runner.start() assert runner.is_started() assert_running(runner.loop) + assert runner.loop is not loop runner.stop() assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) + with pytest.raises( + RuntimeError, + match=r"Accessing the loop property while the loop is not running is not supported", + ): + runner.loop + + async def make_io_loop_in_async_context(): + # calling IOLoop() raises DeprecationWarning: There is no current event loop + return IOLoop.current() # Explicit loop - loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - runner = LoopRunner(loop=loop) - with _loop_not_running_property_warning(): - assert runner.loop is loop - assert not runner.is_started() - assert_not_running(loop) - runner.start() - assert runner.is_started() - assert_running(loop) - runner.stop() - assert not runner.is_started() - assert_not_running(loop) + loop = asyncio.run(make_io_loop_in_async_context()) + with pytest.raises( + RuntimeError, + match=r"Constructing LoopRunner\(loop=loop\) without a running loop is not supported", + ): + LoopRunner(loop=loop) # Explicit loop, already started runner = LoopRunner(loop=loop_in_thread) @@ -457,57 +442,30 @@ def test_loop_runner(loop_in_thread): assert_running(loop_in_thread) # Implicit loop, asynchronous=True - loop = IOLoop() - loop.make_current() - with _implicit_loop_is_not_running_warning(): - runner = LoopRunner(asynchronous=True) - with _loop_not_running_property_warning(): - assert runner.loop is loop - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.start() - assert runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.stop() - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - - # Explicit loop, asynchronous=True - loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - runner = LoopRunner(loop=loop, asynchronous=True) - with _loop_not_running_property_warning(): - assert runner.loop is loop - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.start() - assert runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) - runner.stop() - assert not runner.is_started() - with _loop_not_running_property_warning(): - assert_not_running(runner.loop) + with pytest.raises( + RuntimeError, + match=r"Constructing LoopRunner\(asynchronous=True\) without a running loop is not supported", + ): + LoopRunner(asynchronous=True) + + # Explicit loop + loop = asyncio.run(make_io_loop_in_async_context()) + with pytest.raises( + RuntimeError, + match=r"Constructing LoopRunner\(loop=loop\) without a running loop is not supported", + ): + LoopRunner(loop=loop, asynchronous=True) -@pytest.mark.filterwarnings("ignore:There is no current event loop:DeprecationWarning") -@pytest.mark.filterwarnings("ignore:make_current is deprecated:DeprecationWarning") def test_two_loop_runners(loop_in_thread): # Loop runners tied to the same loop should cooperate # ABCCBA - loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - a = LoopRunner(loop=loop) - with _explicit_loop_is_not_running_warning(): - b = LoopRunner(loop=loop) - assert_not_running(loop) + a = LoopRunner() a.start() + loop = a.loop assert_running(loop) + b = LoopRunner(loop=loop) c = LoopRunner(loop=loop) b.start() assert_running(loop) @@ -521,13 +479,12 @@ def test_two_loop_runners(loop_in_thread): assert_not_running(loop) # ABCABC - loop = IOLoop() - with _explicit_loop_is_not_running_warning(): - a = LoopRunner(loop=loop) - with _explicit_loop_is_not_running_warning(): - b = LoopRunner(loop=loop) - assert_not_running(loop) + a = LoopRunner() a.start() + loop = a.loop + assert_running(loop) + b = LoopRunner(loop=loop) + c = LoopRunner(loop=loop) assert_running(loop) b.start() assert_running(loop) @@ -569,6 +526,32 @@ async def test_loop_runner_gen(): await asyncio.sleep(0.01) +def test_loop_runner_exception_in_start(cleanup): + class MyException(Exception): + pass + + with ( + mock.patch("tornado.ioloop.IOLoop.current", side_effect=MyException), + pytest.raises(MyException), + ): + LoopRunner().start() + + +def test_loop_runner_exception_in_teardown(cleanup): + runner = LoopRunner() + runner.start() + + async def cancel_all_tasks(): + current_task = asyncio.current_task() + for task in asyncio.all_tasks(): + if task is not current_task: + task.cancel() + + runner.run_sync(cancel_all_tasks) + with pytest.raises(asyncio.CancelledError): + runner.stop() + + @gen_test() async def test_all_quiet_exceptions(): class CustomError(Exception): diff --git a/distributed/utils.py b/distributed/utils.py index 540af0f93be..53484577688 100644 --- a/distributed/utils.py +++ b/distributed/utils.py @@ -420,6 +420,63 @@ def wait(timeout): return result +if sys.version_info >= (3, 10): + from asyncio import Event as LateLoopEvent +else: + # In python 3.10 asyncio.Lock and other primitives no longer support + # passing a loop kwarg to bind to a loop running in another thread + # e.g. calling from Client(asynchronous=False). Instead the loop is bound + # as late as possible: when calling any methods that wait on or wake + # Future instances. See: https://bugs.python.org/issue42392 + class LateLoopEvent: + _event: asyncio.Event | None + + def __init__(self) -> None: + self._event = None + + def set(self) -> None: + if self._event is None: + self._event = asyncio.Event() + + self._event.set() + + def is_set(self) -> bool: + return self._event is not None and self._event.is_set() + + async def wait(self) -> bool: + if self._event is None: + self._event = asyncio.Event() + + return await self._event.wait() + + +class _CollectErrorThread: + def __init__(self, target: Callable[[], None], daemon: bool, name: str): + self._exception: BaseException | None = None + + def wrapper() -> None: + try: + target() + except BaseException as e: + self._exception = e + + self._thread = thread = threading.Thread( + target=wrapper, daemon=daemon, name=name + ) + thread.start() + + def join(self, timeout: float | None = None) -> None: + thread = self._thread + thread.join(timeout=timeout) + if thread.is_alive(): + raise TimeoutError("join timed out") + if self._exception is not None: + try: + raise self._exception + finally: # remove a reference cycle + del self._exception + + class LoopRunner: """ A helper to start and stop an IO loop in a controlled way. @@ -442,36 +499,30 @@ class LoopRunner: weakref.WeakKeyDictionary[IOLoop, tuple[int, LoopRunner | None]] ] = weakref.WeakKeyDictionary() _lock = threading.Lock() + _loop_thread: _CollectErrorThread | None - def __init__(self, loop=None, asynchronous=False): + def __init__(self, loop: IOLoop | None = None, asynchronous: bool = False): if loop is None: if asynchronous: + # raises RuntimeError if there's no running loop try: asyncio.get_running_loop() - except RuntimeError: - warnings.warn( - "Constructing a LoopRunner(asynchronous=True) without a running loop is deprecated", - DeprecationWarning, - stacklevel=2, - ) - self._loop = IOLoop.current() - else: - # We're expecting the loop to run in another thread, - # avoid re-using this thread's assigned loop - self._loop = IOLoop() - else: - if not loop.asyncio_loop.is_running(): - warnings.warn( - "Constructing LoopRunner(loop=loop) without a running loop is deprecated", - DeprecationWarning, - stacklevel=2, - ) - self._loop = loop + except RuntimeError as e: + raise RuntimeError( + "Constructing LoopRunner(asynchronous=True) without a running loop is not supported" + ) from e + loop = IOLoop.current() + elif not loop.asyncio_loop.is_running(): # type: ignore[attr-defined] + # LoopRunner is not responsible for starting a foreign IOLoop + raise RuntimeError( + "Constructing LoopRunner(loop=loop) without a running loop is not supported" + ) + + self._loop = loop self._asynchronous = asynchronous self._loop_thread = None self._started = False - with self._lock: - self._all_loops.setdefault(self._loop, (0, None)) + self._stop_event = LateLoopEvent() def start(self): """ @@ -483,62 +534,53 @@ def start(self): with self._lock: self._start_unlocked() - def _start_unlocked(self): + def _start_unlocked(self) -> None: assert not self._started - count, real_runner = self._all_loops[self._loop] - if self._asynchronous or real_runner is not None or count > 0: + if self._loop is not None: + try: + count, real_runner = self._all_loops[self._loop] + except KeyError: + assert self._loop.asyncio_loop.is_running() # type: ignore[attr-defined] + self._started = True + return + self._all_loops[self._loop] = count + 1, real_runner self._started = True return assert self._loop_thread is None - assert count == 0 - loop_evt = threading.Event() - done_evt = threading.Event() - in_thread = [None] - start_exc = [None] + start_evt = threading.Event() + start_exc = None + loop = None - def loop_cb(): - in_thread[0] = threading.current_thread() - loop_evt.set() + async def amain() -> None: + nonlocal loop + loop = IOLoop.current() + start_evt.set() + await self._stop_event.wait() - def run_loop(loop=self._loop): - loop.add_callback(loop_cb) - # run loop forever if it's not running already + def run_loop() -> None: + nonlocal start_exc try: - if not loop.asyncio_loop.is_running(): - loop.start() - except Exception as e: - start_exc[0] = e - finally: - done_evt.set() - - thread = threading.Thread(target=run_loop, name="IO loop") - thread.daemon = True - thread.start() - - loop_evt.wait(timeout=10) + asyncio.run(amain()) + except BaseException as e: + if start_evt.is_set(): + raise + start_exc = e + start_evt.set() + + self._loop_thread = _CollectErrorThread( + target=run_loop, daemon=True, name="IO loop" + ) + start_evt.wait(timeout=10) + if start_exc is not None: + raise start_exc + assert loop is not None + self._loop = loop self._started = True - - actual_thread = in_thread[0] - if actual_thread is not thread: - # Loop already running in other thread (user-launched) - done_evt.wait(5) - if start_exc[0] is not None and not isinstance(start_exc[0], RuntimeError): - if not isinstance( - start_exc[0], Exception - ): # track down infrequent error - raise TypeError( - f"not an exception: {start_exc[0]!r}", - ) - raise start_exc[0] - self._all_loops[self._loop] = count + 1, None - else: - assert start_exc[0] is None, start_exc - self._loop_thread = thread - self._all_loops[self._loop] = count + 1, self + self._all_loops[loop] = (1, self) def stop(self, timeout=10): """ @@ -554,25 +596,26 @@ def _stop_unlocked(self, timeout): self._started = False - count, real_runner = self._all_loops[self._loop] + try: + count, real_runner = self._all_loops[self._loop] + except KeyError: + return + if count > 1: self._all_loops[self._loop] = count - 1, real_runner - else: - assert count == 1 - del self._all_loops[self._loop] - if real_runner is not None: - real_runner._real_stop(timeout) + return + + assert count == 1 + del self._all_loops[self._loop] + real_runner._real_stop(timeout) def _real_stop(self, timeout): assert self._loop_thread is not None - if self._loop_thread is not None: - try: - self._loop.add_callback(self._loop.stop) - self._loop_thread.join(timeout=timeout) - with contextlib.suppress(KeyError): # IOLoop can be missing - self._loop.close() - finally: - self._loop_thread = None + try: + self._loop.add_callback(self._stop_event.set) + self._loop_thread.join(timeout=timeout) + finally: + self._loop_thread = None def is_started(self): """ @@ -597,11 +640,9 @@ def run_sync(self, func, *args, **kwargs): @property def loop(self): loop = self._loop - if not loop.asyncio_loop.is_running(): - warnings.warn( - "Accessing the loop property while the loop is not running is deprecated", - DeprecationWarning, - stacklevel=2, + if loop is None or not loop.asyncio_loop.is_running(): + raise RuntimeError( + "Accessing the loop property while the loop is not running is not supported" ) return self._loop