From 8049b921fa6cc324fe23a635ba1bae4b805e5339 Mon Sep 17 00:00:00 2001 From: qstokkink Date: Tue, 19 Dec 2023 15:34:41 +0100 Subject: [PATCH] Store preferable infohashes for queries --- .../community/content_discovery_community.py | 8 +- .../layers/tests/test_user_activity_layer.py | 201 ++++++++++++++++++ .../database/db/layers/user_activity_layer.py | 128 +++++++++++ .../database/db/tribler_database.py | 3 + .../database/restapi/database_endpoint.py | 6 + .../libtorrent/download_manager/download.py | 2 +- .../core/components/user_activity/__init__.py | 0 .../core/components/user_activity/settings.py | 7 + .../user_activity/tests/__init__.py | 0 .../tests/test_user_activity_component.py | 188 ++++++++++++++++ .../core/components/user_activity/types.py | 3 + .../user_activity/user_activity_component.py | 109 ++++++++++ src/tribler/core/config/tribler_config.py | 2 + src/tribler/core/notifications.py | 5 + src/tribler/core/start_core.py | 6 +- 15 files changed, 663 insertions(+), 5 deletions(-) create mode 100644 src/tribler/core/components/database/db/layers/tests/test_user_activity_layer.py create mode 100644 src/tribler/core/components/database/db/layers/user_activity_layer.py create mode 100644 src/tribler/core/components/user_activity/__init__.py create mode 100644 src/tribler/core/components/user_activity/settings.py create mode 100644 src/tribler/core/components/user_activity/tests/__init__.py create mode 100644 src/tribler/core/components/user_activity/tests/test_user_activity_component.py create mode 100644 src/tribler/core/components/user_activity/types.py create mode 100644 src/tribler/core/components/user_activity/user_activity_component.py diff --git a/src/tribler/core/components/content_discovery/community/content_discovery_community.py b/src/tribler/core/components/content_discovery/community/content_discovery_community.py index d8c37520a86..88dcc59b199 100644 --- a/src/tribler/core/components/content_discovery/community/content_discovery_community.py 
@fixture(name="layer")
def fixture_activity_layer() -> Generator[UserActivityLayer, None, None]:
    """
    Provide a ``UserActivityLayer`` bound to a fresh in-memory SQLite database.

    The database is disconnected again once the test is done.
    """
    database = TrackedDatabase()
    database.bind(provider="sqlite", filename=":memory:")
    ual = UserActivityLayer(database)
    database.generate_mapping(create_tables=True)
    yield ual
    database.disconnect()


def float_equals(a: float, b: float) -> bool:
    """
    Check whether two floats are equal within an absolute tolerance of 1e-5.

    Comparing rounded values reports a mismatch for two numbers that are a hair apart but happen to fall on
    opposite sides of a rounding boundary; an absolute tolerance does not have that blind spot.
    """
    import math  # Local import: only this helper needs it.

    return math.isclose(a, b, abs_tol=1e-5)


def test_store_no_losers(layer: UserActivityLayer) -> None:
    """
    Test that queries can be stored and retrieved.
    """
    layer.store("test query", InfoHash(b"\x00" * 20), set())

    with db_session():
        queries = layer.Query.select()[:]

        assert len(queries) == 1
        assert queries[0].query == "test query"
        assert len(queries[0].infohashes) == 1
        assert list(queries[0].infohashes)[0].infohash == b"\x00" * 20
        assert float_equals(list(queries[0].infohashes)[0].preference, 1.0)


def test_store_with_loser(layer: UserActivityLayer) -> None:
    """
    Test that queries with a loser can be stored and retrieved.
    """
    layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

    with db_session():
        queries = layer.Query.select()[:]
        winner, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
        loser, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]

        assert len(queries) == 1
        assert queries[0].query == "test query"
        assert float_equals(winner.preference, 1.0)
        assert float_equals(loser.preference, 0.0)


def test_store_with_losers(layer: UserActivityLayer) -> None:
    """
    Test that queries with multiple losers can be stored and retrieved.
    """
    layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
                                                       InfoHash(b"\x02" * 20),
                                                       InfoHash(b"\x03" * 20)})

    with db_session():
        queries = layer.Query.select()[:]
        winner, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
        loser_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
        loser_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
        loser_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]

        assert len(queries) == 1
        assert queries[0].query == "test query"
        assert float_equals(winner.preference, 1.0)
        assert float_equals(loser_1.preference, 0.0)
        assert float_equals(loser_2.preference, 0.0)
        assert float_equals(loser_3.preference, 0.0)


