From 75b3ca8fc6e5f18ffa39c4bb84b431fc6967e720 Mon Sep 17 00:00:00 2001 From: drew2a Date: Fri, 7 Jul 2023 15:03:27 +0200 Subject: [PATCH 1/4] Fix `shutdown` --- .../libtorrent/download_manager/download.py | 22 +++++++++++++------ .../libtorrent/tests/test_download.py | 11 ++++++++++ 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/src/tribler/core/components/libtorrent/download_manager/download.py b/src/tribler/core/components/libtorrent/download_manager/download.py index 344492ce5c1..baa84466fd9 100644 --- a/src/tribler/core/components/libtorrent/download_manager/download.py +++ b/src/tribler/core/components/libtorrent/download_manager/download.py @@ -5,9 +5,11 @@ """ import asyncio import base64 +import itertools import logging from asyncio import CancelledError, Future, iscoroutine, sleep, wait_for from collections import defaultdict +from contextlib import suppress from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple from bitarray import bitarray @@ -31,6 +33,8 @@ from tribler.core.utilities.unicode import ensure_unicode, hexlify from tribler.core.utilities.utilities import bdecode_compat +Getter = Optional[Callable[[Any], Any]] + class Download(TaskManager): """ Download subclass that represents a libtorrent download.""" @@ -64,7 +68,7 @@ def __init__(self, self.checkpoint_after_next_hashcheck = False self.tracker_status = {} # {url: [num_peers, status_str]} - self.futures = defaultdict(list) + self.futures: Dict[str, list[tuple[Future, Callable, Getter]]] = defaultdict(list) self.alert_handlers = defaultdict(list) self.future_added = self.wait_for_alert('add_torrent_alert', lambda a: a.handle) @@ -100,7 +104,7 @@ def __init__(self, def __str__(self): return "Download " % \ - (self.tdef.get_name(), self.config.get_hops(), self.checkpoint_disabled) + (self.tdef.get_name(), self.config.get_hops(), self.checkpoint_disabled) def __repr__(self): return self.__str__() @@ -123,8 +127,8 @@ def get_torrent_data(self) -> Optional[object]: def register_alert_handler(self, alert_type: str, handler: lt.torrent_handle): self.alert_handlers[alert_type].append(handler) - def wait_for_alert(self, success_type: str, success_getter: Optional[Callable[[Any], Any]] = None, - fail_type: str = None, fail_getter: Optional[Callable[[Any], Any]] = None) -> Future: + def wait_for_alert(self, success_type: str, success_getter: Getter = None, + fail_type: str = None, fail_getter: Getter = None) -> Future: future = Future() if success_type: self.futures[success_type].append((future, future.set_result, success_getter)) @@ -631,12 +635,16 @@ async def state_callback_loop(): return self.register_anonymous_task("downloads_cb", state_callback_loop) async def shutdown(self): + self._logger.info('Shutting down...') self.alert_handlers.clear() if self.stream is not None: self.stream.close() - for _, futures in self.futures.items(): - for future, _, _ in futures: - future.cancel() + + active_futures = [f for f, _, _ in itertools.chain(*self.futures.values()) if not f.done()] + for future in active_futures: + future.cancel() + with suppress(CancelledError): + await asyncio.gather(*active_futures) # wait for futures to be actually cancelled self.futures.clear() await self.shutdown_task_manager() diff --git a/src/tribler/core/components/libtorrent/tests/test_download.py b/src/tribler/core/components/libtorrent/tests/test_download.py index 5f782b5039c..caf53284dc7 100644 --- a/src/tribler/core/components/libtorrent/tests/test_download.py +++ b/src/tribler/core/components/libtorrent/tests/test_download.py @@ -490,3 +490,14 @@ def test_get_tracker_status_get_peer_info_error(test_download: Download): ) status = test_download.get_tracker_status() assert status + + +async def test_shutdown(test_download: Download): + """ Test that the `shutdown` method closes the stream and clears the `futures` list.""" + test_download.stream = Mock() + assert len(test_download.futures) == 4 + + await test_download.shutdown() + + assert not test_download.futures + assert test_download.stream.close.called From 090ae756e76fd3649da553cd1ab5bab9671d3f2f Mon Sep 17 00:00:00 2001 From: drew2a Date: Tue, 11 Jul 2023 16:32:57 +0200 Subject: [PATCH 2/4] Add switch for the `wait_for_status` --- .../components/libtorrent/download_manager/download.py | 8 ++++++-- .../libtorrent/download_manager/download_manager.py | 2 +- src/tribler/core/utilities/async_force_switch.py | 7 ++++++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/tribler/core/components/libtorrent/download_manager/download.py b/src/tribler/core/components/libtorrent/download_manager/download.py index baa84466fd9..73a3123928b 100644 --- a/src/tribler/core/components/libtorrent/download_manager/download.py +++ b/src/tribler/core/components/libtorrent/download_manager/download.py @@ -26,6 +26,7 @@ from tribler.core.components.libtorrent.utils.torrent_utils import check_handle, get_info_from_handle, require_handle from tribler.core.components.reporter.exception_handler import NoCrashException from tribler.core.exceptions import SaveResumeDataError +from tribler.core.utilities.async_force_switch import switch from tribler.core.utilities.notifier import Notifier from tribler.core.utilities.osutils import fix_filebasename from tribler.core.utilities.path_util import Path @@ -138,6 +139,7 @@ def wait_for_alert(self, success_type: str, success_getter: Getter = None, async def wait_for_status(self, *status): while self.get_state().get_status() not in status: + await switch() await self.wait_for_alert('state_changed_alert') def get_def(self) -> TorrentDef: @@ -147,10 +149,12 @@ def get_handle(self) -> Awaitable[lt.torrent_handle]: """ Returns a deferred that fires with a valid libtorrent download handle. """ - if self.handle and self.handle.is_valid(): + if self.handle: + # This block could be safely omitted because `self.future_added` does the same thing. + # However, it is used in tests, therefore it is better to keep it for now. return succeed(self.handle) - return self.wait_for_alert('add_torrent_alert', lambda a: a.handle) + return self.future_added def get_atp(self) -> Dict: save_path = self.config.get_dest_dir() diff --git a/src/tribler/core/components/libtorrent/download_manager/download_manager.py b/src/tribler/core/components/libtorrent/download_manager/download_manager.py index d150bc092e5..e0e7e2deac3 100644 --- a/src/tribler/core/components/libtorrent/download_manager/download_manager.py +++ b/src/tribler/core/components/libtorrent/download_manager/download_manager.py @@ -327,7 +327,7 @@ def create_session(self, hops=0, store_listen_port=True): ltsession.add_dht_router(*router) ltsession.start_lsd() - self._logger.debug("Started libtorrent session for %d hops on port %d", hops, ltsession.listen_port()) + self._logger.info(f"Started libtorrent session for {hops} hops on port {ltsession.listen_port()}") self.lt_session_shutdown_ready[hops] = False return ltsession diff --git a/src/tribler/core/utilities/async_force_switch.py b/src/tribler/core/utilities/async_force_switch.py index 07be4eae700..30522669aa2 100644 --- a/src/tribler/core/utilities/async_force_switch.py +++ b/src/tribler/core/utilities/async_force_switch.py @@ -2,6 +2,11 @@ import functools +async def switch(): + """ Coroutine that yields control to the event loop.""" + await asyncio.sleep(0) + + def force_switch(func): """Decorator for forced coroutine switch. The switch will occur before calling the function. @@ -11,7 +16,7 @@ def force_switch(func): @functools.wraps(func) async def wrapper(*args, **kwargs): - await asyncio.sleep(0) + await switch() return await func(*args, **kwargs) return wrapper From 895e0d2d0a2a166993e323ccdccd28badc2b8d9d Mon Sep 17 00:00:00 2001 From: drew2a Date: Wed, 12 Jul 2023 13:32:34 +0200 Subject: [PATCH 3/4] Fix shutdown for the fake download manager --- .../core/components/libtorrent/tests/test_download_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tribler/core/components/libtorrent/tests/test_download_manager.py b/src/tribler/core/components/libtorrent/tests/test_download_manager.py index 2834c5668f6..bf1befdbf99 100644 --- a/src/tribler/core/components/libtorrent/tests/test_download_manager.py +++ b/src/tribler/core/components/libtorrent/tests/test_download_manager.py @@ -50,7 +50,7 @@ async def fake_dlmgr(tmp_path_factory): dlmgr.metadata_tmpdir = tmp_path_factory.mktemp('metadata_tmpdir') dlmgr.get_session = lambda *_, **__: MagicMock() yield dlmgr - await dlmgr.shutdown(timeout=0) + await dlmgr.shutdown() async def test_get_metainfo_valid_metadata(fake_dlmgr): From cc1e93bea084396644e30bae639363cc19f2b228 Mon Sep 17 00:00:00 2001 From: drew2a Date: Mon, 17 Jul 2023 10:01:14 +0200 Subject: [PATCH 4/4] Make `Getter` optional --- .../components/libtorrent/download_manager/download.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/tribler/core/components/libtorrent/download_manager/download.py b/src/tribler/core/components/libtorrent/download_manager/download.py index 73a3123928b..dafd824e277 100644 --- a/src/tribler/core/components/libtorrent/download_manager/download.py +++ b/src/tribler/core/components/libtorrent/download_manager/download.py @@ -34,7 +34,7 @@ from tribler.core.utilities.unicode import ensure_unicode, hexlify from tribler.core.utilities.utilities import bdecode_compat -Getter = Optional[Callable[[Any], Any]] +Getter = Callable[[Any], Any] class Download(TaskManager): @@ -69,7 +69,7 @@ def __init__(self, self.checkpoint_after_next_hashcheck = False self.tracker_status = {} # {url: [num_peers, status_str]} - self.futures: Dict[str, list[tuple[Future, Callable, Getter]]] = defaultdict(list) + self.futures: Dict[str, list[tuple[Future, Callable, Optional[Getter]]]] = defaultdict(list) self.alert_handlers = defaultdict(list) self.future_added = self.wait_for_alert('add_torrent_alert', lambda a: a.handle) @@ -128,8 +128,8 @@ def get_torrent_data(self) -> Optional[object]: def register_alert_handler(self, alert_type: str, handler: lt.torrent_handle): self.alert_handlers[alert_type].append(handler) - def wait_for_alert(self, success_type: str, success_getter: Getter = None, - fail_type: str = None, fail_getter: Getter = None) -> Future: + def wait_for_alert(self, success_type: str, success_getter: Optional[Getter] = None, + fail_type: str = None, fail_getter: Optional[Getter] = None) -> Future: future = Future() if success_type: self.futures[success_type].append((future, future.set_result, success_getter))