Skip to content

Commit

Permalink
Move long-running (db) local torrents check to thread
Browse files Browse the repository at this point in the history
  • Loading branch information
qstokkink committed Jan 4, 2024
1 parent 8c2f12c commit 8212cdc
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ def random_infohash():
torrent_checker.check_torrent_health = lambda _: succeed(None)

# No torrents yet, the selected torrents should be empty
selected_torrents, _ = await torrent_checker.check_local_torrents()
selected_torrents = torrent_checker.check_local_torrents()
await torrent_checker.wait_for_tasks()
assert len(selected_torrents) == 0

# Add some freshly checked torrents
Expand All @@ -317,7 +318,8 @@ def random_infohash():
stale_infohashes.append(infohash)

# Now check that all torrents selected for check are stale torrents.
selected_torrents, _ = await torrent_checker.check_local_torrents()
selected_torrents = torrent_checker.check_local_torrents()
await torrent_checker.wait_for_tasks()
assert len(selected_torrents) <= torrent_checker_module.TORRENT_SELECTION_POOL_SIZE

# In the above setup, both seeder (popularity) count and last_check are decreasing so,
Expand Down Expand Up @@ -355,7 +357,8 @@ def add_torrent_to_channel(infohash, last_check):
assert len(selected_torrents) == 0

# No health check call are done
await torrent_checker.check_torrents_in_user_channel()
torrent_checker.check_torrents_in_user_channel()
await torrent_checker.wait_for_tasks()
assert check_torrent_health_mock.call_count == len(selected_torrents)

num_torrents = 20
Expand All @@ -380,7 +383,8 @@ def add_torrent_to_channel(infohash, last_check):
assert torrent.infohash in outdated_torrents

# Health check requests are sent for all selected torrents
result = await torrent_checker.check_torrents_in_user_channel()
result = torrent_checker.check_torrents_in_user_channel()
await torrent_checker.wait_for_tasks()
assert len(result) == len(selected_torrents)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
import random
import time
from asyncio import CancelledError
from asyncio import CancelledError, get_running_loop
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Union

Expand Down Expand Up @@ -69,9 +69,10 @@ def __init__(self,

async def initialize(self):
self.register_task("check random tracker", self.check_random_tracker, interval=TRACKER_SELECTION_INTERVAL)
self.register_task("check local torrents", self.check_local_torrents, interval=TORRENT_SELECTION_INTERVAL)
self.register_task("check channel torrents", self.check_torrents_in_user_channel,
interval=USER_CHANNEL_TORRENT_SELECTION_INTERVAL)
self.register_task("check local torrents", get_running_loop().run_in_executor, None,
self.check_local_torrents, interval=TORRENT_SELECTION_INTERVAL)
self.register_task("check channel torrents", get_running_loop().run_in_executor, None,
self.check_torrents_in_user_channel, interval=USER_CHANNEL_TORRENT_SELECTION_INTERVAL)
await self.create_socket_or_schedule()

async def listen_on_udp(self):
Expand Down Expand Up @@ -208,6 +209,15 @@ def load_torrents_checked_from_db(self) -> Dict[bytes, HealthInfo]:
last_check=torrent.last_check, self_checked=True)
return result

def schedule_check(self, torrents: list) -> None:
"""
Schedule a health check of the given torrents.
:type torrents: list[TorrentState]
"""
for t in torrents:
self.register_anonymous_task(f"Check health of {t.infohash}", self.check_torrent_health, t.infohash)

@db_session
def torrents_to_check(self):
"""
Expand All @@ -233,16 +243,17 @@ def torrents_to_check(self):
selected_torrents = random.sample(selected_torrents, min(TORRENT_SELECTION_POOL_SIZE, len(selected_torrents)))
return selected_torrents

async def check_local_torrents(self) -> Tuple[List, List]:
def check_local_torrents(self) -> list:
"""
Perform a full health check on a few popular and old torrents in the database.
:returns: the torrents to be checked.
:rtype: list[TorrentState]
"""
selected_torrents = self.torrents_to_check()
self._logger.info(f'Check {len(selected_torrents)} local torrents')
coros = [self.check_torrent_health(t.infohash) for t in selected_torrents]
results = await gather_coros(coros)
self._logger.info(f'Results for local torrents check: {results}')
return selected_torrents, results
self.schedule_check(selected_torrents)
return selected_torrents

@db_session
def torrents_to_check_in_user_channel(self):
Expand All @@ -259,16 +270,17 @@ def torrents_to_check_in_user_channel(self):
.limit(USER_CHANNEL_TORRENT_SELECTION_POOL_SIZE))
return channel_torrents

async def check_torrents_in_user_channel(self) -> List[Union[HealthInfo, BaseException]]:
def check_torrents_in_user_channel(self) -> list:
"""
Perform a full health check of torrents in user's channel
Perform a full health check of torrents in user's channel.
:returns: the torrents to be checked.
:rtype: list[TorrentState]
"""
selected_torrents = self.torrents_to_check_in_user_channel()
self._logger.info(f'Check {len(selected_torrents)} torrents in user channel')
coros = [self.check_torrent_health(t.infohash) for t in selected_torrents]
results = await gather_coros(coros)
self._logger.info(f'Results for torrents in user channel: {results}')
return results
self.schedule_check(selected_torrents)
return selected_torrents

def get_next_tracker(self):
while tracker := self.tracker_manager.get_next_tracker():
Expand Down

0 comments on commit 8212cdc

Please sign in to comment.