def test_store_weighted_decay(layer: UserActivityLayer) -> None:
    """
    Test result decay after updating.
    """
    layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
                                                       InfoHash(b"\x02" * 20),
                                                       InfoHash(b"\x03" * 20)})
    layer.store("test query", InfoHash(b"\x01" * 20), {InfoHash(b"\x00" * 20),
                                                       InfoHash(b"\x02" * 20),
                                                       InfoHash(b"\x03" * 20)})

    with db_session():
        queries = layer.Query.select()[:]
        entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
        entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
        entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
        entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]

        assert len(queries) == 1
        assert queries[0].query == "test query"
        assert float_equals(entry_1.preference, 0.2)
        assert float_equals(entry_2.preference, 0.8)
        assert float_equals(entry_3.preference, 0.0)
        assert float_equals(entry_4.preference, 0.0)


def test_store_delete_old(layer: UserActivityLayer) -> None:
    """
    Test that an entry that no longer shows up, and whose decayed preference drops below the threshold e, is deleted.
    """
    layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
                                                       InfoHash(b"\x02" * 20),
                                                       InfoHash(b"\x03" * 20)})
    layer.store("test query", InfoHash(b"\x04" * 20), {InfoHash(b"\x00" * 20),
                                                       InfoHash(b"\x01" * 20),
                                                       InfoHash(b"\x02" * 20)})

    with db_session():
        queries = layer.Query.select()[:]
        entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
        entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
        entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
        should_be_dropped = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]
        entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:]

        assert len(queries) == 1
        assert queries[0].query == "test query"
        assert float_equals(entry_1.preference, 0.2)
        assert float_equals(entry_2.preference, 0.0)
        assert float_equals(entry_3.preference, 0.0)
        assert should_be_dropped == []
        assert float_equals(entry_4.preference, 0.8)


def test_store_delete_old_over_e(layer: UserActivityLayer) -> None:
    """
    Test if entries are not deleted if their preference is still over the threshold e.
    """
    layer.e = 0.0
    layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20),
                                                       InfoHash(b"\x02" * 20),
                                                       InfoHash(b"\x03" * 20)})
    layer.store("test query", InfoHash(b"\x04" * 20), {InfoHash(b"\x00" * 20),
                                                       InfoHash(b"\x01" * 20),
                                                       InfoHash(b"\x02" * 20)})

    with db_session():
        queries = layer.Query.select()[:]
        entry_1, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:]
        entry_2, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:]
        entry_3, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:]
        entry_4, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:]
        entry_5, = layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:]

        assert len(queries) == 1
        assert queries[0].query == "test query"
        assert float_equals(entry_1.preference, 0.2)
        assert float_equals(entry_2.preference, 0.0)
        assert float_equals(entry_3.preference, 0.0)
        assert float_equals(entry_4.preference, 0.0)
        assert float_equals(entry_5.preference, 0.8)


def test_get_preferable(layer: UserActivityLayer) -> None:
    """
    Test if an infohash returns itself when it is already the preferable one for its query.
    """
    layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

    assert layer.get_preferable(b"\x00" * 20) == b"\x00" * 20


def test_get_preferable_already_best(layer: UserActivityLayer) -> None:
    """
    Test if the preferable infohash is returned when asking for a losing infohash of the same query.
    """
    layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

    assert layer.get_preferable(b"\x01" * 20) == b"\x00" * 20


def test_get_preferable_unknown(layer: UserActivityLayer) -> None:
    """
    Test if an infohash returns itself when it has no known preferable infohashes.
    """
    layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)})

    assert layer.get_preferable(b"\x02" * 20) == b"\x02" * 20
