diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py index a72837e5845..43a0adf31f4 100644 --- a/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py +++ b/src/tribler/core/components/torrent_checker/torrent_checker/tests/test_torrentchecker.py @@ -290,7 +290,8 @@ def random_infohash(): torrent_checker.check_torrent_health = lambda _: succeed(None) # No torrents yet, the selected torrents should be empty - selected_torrents, _ = await torrent_checker.check_local_torrents() + selected_torrents = torrent_checker.check_local_torrents() + await torrent_checker.wait_for_tasks() assert len(selected_torrents) == 0 # Add some freshly checked torrents @@ -317,7 +318,8 @@ def random_infohash(): stale_infohashes.append(infohash) # Now check that all torrents selected for check are stale torrents. - selected_torrents, _ = await torrent_checker.check_local_torrents() + selected_torrents = torrent_checker.check_local_torrents() + await torrent_checker.wait_for_tasks() assert len(selected_torrents) <= torrent_checker_module.TORRENT_SELECTION_POOL_SIZE # In the above setup, both seeder (popularity) count and last_check are decreasing so, @@ -355,7 +357,8 @@ def add_torrent_to_channel(infohash, last_check): assert len(selected_torrents) == 0 # No health check call are done - await torrent_checker.check_torrents_in_user_channel() + torrent_checker.check_torrents_in_user_channel() + await torrent_checker.wait_for_tasks() assert check_torrent_health_mock.call_count == len(selected_torrents) num_torrents = 20 @@ -380,7 +383,8 @@ def add_torrent_to_channel(infohash, last_check): assert torrent.infohash in outdated_torrents # Health check requests are sent for all selected torrents - result = await torrent_checker.check_torrents_in_user_channel() + result = torrent_checker.check_torrents_in_user_channel() + await torrent_checker.wait_for_tasks() assert len(result) == len(selected_torrents) diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py b/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py index 4dc705cb184..01a8d663057 100644 --- a/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py +++ b/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py @@ -2,7 +2,7 @@ import logging import random import time -from asyncio import CancelledError +from asyncio import CancelledError, get_running_loop from collections import defaultdict from typing import Dict, List, Optional, Tuple, Union @@ -69,9 +69,10 @@ def __init__(self, async def initialize(self): self.register_task("check random tracker", self.check_random_tracker, interval=TRACKER_SELECTION_INTERVAL) - self.register_task("check local torrents", self.check_local_torrents, interval=TORRENT_SELECTION_INTERVAL) - self.register_task("check channel torrents", self.check_torrents_in_user_channel, - interval=USER_CHANNEL_TORRENT_SELECTION_INTERVAL) + self.register_task("check local torrents", get_running_loop().run_in_executor, None, + self.check_local_torrents, interval=TORRENT_SELECTION_INTERVAL) + self.register_task("check channel torrents", get_running_loop().run_in_executor, None, + self.check_torrents_in_user_channel, interval=USER_CHANNEL_TORRENT_SELECTION_INTERVAL) await self.create_socket_or_schedule() async def listen_on_udp(self): @@ -208,6 +209,15 @@ def load_torrents_checked_from_db(self) -> Dict[bytes, HealthInfo]: last_check=torrent.last_check, self_checked=True) return result + def schedule_check(self, torrents: list) -> None: + """ + Schedule a health check of the given torrents. + + :type torrents: list[TorrentState] + """ + for t in torrents: + self.register_anonymous_task(f"Check health of {t.infohash}", self.check_torrent_health, t.infohash) + @db_session def torrents_to_check(self): """ @@ -233,16 +243,17 @@ def torrents_to_check(self): selected_torrents = random.sample(selected_torrents, min(TORRENT_SELECTION_POOL_SIZE, len(selected_torrents))) return selected_torrents - async def check_local_torrents(self) -> Tuple[List, List]: + def check_local_torrents(self) -> list: """ Perform a full health check on a few popular and old torrents in the database. + + :returns: the torrents to be checked. + :rtype: list[TorrentState] """ selected_torrents = self.torrents_to_check() self._logger.info(f'Check {len(selected_torrents)} local torrents') - coros = [self.check_torrent_health(t.infohash) for t in selected_torrents] - results = await gather_coros(coros) - self._logger.info(f'Results for local torrents check: {results}') - return selected_torrents, results + self.schedule_check(selected_torrents) + return selected_torrents @db_session def torrents_to_check_in_user_channel(self): @@ -259,16 +270,17 @@ def torrents_to_check_in_user_channel(self): .limit(USER_CHANNEL_TORRENT_SELECTION_POOL_SIZE)) return channel_torrents - async def check_torrents_in_user_channel(self) -> List[Union[HealthInfo, BaseException]]: + def check_torrents_in_user_channel(self) -> list: """ - Perform a full health check of torrents in user's channel + Perform a full health check of torrents in user's channel. + + :returns: the torrents to be checked. + :rtype: list[TorrentState] """ selected_torrents = self.torrents_to_check_in_user_channel() self._logger.info(f'Check {len(selected_torrents)} torrents in user channel') - coros = [self.check_torrent_health(t.infohash) for t in selected_torrents] - results = await gather_coros(coros) - self._logger.info(f'Results for torrents in user channel: {results}') - return results + self.schedule_check(selected_torrents) + return selected_torrents def get_next_tracker(self): while tracker := self.tracker_manager.get_next_tracker():