Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Move long-running (db) local torrents check to thread #7796

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ def random_infohash():
torrent_checker.check_torrent_health = lambda _: succeed(None)

# No torrents yet, the selected torrents should be empty
selected_torrents, _ = await torrent_checker.check_local_torrents()
selected_torrents = torrent_checker.check_local_torrents()
await torrent_checker.wait_for_tasks()
assert len(selected_torrents) == 0

# Add some freshly checked torrents
Expand All @@ -317,7 +318,8 @@ def random_infohash():
stale_infohashes.append(infohash)

# Now check that all torrents selected for check are stale torrents.
selected_torrents, _ = await torrent_checker.check_local_torrents()
selected_torrents = torrent_checker.check_local_torrents()
await torrent_checker.wait_for_tasks()
assert len(selected_torrents) <= torrent_checker_module.TORRENT_SELECTION_POOL_SIZE

# In the above setup, both seeder (popularity) count and last_check are decreasing so,
Expand Down Expand Up @@ -355,7 +357,8 @@ def add_torrent_to_channel(infohash, last_check):
assert len(selected_torrents) == 0

# No health check call are done
await torrent_checker.check_torrents_in_user_channel()
torrent_checker.check_torrents_in_user_channel()
await torrent_checker.wait_for_tasks()
assert check_torrent_health_mock.call_count == len(selected_torrents)

num_torrents = 20
Expand All @@ -380,7 +383,8 @@ def add_torrent_to_channel(infohash, last_check):
assert torrent.infohash in outdated_torrents

# Health check requests are sent for all selected torrents
result = await torrent_checker.check_torrents_in_user_channel()
result = torrent_checker.check_torrents_in_user_channel()
await torrent_checker.wait_for_tasks()
assert len(result) == len(selected_torrents)


Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
from __future__ import annotations

import asyncio
import logging
import random
import time
from asyncio import CancelledError
from asyncio import AbstractEventLoop, CancelledError, get_running_loop
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Union
from typing import Dict, List, Optional

from ipv8.taskmanager import TaskManager
from pony.orm import db_session, desc, select
from pony.utils import between

from ipv8.taskmanager import TaskManager
from tribler.core import notifications
from tribler.core.components.libtorrent.download_manager.download_manager import DownloadManager
from tribler.core.components.database.db.serialization import REGULAR_TORRENT
from tribler.core.components.database.db.store import MetadataStore
from tribler.core.components.libtorrent.download_manager.download_manager import DownloadManager
from tribler.core.components.torrent_checker.torrent_checker import DHT
from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HEALTH_FRESHNESS_SECONDS, HealthInfo, \
TrackerResponse
Expand Down Expand Up @@ -69,8 +71,10 @@ def __init__(self,

async def initialize(self):
self.register_task("check random tracker", self.check_random_tracker, interval=TRACKER_SELECTION_INTERVAL)
self.register_task("check local torrents", self.check_local_torrents, interval=TORRENT_SELECTION_INTERVAL)
self.register_task("check channel torrents", self.check_torrents_in_user_channel,
self.register_task("check local torrents", get_running_loop().run_in_executor, None,
self.check_local_torrents, get_running_loop(), interval=TORRENT_SELECTION_INTERVAL)
self.register_task("check channel torrents", get_running_loop().run_in_executor, None,
self.check_torrents_in_user_channel, get_running_loop(),
interval=USER_CHANNEL_TORRENT_SELECTION_INTERVAL)
await self.create_socket_or_schedule()

Expand Down Expand Up @@ -208,6 +212,16 @@ def load_torrents_checked_from_db(self) -> Dict[bytes, HealthInfo]:
last_check=torrent.last_check, self_checked=True)
return result

def schedule_check(self, torrents: list, loop: AbstractEventLoop) -> None:
"""
Schedule a health check of the given torrents.

:type torrents: list[TorrentState]
"""
for t in torrents:
loop.call_soon_threadsafe(self.register_anonymous_task, f"Check health {t.infohash}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is intentional or not, but I'm guessing check_torrent_health is still doing database stuff on the main thread (since that's where the only running eventloop is).

If it is intentional, you might as well keep the check functions on the main thread, and schedule the torrents_to_check/torrents_to_check_in_user_channel using run_threaded

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like it would necessitate a major refactoring of the torrent checker to solve sanely then. Secondly, I think that this refactoring would overlap too much with #7328.

Taking that into account, I'll close this PR in favor of another (future) more thorough refactoring PR.

self.check_torrent_health, t.infohash)

@db_session
def torrents_to_check(self):
"""
Expand All @@ -233,16 +247,18 @@ def torrents_to_check(self):
selected_torrents = random.sample(selected_torrents, min(TORRENT_SELECTION_POOL_SIZE, len(selected_torrents)))
return selected_torrents

async def check_local_torrents(self) -> Tuple[List, List]:
def check_local_torrents(self, loop: AbstractEventLoop | None = None) -> list:
"""
Perform a full health check on a few popular and old torrents in the database.

:returns: the torrents to be checked.
:rtype: list[TorrentState]
"""
loop = loop or get_running_loop()
selected_torrents = self.torrents_to_check()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You may need to call db.disconnect after torrents_to_check/torrents_to_check_in_user_channel since they're not running on the main thread anymore.

self._logger.info(f'Check {len(selected_torrents)} local torrents')
coros = [self.check_torrent_health(t.infohash) for t in selected_torrents]
results = await gather_coros(coros)
self._logger.info(f'Results for local torrents check: {results}')
return selected_torrents, results
self.schedule_check(selected_torrents, loop)
return selected_torrents

@db_session
def torrents_to_check_in_user_channel(self):
Expand All @@ -259,16 +275,18 @@ def torrents_to_check_in_user_channel(self):
.limit(USER_CHANNEL_TORRENT_SELECTION_POOL_SIZE))
return channel_torrents

async def check_torrents_in_user_channel(self) -> List[Union[HealthInfo, BaseException]]:
def check_torrents_in_user_channel(self, loop: AbstractEventLoop | None = None) -> list:
"""
Perform a full health check of torrents in user's channel
Perform a full health check of torrents in user's channel.

:returns: the torrents to be checked.
:rtype: list[TorrentState]
"""
loop = loop or get_running_loop()
selected_torrents = self.torrents_to_check_in_user_channel()
self._logger.info(f'Check {len(selected_torrents)} torrents in user channel')
coros = [self.check_torrent_health(t.infohash) for t in selected_torrents]
results = await gather_coros(coros)
self._logger.info(f'Results for torrents in user channel: {results}')
return results
self.schedule_check(selected_torrents, loop)
return selected_torrents

def get_next_tracker(self):
while tracker := self.tracker_manager.get_next_tracker():
Expand Down
Loading