class UserActivityLayer:
    """
    Database layer that remembers, per search query, how strongly each seen infohash was preferred.

    Every call to ``store`` blends the latest outcome (1.0 for the downloaded infohash, 0.0 for the ignored ones)
    into the stored preference using an exponential moving average controlled by ``update_weight``.
    """

    def __init__(self, database: TrackedDatabase, update_weight: float = 0.8, e: float = 0.01) -> None:
        """
        Create a new User Activity scheme for a particular database.

        :param database: The database to bind to.
        :param update_weight: The weight of new updates.
        :param e: A small value to decide near-zero preference.
        """
        self.database = database

        self.e = e
        self.update_weight_new = update_weight
        self.update_weight_old = 1 - self.update_weight_new

        # The entities are declared per instance because they have to be bound to the given database.
        class Query(database.Entity):
            query = orm.PrimaryKey(str)
            infohashes = orm.Set("InfohashPreference")

        class InfohashPreference(database.Entity):
            infohash = orm.Required(bytes)
            preference = orm.Required(float)
            parent_query = orm.Required(Query)
            orm.PrimaryKey(infohash, parent_query)

        self.Query = Query
        self.InfohashPreference = InfohashPreference

    def store(self, query: str, infohash: InfoHash, losing_infohashes: typing.Set[InfoHash]) -> None:
        """
        Store a query, its selected infohash, and the infohashes that were not downloaded.

        :param query: The text that the user searched for.
        :param infohash: The infohash that the user downloaded.
        :param losing_infohashes: The infohashes that the user saw but ignored.
        """
        # Convert "win" or "loss" to "1.0" or "0.0". The winner is assigned last, so it wins if it is also
        # listed as a loser.
        weights = {ih: 0.0 for ih in losing_infohashes}
        weights[infohash] = 1.0

        # Update or create a new database entry
        with db_session:
            existing = self.Query.get(query=query)
            if existing is not None:
                # Iterate over a snapshot: entries may be deleted from the relation inside the loop.
                for old_preference in list(existing.infohashes):
                    # A single pop both answers "did this infohash show up again?" and consumes its weight, so
                    # that only infohashes that are new for this query remain in ``weights`` afterwards.
                    update = weights.pop(old_preference.infohash, None)
                    decayed = old_preference.preference * self.update_weight_old
                    if update is not None:
                        old_preference.preference = decayed + update * self.update_weight_new
                    elif decayed < self.e:
                        # This infohash did not pop up and its preference decayed to (near) zero: drop it.
                        old_preference.delete()
                    else:
                        old_preference.preference = decayed
                if infohash in weights:
                    # The winner is new for an existing query: it starts from an implicit old preference of 0.
                    weights[infohash] = self.update_weight_new
            else:
                existing = self.Query(query=query, infohashes=set())

            for new_infohash, weight in weights.items():
                existing.infohashes.add(self.InfohashPreference(infohash=new_infohash, preference=weight,
                                                                parent_query=existing))

    @db_session
    def _select_superior(self, infohash_preference: "InfohashPreference") -> InfoHash:
        """
        For a given InfohashPreference, get the preferable infohash from the parent query.

        :param infohash_preference: Any stored preference; its parent query decides the candidates.
        :returns: The infohash with the highest preference among all infohashes of the same query.
        """
        # ``max`` finds the best entry in one pass; there is no need to sort all candidates first.
        best = max(infohash_preference.parent_query.infohashes, key=lambda x: x.preference)
        return typing.cast(InfoHash, best.infohash)

    def get_preferable(self, infohash: InfoHash) -> InfoHash:
        """
        Given an infohash, see if we know of more preferable infohashes.

        :param infohash: The infohash to find better alternatives for.
        :returns: The given infohash if it is unknown, otherwise the best infohash of one of its queries.
        """
        # Everything stays inside the session: ``_select_superior`` walks the parent query's relation.
        with db_session:
            existing = self.InfohashPreference.select(infohash=infohash)[:]

            if not existing:
                return infohash

            # The infohash can be a result of several queries: pick one of them at random.
            return self._select_superior(random.SystemRandom().choice(existing))

    def get_preferable_to_random(self, limit: int = 1) -> typing.Set[InfoHash]:
        """
        Retrieve (a) random infohash(es) and then return the preferred infohash for each infohash.

        This method selects up to the limit of random infohashes and then outputs the set of preferable infohashes.
        This means that you may start with ``limit`` number of infohashes and worst-case, if they all share the same,
        preferable infohash, end up with only one infohash as the output.

        :param limit: The number of infohashes to randomly get the preferred infohash for (the output set may be less).
        :returns: A set of infohashes of size 0 up to ``limit``.
        """
        with db_session:
            random_selection = self.InfohashPreference.select_random(limit=limit)
            return {self._select_superior(ih) for ih in random_selection}
-328,6 +330,10 @@ def search_db(): f'Main query executed in {t2 - t1:.6} seconds;\n' f'Result constructed in {t3 - t2:.6} seconds.') + self.download_manager.notifier[notifications.local_query_results]({ + "query": request.query.get("txt_filter"), + "results": list(pony_query) + }) return search_results, total, max_rowid try: diff --git a/src/tribler/core/components/libtorrent/download_manager/download.py b/src/tribler/core/components/libtorrent/download_manager/download.py index 4be439fc01e..44ca68a5928 100644 --- a/src/tribler/core/components/libtorrent/download_manager/download.py +++ b/src/tribler/core/components/libtorrent/download_manager/download.py @@ -448,7 +448,7 @@ def on_torrent_finished_alert(self, alert: lt.torrent_finished_alert): self.update_lt_status(self.handle.status()) self.checkpoint() downloaded = self.get_state().get_total_transferred(DOWNLOAD) - if downloaded > 0 and self.stream is not None and self.notifier is not None: + if downloaded > 0 and self.notifier is not None: name = self.tdef.get_name_as_unicode() infohash = self.tdef.get_infohash().hex() self.notifier[notifications.torrent_finished](infohash=infohash, name=name, hidden=self.hidden) diff --git a/src/tribler/core/components/user_activity/__init__.py b/src/tribler/core/components/user_activity/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/tribler/core/components/user_activity/settings.py b/src/tribler/core/components/user_activity/settings.py new file mode 100644 index 00000000000..e568faeafac --- /dev/null +++ b/src/tribler/core/components/user_activity/settings.py @@ -0,0 +1,7 @@ +from tribler.core.config.tribler_config_section import TriblerConfigSection + + +class UserActivitySettings(TriblerConfigSection): + enabled: bool = False + max_query_history: int = 500 + health_check_interval: float = 5.0 diff --git a/src/tribler/core/components/user_activity/tests/__init__.py b/src/tribler/core/components/user_activity/tests/__init__.py new file mode 
@fixture(name="config")
def fixture_config() -> TriblerConfig:
    """
    Provide a default Tribler configuration.
    """
    return TriblerConfig()


@fixture(name="session")
def fixture_session(config: TriblerConfig) -> Session:
    """
    Provide a session in which all dependencies of the component are mocked and marked as started.
    """
    session = Session(config=config)

    for component in [ContentDiscoveryComponent, LibtorrentComponent, DatabaseComponent, TorrentCheckerComponent]:
        session.components[component] = Mock(started_event=Event(), failed=False)
        session.components[component].started_event.set()

    return session


@fixture(name="component")
async def fixture_component(session) -> Generator[UserActivityComponent, None, None]:
    """
    Provide a running ``UserActivityComponent`` without its periodic "Check preferable" task.
    """
    component = UserActivityComponent(None)
    component.session = session
    await component.run()
    # The tests call ``check_preferable`` by hand: the periodic task would only interfere.
    component.task_manager.cancel_pending_task("Check preferable")
    yield component
    await component.shutdown()


@dataclass(unsafe_hash=True)
class TorrentMetadata:
    # Minimal hashable stand-in for a search result: the tests only give it an ``infohash``.
    infohash: InfoHash


@mark.parametrize("notification", [notifications.local_query_results, notifications.remote_query_results])
async def test_notify_query_empty(component: UserActivityComponent, notification) -> None:
    """
    Test that local and remote query notifications without a query get ignored.
    """
    fake_infohashes = [InfoHash(bytes([i]) * 20) for i in range(2)]
    fake_torrent_metadata = [TorrentMetadata(fake_infohashes[i]) for i in range(2)]
    fake_query = None

    component.session.notifier.notify(notification, data={"query": fake_query, "results": fake_torrent_metadata})
    await sleep(0)

    assert fake_query not in component.queries
    assert fake_infohashes[0] not in component.infohash_to_queries
    assert fake_infohashes[1] not in component.infohash_to_queries
    # Use ``get``: indexing the defaultdict would insert the very keys that were just asserted to be absent.
    assert fake_query not in component.infohash_to_queries.get(fake_infohashes[0], [])
    assert fake_query not in component.infohash_to_queries.get(fake_infohashes[1], [])


@mark.parametrize("notification", [notifications.local_query_results, notifications.remote_query_results])
async def test_notify_query_results(component: UserActivityComponent, notification) -> None:
    """
    Test that local and remote query notifications get processed correctly.
    """
    fake_infohashes = [InfoHash(bytes([i]) * 20) for i in range(2)]
    fake_torrent_metadata = [TorrentMetadata(fake_infohashes[i]) for i in range(2)]
    fake_query = "test query"

    component.session.notifier.notify(notification, data={"query": fake_query, "results": fake_torrent_metadata})
    await sleep(0)

    assert fake_query in component.queries
    assert fake_infohashes[0] in component.infohash_to_queries
    assert fake_infohashes[1] in component.infohash_to_queries
    assert fake_query in component.infohash_to_queries[fake_infohashes[0]]
    assert fake_query in component.infohash_to_queries[fake_infohashes[1]]


@mark.parametrize("notification", [notifications.local_query_results, notifications.remote_query_results])
async def test_notify_query_results_overflow(component: UserActivityComponent, notification) -> None:
    """
    Test that local and remote query notifications do not go beyond the maximum history.

    Old information should be purged. However, infohashes should not be purged if they are still in use.
    """
    component.max_query_history = 1

    fake_infohashes = [InfoHash(bytes([i]) * 20) for i in range(2)]
    fake_torrent_metadata = [TorrentMetadata(fake_infohashes[i]) for i in range(2)]
    fake_query_1 = "test query 1"
    fake_query_2 = "test query 2"

    component.session.notifier.notify(notification, data={"query": fake_query_1, "results": fake_torrent_metadata})
    await sleep(0)
    component.session.notifier.notify(notification, data={"query": fake_query_2, "results": fake_torrent_metadata[:1]})
    await sleep(0)

    assert fake_query_1 not in component.queries
    assert fake_query_2 in component.queries
    assert fake_infohashes[0] in component.infohash_to_queries
    assert fake_infohashes[1] not in component.infohash_to_queries
    assert fake_query_1 not in component.infohash_to_queries[fake_infohashes[0]]
    assert fake_query_2 in component.infohash_to_queries[fake_infohashes[0]]
    # Use ``get``: indexing the defaultdict would re-insert the purged infohash.
    assert fake_query_1 not in component.infohash_to_queries.get(fake_infohashes[1], [])
    assert fake_query_2 not in component.infohash_to_queries.get(fake_infohashes[1], [])


async def test_notify_finished_untracked(component: UserActivityComponent) -> None:
    """
    Test that an untracked infohash does not lead to any information being stored.
    """
    fake_infohash = InfoHash(b'\x00' * 20)
    untracked_fake_infohash = InfoHash(b'\x01' * 20)
    fake_query = "test query"
    component.queries[fake_query] = {fake_infohash}
    component.infohash_to_queries[fake_infohash] = [fake_query]

    component.session.notifier.notify(notifications.torrent_finished,
                                      infohash=hexlify(untracked_fake_infohash), name="test torrent", hidden=False)
    await sleep(0)

    assert not component.task_manager.is_pending_task_active("Store query")
    assert not component.database_manager.store.called


async def test_notify_finished_tracked(component: UserActivityComponent) -> None:
    """
    Test that a tracked infohash leads to information being stored.
    """
    fake_infohash = InfoHash(b'\x00' * 20)
    fake_query = "test query"
    component.queries[fake_query] = {fake_infohash}
    component.infohash_to_queries[fake_infohash] = [fake_query]

    component.session.notifier.notify(notifications.torrent_finished,
                                      infohash=hexlify(fake_infohash), name="test torrent", hidden=False)
    await sleep(0)
    await component.task_manager.wait_for_tasks()

    component.database_manager.store.assert_called_with(fake_query, fake_infohash, set())


async def test_check_preferable_zero(component: UserActivityComponent) -> None:
    """
    Test that checking without available random torrents leads to no checks.
    """
    # ``get_preferable_to_random`` returns a set; ``{}`` would be an (empty) dict.
    component.database_manager.get_preferable_to_random = Mock(return_value=set())

    component.check_preferable()
    await sleep(0)

    assert not component.torrent_checker.check_torrent_health.called


async def test_check_preferable_one(component: UserActivityComponent) -> None:
    """
    Test that checking with one available random torrent leads to one check.
    """
    fake_infohash = InfoHash(b'\x00' * 20)
    component.database_manager.get_preferable_to_random = Mock(return_value={fake_infohash})

    component.check_preferable()
    await sleep(0)

    component.torrent_checker.check_torrent_health.assert_called_with(fake_infohash)


async def test_check_preferable_multiple(component: UserActivityComponent) -> None:
    """
    Test that checking with multiple available random torrents leads to as many checks.
    """
    fake_infohashes = {InfoHash(bytes([i]) * 20) for i in range(10)}
    component.database_manager.get_preferable_to_random = Mock(return_value=fake_infohashes)

    component.check_preferable()
    await sleep(0)

    assert component.torrent_checker.check_torrent_health.call_count == 10
class UserActivityComponent(Component):
    """
    Track which search results a user ends up downloading.

    Queries and their results are kept in memory (bounded by ``max_query_history``). When a torrent finishes that
    was a result of a tracked query, the query, the finished infohash and the ignored infohashes are handed to the
    ``UserActivityLayer``. A periodic task health-checks a preferable torrent.
    """

    infohash_to_queries: typing.Dict[InfoHash, typing.List[str]]
    queries: typing.Dict[str, typing.Set[InfoHash]]
    max_query_history: int
    database_manager: UserActivityLayer
    torrent_checker: TorrentChecker
    task_manager: TaskManager

    def __init__(self, reporter: typing.Optional[SentryReporter] = None) -> None:
        """
        Create the component with empty tracking state; dependencies are resolved in ``run``.
        """
        super().__init__(reporter)

        # Reverse index of ``queries``: for each infohash, the tracked queries that returned it.
        self.infohash_to_queries: dict[InfoHash, list[str]] = defaultdict(list)
        # Insertion-ordered, so that the oldest query can be evicted first.
        self.queries: OrderedDict[str, typing.Set[InfoHash]] = OrderedDict()
        self.max_query_history = UserActivitySettings().max_query_history
        self.database_manager = None
        self.torrent_checker = None
        self.task_manager = TaskManager()

    async def run(self) -> None:
        """
        Load the settings, wait for the components this one depends on and hook the notifications.
        """
        await super().run()

        # Load settings
        self.max_query_history = self.session.config.user_activity.max_query_history

        # Wait for dependencies
        await self.require_component(ContentDiscoveryComponent)  # remote_query_results notification
        await self.require_component(LibtorrentComponent)  # torrent_finished notification
        database_component = await self.require_component(DatabaseComponent)  # local_query_results notification
        torrent_checker_component = await self.require_component(TorrentCheckerComponent)

        self.database_manager: UserActivityLayer = database_component.db.user_activity_layer
        self.torrent_checker: TorrentChecker = torrent_checker_component.torrent_checker

        # Hook events
        self.session.notifier.add_observer(notifications.torrent_finished, self.on_torrent_finished)
        self.session.notifier.add_observer(notifications.remote_query_results, self.on_query_results)
        self.session.notifier.add_observer(notifications.local_query_results, self.on_query_results)
        self.task_manager.register_task("Check preferable", self.check_preferable,
                                        interval=self.session.config.user_activity.health_check_interval)

    def on_query_results(self, data: dict) -> None:
        """
        Start tracking a query and its results.

        If any of the results get downloaded, we store the query (see ``on_torrent_finished``).

        :param data: The notification payload, with the searched text under ``"query"`` (may be missing or
                     ``None``) and the result objects, each exposing ``infohash``, under ``"results"``.
        """
        query = data.get("query")
        if query is None:
            return

        results = {tmd.infohash for tmd in data["results"]}
        for infohash in results:
            tracked = self.infohash_to_queries[infohash]
            # The same query is usually reported more than once (e.g., one batch of remote results per peer).
            # Register it only once per infohash: eviction removes a single occurrence, so duplicates would
            # leave references to a query that is no longer in ``self.queries``.
            if query not in tracked:
                tracked.append(query)
        self.queries[query] = results | self.queries.get(query, set())

        # Evict the oldest queries until the history fits again.
        while self.queries and len(self.queries) > self.max_query_history:
            evicted_query, evicted_results = self.queries.popitem(last=False)
            for infohash in evicted_results:
                tracked = self.infohash_to_queries.get(infohash)
                if tracked is None:
                    continue
                if evicted_query in tracked:
                    tracked.remove(evicted_query)
                if not tracked:
                    # No tracked query refers to this infohash anymore.
                    del self.infohash_to_queries[infohash]

    def on_torrent_finished(self, infohash: str, name: str, hidden: bool) -> None:
        """
        When a torrent finishes, check if we were tracking the infohash. If so, store the query and its result.

        :param infohash: The hex-encoded infohash of the finished torrent.
        :param name: The name of the finished torrent (not used here).
        :param hidden: Whether the download is hidden (not used here).
        """
        b_infohash = InfoHash(unhexlify(infohash))
        # Use ``get``: indexing the defaultdict would insert an empty list for every finished torrent that never
        # was a search result, which makes the mapping grow without bound.
        for query in tuple(self.infohash_to_queries.get(b_infohash, ())):
            tracked_results = self.queries.get(query)
            if tracked_results is None:
                # The query is no longer tracked: there is nothing to compare the download against.
                continue
            losing_infohashes = tracked_results - {b_infohash}
            # The database write blocks, so it is handed to the default executor.
            self.task_manager.register_anonymous_task("Store query", get_running_loop().run_in_executor,
                                                      None, self.database_manager.store,
                                                      query, b_infohash, losing_infohashes)

    def check_preferable(self) -> None:
        """
        Check a preferable torrent.

        This causes a chain of events that leads to the torrent being gossiped more often in the ``ContentDiscovery``
        community.
        """
        random_infohashes = self.database_manager.get_preferable_to_random(limit=1)  # Note: this set can be empty!
        for infohash in random_infohashes:
            self.task_manager.register_anonymous_task("Check preferable torrent",
                                                      self.torrent_checker.check_torrent_health, infohash)

    async def shutdown(self) -> None:
        """
        Shut down the component and then its task manager.
        """
        await super().shutdown()
        await self.task_manager.shutdown_task_manager()
diff --git a/src/tribler/core/start_core.py b/src/tribler/core/start_core.py index 23502f43d0f..f33fa1436c0 100644 --- a/src/tribler/core/start_core.py +++ b/src/tribler/core/start_core.py @@ -3,7 +3,6 @@ import logging.config import os import signal -import sys from pathlib import Path from typing import List, Optional @@ -14,6 +13,7 @@ ) from tribler.core.components.bandwidth_accounting.bandwidth_accounting_component import BandwidthAccountingComponent from tribler.core.components.component import Component +from tribler.core.components.content_discovery.content_discovery_component import ContentDiscoveryComponent from tribler.core.components.database.database_component import DatabaseComponent from tribler.core.components.gui_process_watcher.gui_process_watcher import GuiProcessWatcher from tribler.core.components.gui_process_watcher.gui_process_watcher_component import GuiProcessWatcherComponent @@ -22,7 +22,6 @@ from tribler.core.components.knowledge.knowledge_component import KnowledgeComponent from tribler.core.components.libtorrent.libtorrent_component import LibtorrentComponent from tribler.core.components.payout.payout_component import PayoutComponent -from tribler.core.components.content_discovery.content_discovery_component import ContentDiscoveryComponent from tribler.core.components.reporter.exception_handler import default_core_exception_handler from tribler.core.components.reporter.reporter_component import ReporterComponent from tribler.core.components.resource_monitor.resource_monitor_component import ResourceMonitorComponent @@ -31,6 +30,7 @@ from tribler.core.components.socks_servers.socks_servers_component import SocksServersComponent from tribler.core.components.torrent_checker.torrent_checker_component import TorrentCheckerComponent from tribler.core.components.tunnel.tunnel_component import TunnelsComponent +from tribler.core.components.user_activity.user_activity_component import UserActivityComponent from 
tribler.core.components.version_check.version_check_component import VersionCheckComponent from tribler.core.components.watch_folder.watch_folder_component import WatchFolderComponent from tribler.core.config.tribler_config import TriblerConfig @@ -77,6 +77,8 @@ def components_gen(config: TriblerConfig): yield TorrentCheckerComponent() if config.ipv8.enabled and config.torrent_checking.enabled and config.content_discovery_community.enabled: yield ContentDiscoveryComponent() + if config.libtorrent.enabled and config.user_activity.enabled: + yield UserActivityComponent() # The components below are skipped if config.gui_test_mode == True if config.gui_test_mode: