diff --git a/src/tribler/core/components.py b/src/tribler/core/components.py index bcb4218ac56..4db77cf7a80 100644 --- a/src/tribler/core/components.py +++ b/src/tribler/core/components.py @@ -9,7 +9,6 @@ from ipv8.loader import CommunityLauncher, after, kwargs, overlay, precondition, set_in_session, walk_strategy from ipv8.overlay import Overlay, SettingsClass from ipv8.peerdiscovery.discovery import DiscoveryStrategy, RandomWalk -from ipv8.taskmanager import TaskManager if TYPE_CHECKING: from ipv8.bootstrapping.bootstrapper_interface import Bootstrapper @@ -321,26 +320,6 @@ def finalize(self, ipv8: IPv8, session: Session, community: Community) -> None: session.rest_manager.get_endpoint("/api/ipv8").endpoints["/tunnel"].tunnels = community -@after("ContentDiscoveryComponent", "TorrentCheckerComponent") -@precondition('session.config.get("user_activity/enabled")') -@overlay("tribler.core.user_activity.community", "UserActivityCommunity") -class UserActivityComponent(BaseLauncher): - """ - Launch instructions for the user activity community. - """ - - def get_kwargs(self, session: Session) -> dict: - """ - Create and forward the rendezvous database for the Community. - """ - from tribler.core.user_activity.manager import UserActivityManager - - out = super().get_kwargs(session) - max_query_history = session.config.get("user_activity/max_query_history") - out["manager"] = UserActivityManager(TaskManager(), session, max_query_history) - return out - - @precondition('session.config.get("versioning/enabled")') class VersioningComponent(ComponentLauncher): """ diff --git a/src/tribler/core/database/layers/user_activity.py b/src/tribler/core/database/layers/user_activity.py deleted file mode 100644 index 2a97bbe78e0..00000000000 --- a/src/tribler/core/database/layers/user_activity.py +++ /dev/null @@ -1,217 +0,0 @@ -from __future__ import annotations - -import logging -import random -import typing - -from pony import orm -from pony.orm import Database, db_session - -from tribler.core.user_activity.types import InfoHash - -if typing.TYPE_CHECKING: - from dataclasses import dataclass - - - @dataclass - class InfohashPreference: - """ - Typing for infohash preference database entries. - """ - - infohash: bytes - preference: float - parent_query: Query - - - @dataclass - class Query: - """ - Typing for query database entries. - """ - - query: str - forwarding_pk: bytes | None - infohashes: typing.Set[InfohashPreference] - -logger = logging.getLogger(__name__) - - -class UserActivityLayer: - """ - A database layer to store queries and corresponding preference. - """ - - def __init__(self, database: Database, update_weight: float = 0.8, e: float = 0.01) -> None: - """ - Create a new User Activity scheme for a particular database. - - :param database: The database to bind to. - :param update_weight: The weight of new updates. - :param e: A small value to decide near-zero preference. - """ - self.database = database - - self.e = e - self.update_weight_new = update_weight - self.update_weight_old = 1 - self.update_weight_new - - class Query(database.Entity): - query = orm.Required(str) - forwarding_pk = orm.Required(bytes) - infohashes = orm.Set("InfohashPreference") - orm.PrimaryKey(query, forwarding_pk) - - class InfohashPreference(database.Entity): - infohash = orm.Required(bytes) - preference = orm.Required(float) - parent_query = orm.Required(Query) - orm.PrimaryKey(infohash, parent_query) - - self.Query = Query - self.InfohashPreference = InfohashPreference - - def store_external(self, query: str, infohashes: list[bytes], weights: list[float], public_key: bytes) -> None: - """ - Store externally shared info. - """ - if len(infohashes) != len(weights): - logger.warning("Refusing to store query for %s: infohashes and weights lists do not match!", - repr(public_key)) - return - - with db_session: - existing = self.Query.get(query=query, forwarding_pk=public_key) - existing_entries = {} - if existing is None: - existing = self.Query(query=query, forwarding_pk=public_key, infohashes=set()) - else: - for infohash in existing.infohashes: - if infohash.infohash not in infohashes: - infohash.delete() - else: - existing_entries[infohash.infohash] = infohash - for i in range(len(infohashes)): - if infohashes[i] in existing_entries: - existing_entries[infohashes[i]].preference = weights[i] - else: - existing.infohashes.add(self.InfohashPreference(infohash=InfoHash(infohashes[i]), - preference=weights[i], - parent_query=existing)) - - def store(self, query: str, infohash: InfoHash, losing_infohashes: typing.Set[InfoHash]) -> None: - """ - Store a query, its selected infohash, and the infohashes that were not downloaded. - - :param query: The text that the user searched for. - :param infohash: The infohash that the user downloaded. - :param losing_infohashes: The infohashes that the user saw but ignored. - """ - # Convert "win" or "loss" to "1.0" or "0.0". - weights = {ih: 0.0 for ih in losing_infohashes} - weights[infohash] = 1.0 - - # Update or create a new database entry - with db_session: - existing = self.Query.get(query=query) - if existing is not None: - for old_infohash_preference in existing.infohashes: - if old_infohash_preference.infohash in weights: - new_weight = (old_infohash_preference.preference * self.update_weight_old - + weights.pop(old_infohash_preference.infohash, 0.0) * self.update_weight_new) - old_infohash_preference.preference = new_weight - else: - # This infohash did not pop up, candidate for deletion - new_weight = old_infohash_preference.preference * self.update_weight_old - if new_weight < self.e: - old_infohash_preference.delete() - else: - old_infohash_preference.preference = new_weight - if infohash in weights: - weights[infohash] = self.update_weight_new - else: - existing = self.Query(query=query, infohashes=set(), forwarding_pk=b"") - - for new_infohash, weight in weights.items(): - existing.infohashes.add(self.InfohashPreference(infohash=new_infohash, preference=weight, - parent_query=existing)) - - @db_session - def _select_superior(self, infohash_preference: InfohashPreference) -> InfoHash: - """ - For a given InfohashPreference, get the preferable infohash from the parent query. - """ - all_hashes_for_query = list(infohash_preference.parent_query.infohashes) - all_hashes_for_query.sort(key=lambda x: x.preference, reverse=True) - return typing.cast(InfoHash, all_hashes_for_query[0].infohash) - - def get_preferable(self, infohash: InfoHash) -> InfoHash: - """ - Given an infohash, see if we know of more preferable infohashes. - - :param infohash: The infohash to find better alternatives for. - """ - with db_session: - existing = self.InfohashPreference.select(infohash=infohash)[:] - - if not existing: - return infohash - - return self._select_superior(random.SystemRandom().choice(existing)) - - def get_preferable_to_random(self, limit: int = 1) -> set[InfoHash]: - """ - Retrieve (a) random infohash(es) and then return the preferred infohash for each infohash. - This method selects up to the limit of random infohashes and then outputs the set of preferable infohashes. - This means that you may start with ``limit`` number of infohashes and worst-case, if they all share the same, - preferable infohash, end up with only one infohash as the output. - - :param limit: The number of infohashes to randomly get the preferred infohash for (the output set may be less). - :returns: A set of infohashes of size 0 up to ``limit``. - """ - with db_session: - random_selection = self.InfohashPreference.select_random(limit=limit) - return {self._select_superior(ih) for ih in random_selection} - - def get_random_query_aggregate(self, neighbors: int, - limit: int = 20) -> tuple[str, list[InfoHash], list[float]] | None: - """ - Select a random query string and aggregate the scores from different peers. - - :param neighbors: The number of connected neighbors. - :param limit: The number of infohashes to randomly get weights for. - :returns: a randomly-selected query and up to ``limit`` associated infohashes, or None. - """ - with db_session: - random_queries = self.Query.select_random(limit=1) - if not random_queries: - return None - random_selection, = random_queries - infohashes = [] - weights = [] - # Option 1: give my knowledge with a chance weighted according to the number of connected neighbors - existing = self.Query.select(query=random_selection.query, forwarding_pk=b"")[:] - if existing and (neighbors == 0 or random.random() < 1.0 - 1/neighbors): - items = list(existing[0].infohashes) - random.shuffle(items) - for infohash_preference in items[:limit]: - infohashes.append(InfoHash(infohash_preference.infohash)) - weights.append(infohash_preference.preference) - return random_selection.query, infohashes, weights - # Option 2: aggregate - results = self.Query.select(lambda q: q.query == random_selection.query)[:] - num_results_div = 1/len(results) - preferences: dict[bytes, float] = {} - for query in results: - for infohash_preference in query.infohashes: - preferences[infohash_preference.infohash] = (preferences.get(infohash_preference.infohash, 0.0) - + infohash_preference.preference * num_results_div) - if not preferences: - return None - items = list(preferences.items()) - random.shuffle(items) - for aggregated in items[:limit]: - infohash, preference = aggregated - infohashes.append(InfoHash(infohash)) - weights.append(preference) - return random_selection.query, infohashes, weights diff --git a/src/tribler/core/database/tribler_database.py b/src/tribler/core/database/tribler_database.py index ed6916952d8..71a9cee76a4 100644 --- a/src/tribler/core/database/tribler_database.py +++ b/src/tribler/core/database/tribler_database.py @@ -10,7 +10,6 @@ from tribler.core.database.layers.health import HealthDataAccessLayer from tribler.core.database.layers.knowledge import KnowledgeDataAccessLayer -from tribler.core.database.layers.user_activity import UserActivityLayer if TYPE_CHECKING: import dataclasses @@ -52,7 +51,6 @@ def __init__(self, filename: str | None = None, *, create_tables: bool = True, * self.knowledge = KnowledgeDataAccessLayer(self.instance) self.health = HealthDataAccessLayer(self.knowledge) - self.user_activity = UserActivityLayer(self.instance) self.Misc = self.define_binding(self.instance) diff --git a/src/tribler/core/user_activity/__init__.py b/src/tribler/core/user_activity/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/tribler/core/user_activity/community.py b/src/tribler/core/user_activity/community.py deleted file mode 100644 index 54ef94bf6c6..00000000000 --- a/src/tribler/core/user_activity/community.py +++ /dev/null @@ -1,95 +0,0 @@ -from __future__ import annotations - -from typing import TYPE_CHECKING - -from ipv8.community import Community, CommunitySettings -from ipv8.lazy_community import lazy_wrapper - -from tribler.core.user_activity.payload import InfohashPreferencePayload, PullPreferencePayload - -if TYPE_CHECKING: - from ipv8.types import Peer - - from tribler.core.user_activity.manager import UserActivityManager - - -class UserActivitySettings(CommunitySettings): - """ - Settings for the UserActivityCommunity. - """ - - manager: UserActivityManager - crawler_mid: bytes = b"Jy\xa9\x90G\x86\xec[\xde\xda\xf8(\xe6\x81l\xa2\xe0\xba\xaf\xac" - - -class UserActivityCommunity(Community): - """ - A community that shares preferred infohashes. - """ - - community_id = b"UserActivityOverlay\x00" - settings_class = UserActivitySettings - - def __init__(self, settings: UserActivitySettings) -> None: - """ - Create a new user activity community. - """ - super().__init__(settings) - - self.composition = settings - - self.add_message_handler(InfohashPreferencePayload, self.on_infohash_preference) - self.add_message_handler(PullPreferencePayload, self.on_pull_preference) - - self.register_task("Gossip random preference", self.gossip, interval=5.0) - - async def unload(self) -> None: - """ - Unload our activity manager. - """ - await self.composition.manager.task_manager.shutdown_task_manager() - await super().unload() - - def gossip(self, receivers: list[Peer] | None = None) -> None: - """ - Select a random database entry and send it to a random peer. - """ - neighbors = receivers if receivers is not None else self.get_peers() # Explicit empty list leads to nothing. - if not neighbors: - return - - aggregate_for = 0 if receivers is not None else len(neighbors) - aggregate = self.composition.manager.database_manager.get_random_query_aggregate(aggregate_for) - - if aggregate is not None: - payload = InfohashPreferencePayload(*aggregate) - for peer in neighbors: - self.ez_send(peer, payload) - - @lazy_wrapper(InfohashPreferencePayload) - def on_infohash_preference(self, peer: Peer, payload: InfohashPreferencePayload) -> None: - """ - We received a preference message. - """ - self.composition.manager.database_manager.store_external(payload.query, payload.infohashes, payload.weights, - peer.public_key.key_to_bin()) - - for infohash, _ in sorted(zip(payload.infohashes, payload.weights), key=lambda x: x[1], reverse=True): - self.composition.manager.check(infohash) - break - - @lazy_wrapper(PullPreferencePayload) - def on_pull_preference(self, peer: Peer, payload: PullPreferencePayload) -> None: - """ - We received a pull message. We only allow specific peers to do this! - """ - peer_mid = peer.mid - - if peer_mid != self.composition.crawler_mid: - self.logger.warning("Refusing to serve a pull from %s, not a crawler!", str(peer)) - return - if payload.mid != self.my_peer.mid: - self.logger.warning("Refusing to serve a pull from %s, replay attack?!", str(peer)) - return - - self.gossip([peer]) diff --git a/src/tribler/core/user_activity/manager.py b/src/tribler/core/user_activity/manager.py deleted file mode 100644 index 0cc3aa2175d..00000000000 --- a/src/tribler/core/user_activity/manager.py +++ /dev/null @@ -1,89 +0,0 @@ -from __future__ import annotations - -import typing -from asyncio import get_running_loop -from binascii import unhexlify -from collections import OrderedDict, defaultdict - -from tribler.core.notifier import Notification -from tribler.core.user_activity.types import InfoHash - -if typing.TYPE_CHECKING: - from ipv8.taskmanager import TaskManager - - from tribler.core.database.layers.user_activity import UserActivityLayer - from tribler.core.session import Session - from tribler.core.torrent_checker.torrent_checker import TorrentChecker - - -class UserActivityManager: - """ - A manager for user activity events. - """ - - def __init__(self, task_manager: TaskManager, session: Session, max_query_history: int) -> None: - """ - Create a new activity manager. - """ - super().__init__() - - self.infohash_to_queries: dict[InfoHash, list[str]] = defaultdict(list) - self.queries: OrderedDict[str, typing.Set[InfoHash]] = OrderedDict() - self.max_query_history = max_query_history - self.database_manager: UserActivityLayer = session.db.user_activity - self.torrent_checker: TorrentChecker | None = session.torrent_checker - self.task_manager = task_manager - - # Hook events - session.notifier.add(Notification.torrent_finished, self.on_torrent_finished) - session.notifier.add(Notification.remote_query_results, self.on_query_results) - session.notifier.add(Notification.local_query_results, self.on_query_results) - self.task_manager.register_task("Check preferable", self.check_preferable, - interval=session.config.get("user_activity/health_check_interval")) - - def on_query_results(self, query: str, **data: dict) -> None: - """ - Start tracking a query and its results. - If any of the results get downloaded, we store the query (see ``on_torrent_finished``). - """ - results = {tmd["infohash"] for tmd in data["results"]} - for infohash in results: - self.infohash_to_queries[infohash].append(query) - self.queries[query] = results | self.queries.get(query, set()) - - if len(self.queries) > self.max_query_history: - query, results = self.queries.popitem(False) - for infohash in results: - self.infohash_to_queries[infohash].remove(query) - if not self.infohash_to_queries[infohash]: - self.infohash_to_queries.pop(infohash) - - def on_torrent_finished(self, infohash: str, name: str, hidden: bool) -> None: - """ - When a torrent finishes, check if we were tracking the infohash. If so, store the query and its result. - """ - b_infohash = InfoHash(unhexlify(infohash)) - queries = self.infohash_to_queries[b_infohash] - for query in queries: - losing_infohashes = self.queries[query] - {b_infohash} - self.task_manager.register_anonymous_task("Store query", get_running_loop().run_in_executor, - None, self.database_manager.store, - query, b_infohash, losing_infohashes) - - def check_preferable(self) -> None: - """ - Check a preferable torrent. - This causes a chain of events that leads to the torrent being gossiped more often in the ``ContentDiscovery`` - community. - """ - random_infohashes = self.database_manager.get_preferable_to_random(limit=1) # Note: this set can be empty! - for infohash in random_infohashes: - self.check(infohash) - - def check(self, infohash: bytes) -> None: - """ - Check the health of a given infohash. - """ - if self.torrent_checker: - self.task_manager.register_anonymous_task("Check preferable torrent", - self.torrent_checker.check_torrent_health, infohash) diff --git a/src/tribler/core/user_activity/payload.py b/src/tribler/core/user_activity/payload.py deleted file mode 100644 index ccd7fdd1254..00000000000 --- a/src/tribler/core/user_activity/payload.py +++ /dev/null @@ -1,46 +0,0 @@ -from __future__ import annotations - -from ipv8.messaging.lazy_payload import VariablePayloadWID, vp_compile -from typing_extensions import Self - - -@vp_compile -class InfohashPreferencePayload(VariablePayloadWID): - """ - A network payload containing a query and corresponding infohashes with their perceived preference. - """ - - msg_id = 1 - - names = ["query", "infohashes", "weights"] - format_list = ["varlenHutf8", "varlenHx20", "arrayH-d"] - - query: str - infohashes: list[bytes] - weights: list[float] - - @classmethod - def fix_unpack_infohashes(cls: type[Self], wire_value: bytes) -> list[bytes]: - """ - Convert the wire-format to a list of 20 byte values. - """ - return [wire_value[i:i + 20] for i in range(0, len(wire_value), 20)] - - def fix_pack_infohashes(self, user_value: list[bytes]) -> bytes: - """ - Convert a list of bytest to one big bytes. - """ - return b"".join(user_value) - -@vp_compile -class PullPreferencePayload(VariablePayloadWID): - """ - Ask for a random preference. - """ - - msg_id = 2 - - names = ["mid"] - format_list = ["varlenH"] - - mid: bytes diff --git a/src/tribler/core/user_activity/types.py b/src/tribler/core/user_activity/types.py deleted file mode 100644 index 46094fcd338..00000000000 --- a/src/tribler/core/user_activity/types.py +++ /dev/null @@ -1,3 +0,0 @@ -import typing - -InfoHash = typing.NewType("InfoHash", bytes) diff --git a/src/tribler/test_unit/core/database/layers/test_user_activity.py b/src/tribler/test_unit/core/database/layers/test_user_activity.py deleted file mode 100644 index 1f0ed7da609..00000000000 --- a/src/tribler/test_unit/core/database/layers/test_user_activity.py +++ /dev/null @@ -1,287 +0,0 @@ -from ipv8.test.base import TestBase -from pony.orm import Database, db_session - -from tribler.core.database.layers.user_activity import UserActivityLayer -from tribler.core.user_activity.types import InfoHash - - -class TestUserActivityLayer(TestBase): - """ - Tests for the UserActivityLayer class. - """ - - def setUp(self) -> None: - """ - Create a new memory database and a user activity layer. - """ - super().setUp() - self.database = Database() - self.database.bind(provider="sqlite", filename=":memory:") - self.layer = UserActivityLayer(self.database) - self.database.generate_mapping(create_tables=True) - - async def tearDown(self) -> None: - """ - Disconnect the database. - """ - self.database.disconnect() - await super().tearDown() - - def float_equals(self, a: float, b: float) -> bool: - """ - Check if two floats are roughly equal. - """ - return round(a, 5) == round(b, 5) - - def test_store_no_losers(self) -> None: - """ - Test that queries can be stored and retrieved. - """ - self.layer.store("test query", InfoHash(b"\x00" * 20), set()) - - with db_session(): - queries = self.layer.Query.select()[:] - - self.assertEqual(1, len(queries)) - self.assertEqual("test query", queries[0].query) - self.assertEqual(1, len(queries[0].infohashes)) - self.assertEqual(b"\x00" * 20, next(iter(queries[0].infohashes)).infohash) - self.assertTrue(self.float_equals(next(iter(queries[0].infohashes)).preference, 1.0)) - - def test_store_with_loser(self) -> None: - """ - Test that queries with a loser can be stored and retrieved. - """ - self.layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)}) - - with db_session(): - queries = self.layer.Query.select()[:] - winner, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:] - loser, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:] - - self.assertEqual(1, len(queries)) - self.assertEqual("test query", queries[0].query) - self.assertTrue(self.float_equals(winner.preference, 1.0)) - self.assertTrue(self.float_equals(loser.preference, 0.0)) - - def test_store_with_losers(self) -> None: - """ - Test that queries with multiple losers can be stored and retrieved. - """ - self.layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20), - InfoHash(b"\x02" * 20), - InfoHash(b"\x03" * 20)}) - - with db_session(): - queries = self.layer.Query.select()[:] - winner, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:] - loser_1, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:] - loser_2, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:] - loser_3, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:] - - self.assertEqual(1, len(queries)) - self.assertEqual("test query", queries[0].query) - self.assertTrue(self.float_equals(winner.preference, 1.0)) - self.assertTrue(self.float_equals(loser_1.preference, 0.0)) - self.assertTrue(self.float_equals(loser_2.preference, 0.0)) - self.assertTrue(self.float_equals(loser_3.preference, 0.0)) - - def test_store_weighted_decay(self) -> None: - """ - Test result decay after updating. - """ - self.layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20), - InfoHash(b"\x02" * 20), - InfoHash(b"\x03" * 20)}) - self.layer.store("test query", InfoHash(b"\x01" * 20), {InfoHash(b"\x00" * 20), - InfoHash(b"\x02" * 20), - InfoHash(b"\x03" * 20)}) - - with db_session(): - queries = self.layer.Query.select()[:] - entry_1, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:] - entry_2, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:] - entry_3, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:] - entry_4, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:] - - self.assertEqual(1, len(queries)) - self.assertEqual("test query", queries[0].query) - self.assertTrue(self.float_equals(entry_1.preference, 0.2)) - self.assertTrue(self.float_equals(entry_2.preference, 0.8)) - self.assertTrue(self.float_equals(entry_3.preference, 0.0)) - self.assertTrue(self.float_equals(entry_4.preference, 0.0)) - - def test_store_delete_old(self) -> None: - """ - Test result decay after updating. - """ - self.layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20), - InfoHash(b"\x02" * 20), - InfoHash(b"\x03" * 20)}) - self.layer.store("test query", InfoHash(b"\x04" * 20), {InfoHash(b"\x00" * 20), - InfoHash(b"\x01" * 20), - InfoHash(b"\x02" * 20)}) - - with db_session(): - queries = self.layer.Query.select()[:] - entry_1, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:] - entry_2, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:] - entry_3, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:] - should_be_dropped = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:] - entry_4, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:] - - self.assertEqual(1, len(queries)) - self.assertEqual("test query", queries[0].query) - self.assertTrue(self.float_equals(entry_1.preference, 0.2)) - self.assertTrue(self.float_equals(entry_2.preference, 0.0)) - self.assertTrue(self.float_equals(entry_3.preference, 0.0)) - self.assertEqual([], should_be_dropped) - self.assertTrue(self.float_equals(entry_4.preference, 0.8)) - - def test_store_delete_old_over_e(self) -> None: - """ - Test if entries are not deleted if their preference is still over the threshold e. - """ - self.layer.e = 0.0 - self.layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20), - InfoHash(b"\x02" * 20), - InfoHash(b"\x03" * 20)}) - self.layer.store("test query", InfoHash(b"\x04" * 20), {InfoHash(b"\x00" * 20), - InfoHash(b"\x01" * 20), - InfoHash(b"\x02" * 20)}) - - with db_session(): - queries = self.layer.Query.select()[:] - entry_1, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:] - entry_2, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x01" * 20)[:] - entry_3, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x02" * 20)[:] - entry_4, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x03" * 20)[:] - entry_5, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x04" * 20)[:] - - self.assertEqual(1, len(queries)) - self.assertEqual("test query", queries[0].query) - self.assertTrue(self.float_equals(entry_1.preference, 0.2)) - self.assertTrue(self.float_equals(entry_2.preference, 0.0)) - self.assertTrue(self.float_equals(entry_3.preference, 0.0)) - self.assertTrue(self.float_equals(entry_4.preference, 0.0)) - self.assertTrue(self.float_equals(entry_5.preference, 0.8)) - - def test_store_external_imbalanced(self) -> None: - """ - Test if imbalanced infohash and weight lists are rejected. - """ - self.layer.store_external("test query", [b"\x00" * 20], [], public_key=b"123") - - with db_session(): - queries = self.layer.Query.select()[:] - - self.assertEqual(0, len(queries)) - - def test_store_external_one(self) -> None: - """ - Test if an external entry is stored in the database. - """ - self.layer.store_external("test query", [b"\x00" * 20], [1.0], public_key=b"123") - - with db_session(): - queries = self.layer.Query.select()[:] - entry_1, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:] - - self.assertEqual(1, len(queries)) - self.assertEqual("test query", queries[0].query) - self.assertTrue(self.float_equals(entry_1.preference, 1.0)) - - def test_store_external_update(self) -> None: - """ - Test if an external entry can be updated in the database. - """ - self.layer.store_external("test query", [b"\x00" * 20], [1.0], public_key=b"123") - self.layer.store_external("test query", [b"\x00" * 20], [0.0], public_key=b"123") - - with db_session(): - queries = self.layer.Query.select()[:] - entry_1, = self.layer.InfohashPreference.select(lambda x: x.infohash == b"\x00" * 20)[:] - - self.assertEqual(1, len(queries)) - self.assertEqual("test query", queries[0].query) - self.assertTrue(self.float_equals(entry_1.preference, 0.0)) - - def test_store_external_many(self) -> None: - """ - Test if external entries are stored in the database. - """ - self.layer.store_external("test query", [b"\x00" * 20, b"\x01" * 20], [0.5, 0.0], public_key=b"123") - self.layer.store_external("test query", [b"\x00" * 20], [1.0], public_key=b"456") - self.layer.store_external("test query 2", [b"\x00" * 20, b"\x01" * 20], [0.0, 0.75], public_key=b"789") - - with db_session(): - queries = self.layer.Query.select()[:] - - self.assertEqual(3, len(queries)) - self.assertSetEqual({"test query", "test query 2"}, {query.query for query in queries}) - - def test_get_preferable(self) -> None: - """ - Test if a preferable infohash is correctly retrieved. - """ - self.layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)}) - - self.assertEqual(b"\x00" * 20, self.layer.get_preferable(InfoHash(b"\x00" * 20))) - - def test_get_preferable_already_best(self) -> None: - """ - Test if a infohash returns itself when it is preferable. - """ - self.layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)}) - - self.assertEqual(b"\x00" * 20, self.layer.get_preferable(InfoHash(b"\x01" * 20))) - - def test_get_preferable_unknown(self) -> None: - """ - Test if a infohash returns itself when it has no known preferable infohashes. - """ - self.layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20)}) - - self.assertEqual(b"\x02" * 20, self.layer.get_preferable(InfoHash(b"\x02" * 20))) - - def test_get_random(self) -> None: - """ - Test if the preferred infohash always gets returned from a random checked selection. - """ - self.layer.store("test query", InfoHash(b"\x00" * 20), {InfoHash(b"\x01" * 20), InfoHash(b"\x02" * 20)}) - self.layer.store("test query", InfoHash(b"\x01" * 20), {InfoHash(b"\x00" * 20), InfoHash(b"\x02" * 20)}) - - random_selection = self.layer.get_preferable_to_random(limit=1) - - self.assertEqual(1, len(random_selection)) - self.assertEqual(b"\x01" * 20, next(iter(random_selection))) - - def test_get_random_query_aggregate(self) -> None: - """ - Test if aggregates are created correctly. - """ - self.layer.store_external("test query", [b"\x00" * 20, b"\x01" * 20], [0.5, 0.0], public_key=b"123") - self.layer.store_external("test query", [b"\x00" * 20], [1.0], public_key=b"456") - self.layer.store_external("test query 2", [b"\x00" * 20, b"\x01" * 20], [0.0, 0.75], public_key=b"789") - - query, infohashes, weights = self.layer.get_random_query_aggregate(0) - - self.assertIn(query, {"test query", "test query 2"}) - self.assertEqual(2, len(infohashes)) - self.assertEqual(2, len(weights)) - self.assertSetEqual({0.0, 0.75}, set(weights)) # (1.0 + 0.5)/2 and 0.0 (query 1) OR 0.0 and 0.75 (query 2) - - def test_get_random_query_aggregate_prefer_local(self) -> None: - """ - Test if local info is correctly retrieved. - """ - self.layer.store_external("test query", [b"\x00" * 20, b"\x01" * 20], [0.5, 0.0], public_key=b"") - self.layer.store_external("test query", [b"\x00" * 20], [1.0], public_key=b"456") - self.layer.store_external("test query", [b"\x00" * 20, b"\x01" * 20], [0.0, 0.75], public_key=b"789") - - query, infohashes, weights = self.layer.get_random_query_aggregate(0) - - self.assertIn(query, {"test query", "test query 2"}) - self.assertEqual(2, len(infohashes)) - self.assertEqual(2, len(weights)) - self.assertSetEqual({0.0, 0.5}, set(weights)) # Only 0.0 and 0.5 should be included diff --git a/src/tribler/test_unit/core/user_activity/__init__.py b/src/tribler/test_unit/core/user_activity/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/tribler/test_unit/core/user_activity/test_community.py b/src/tribler/test_unit/core/user_activity/test_community.py deleted file mode 100644 index c4a60e5396a..00000000000 --- a/src/tribler/test_unit/core/user_activity/test_community.py +++ /dev/null @@ -1,149 +0,0 @@ -from unittest.mock import AsyncMock, Mock, call - -from ipv8.test.base import TestBase -from ipv8.test.mocking.endpoint import MockEndpointListener - -from tribler.core.database.layers.user_activity import UserActivityLayer -from tribler.core.user_activity.community import UserActivityCommunity -from tribler.core.user_activity.payload import InfohashPreferencePayload, PullPreferencePayload - - -class TestUserActivityCommunity(TestBase[UserActivityCommunity]): - """ - Tests for the UserActivityCommunity class. - """ - - def setUp(self) -> None: - """ - Create two communities. - """ - super().setUp() - - self.initialize(UserActivityCommunity, 2) - - def database_manager(self, i: int) -> UserActivityLayer: - """ - Get the database manager of node i. - """ - return self.overlay(i).composition.manager.database_manager - - async def test_gossip_aggregate(self) -> None: - """ - Test if valid aggregates are gossiped to a random connected peer. - """ - self.overlay(0).composition.manager = AsyncMock() - self.overlay(1).composition.manager = AsyncMock() - self.database_manager(0).get_random_query_aggregate = Mock(return_value=( - "test", [b"\x00" * 20, b"\x01" * 20], [1.0, 2.0] - )) - - with self.assertReceivedBy(1, [InfohashPreferencePayload]) as received: - self.overlay(0).gossip() - await self.deliver_messages() - payload, = received - - self.assertEqual("test", payload.query) - self.assertListEqual([b"\x00" * 20, b"\x01" * 20], payload.infohashes) - self.assertListEqual([1.0, 2.0], payload.weights) - self.assertEqual(call(1), self.database_manager(0).get_random_query_aggregate.call_args) - self.assertEqual(call("test", [b"\x00" * 20, b"\x01" * 20], [1.0, 2.0], self.key_bin(0)), - self.database_manager(1).store_external.call_args) - self.assertEqual(call(b"\x01" * 20), self.overlay(1).composition.manager.check.call_args) - - async def test_gossip_no_aggregate(self) -> None: - """ - Test if missing aggregates are not gossiped. - """ - self.overlay(0).composition.manager = AsyncMock() - self.overlay(1).composition.manager = AsyncMock() - self.database_manager(0).get_random_query_aggregate = Mock(return_value=None) - - with self.assertReceivedBy(1, []): - self.overlay(0).gossip() - await self.deliver_messages() - - async def test_gossip_target_peer(self) -> None: - """ - Test if gossip can be sent to a target peer. - """ - self.add_node_to_experiment(self.create_node()) - self.overlay(0).composition.manager = AsyncMock() - self.overlay(1).composition.manager = AsyncMock() - self.overlay(2).composition.manager = AsyncMock() - self.database_manager(0).get_random_query_aggregate = Mock(return_value=( - "test", [b"\x00" * 20, b"\x01" * 20], [1.0, 2.0] - )) - - with self.assertReceivedBy(1, []), self.assertReceivedBy(2, [InfohashPreferencePayload]) as received: - self.overlay(0).gossip([self.peer(2)]) - await self.deliver_messages() - payload, = received - - self.assertEqual("test", payload.query) - self.assertListEqual([b"\x00" * 20, b"\x01" * 20], payload.infohashes) - self.assertListEqual([1.0, 2.0], payload.weights) - self.assertEqual(call(0), self.database_manager(0).get_random_query_aggregate.call_args) - self.assertEqual(call("test", [b"\x00" * 20, b"\x01" * 20], [1.0, 2.0], self.key_bin(0)), - self.database_manager(2).store_external.call_args) - self.assertEqual(call(b"\x01" * 20), self.overlay(2).composition.manager.check.call_args) - - async def test_pull_known_crawler(self) -> None: - """ - Test if a known crawler is allowed to crawl. - """ - self.overlay(0).composition.manager = AsyncMock() - self.overlay(1).composition.manager = AsyncMock() - self.overlay(1).composition.crawler_mid = self.mid(0) - self.database_manager(1).get_random_query_aggregate = Mock(return_value=( - "test", [b"\x00" * 20, b"\x01" * 20], [1.0, 2.0] - )) - - with self.assertReceivedBy(0, [InfohashPreferencePayload]) as received: - self.overlay(0).ez_send(self.peer(1), PullPreferencePayload(self.mid(1))) - await self.deliver_messages() - payload, = received - - self.assertEqual("test", payload.query) - self.assertListEqual([b"\x00" * 20, b"\x01" * 20], payload.infohashes) - self.assertListEqual([1.0, 2.0], payload.weights) - self.assertEqual(call(0), self.database_manager(1).get_random_query_aggregate.call_args) - - async def test_pull_unknown_crawler(self) -> None: - """ - Test if an unknown crawler does not receive any information. - """ - self.overlay(0).composition.manager = AsyncMock() - self.overlay(1).composition.manager = AsyncMock() - self.overlay(1).composition.crawler_mid = bytes(b ^ 255 for b in self.mid(0)) - self.database_manager(1).get_random_query_aggregate = Mock(return_value=( - "test", [b"\x00" * 20, b"\x01" * 20], [1.0, 2.0] - )) - - with self.assertReceivedBy(0, []): - self.overlay(0).ez_send(self.peer(1), PullPreferencePayload(self.mid(1))) - await self.deliver_messages() - - async def test_pull_replay_attack(self) -> None: - """ - Test if an unknown crawler does not receive any information. - """ - self.add_node_to_experiment(self.create_node()) - self.overlay(0).composition.manager = AsyncMock() - self.overlay(1).composition.manager = AsyncMock() - self.overlay(2).composition.manager = AsyncMock() - self.overlay(1).composition.crawler_mid = self.mid(0) - self.overlay(2).composition.crawler_mid = self.mid(0) - self.database_manager(1).get_random_query_aggregate = Mock(return_value=( - "test", [b"\x00" * 20, b"\x01" * 20], [1.0, 2.0] - )) - - packet_sniffer = MockEndpointListener(self.overlay(1).endpoint) - self.overlay(1).endpoint.add_listener(packet_sniffer) - self.overlay(0).ez_send(self.peer(1), PullPreferencePayload(self.mid(1))) - await self.deliver_messages() - packet, _ = packet_sniffer.received_packets - _, data = packet - - with self.assertReceivedBy(1, []): - self.endpoint(1).send(self.address(2), data) - await self.deliver_messages() diff --git a/src/tribler/test_unit/core/user_activity/test_manager.py b/src/tribler/test_unit/core/user_activity/test_manager.py deleted file mode 100644 index d741229aa3c..00000000000 --- a/src/tribler/test_unit/core/user_activity/test_manager.py +++ /dev/null @@ -1,210 +0,0 @@ -from asyncio import sleep -from binascii import hexlify -from unittest.mock import Mock, call - -from ipv8.taskmanager import TaskManager -from ipv8.test.base import TestBase - -from tribler.core.notifier import Notification, Notifier -from tribler.core.user_activity.manager import UserActivityManager -from tribler.core.user_activity.types import InfoHash - - -class TestUserActivityManager(TestBase): - """ - Tests for the UserActivityManager class. - """ - - def setUp(self) -> None: - """ - Create a new user activity manager. - """ - super().setUp() - self.task_manager = TaskManager() - self.session = Mock( - notifier=Notifier(), - db=Mock(), - torrent_checker=Mock() - ) - self.manager = UserActivityManager(self.task_manager, self.session, 500) - self.task_manager.cancel_pending_task("Check preferable") - - async def tearDown(self) -> None: - """ - Stop the task manager. - """ - await self.task_manager.shutdown_task_manager() - await super().tearDown() - - async def test_notify_local_query_results(self) -> None: - """ - Test that local query notifications get processed correctly. - """ - fake_infohashes = [InfoHash(bytes([i]) * 20) for i in range(2)] - fake_torrent_metadata = [{"infohash": fake_infohashes[i]} for i in range(2)] - fake_query = "test query" - - self.session.notifier.notify(Notification.local_query_results, - query=fake_query, results=fake_torrent_metadata) - await sleep(0) - - self.assertIn(fake_query, self.manager.queries) - self.assertIn(fake_infohashes[0], self.manager.infohash_to_queries) - self.assertIn(fake_infohashes[1], self.manager.infohash_to_queries) - self.assertIn(fake_query, self.manager.infohash_to_queries[fake_infohashes[0]]) - self.assertIn(fake_query, self.manager.infohash_to_queries[fake_infohashes[1]]) - - async def test_notify_remote_query_results(self) -> None: - """ - Test that remote query notifications get processed correctly. - """ - fake_infohashes = [InfoHash(bytes([i]) * 20) for i in range(2)] - fake_torrent_metadata = [{"infohash": fake_infohashes[i]} for i in range(2)] - fake_query = "test query" - - self.session.notifier.notify(Notification.remote_query_results, - query=fake_query, results=fake_torrent_metadata, uuid='123', peer=[]) - await sleep(0) - - self.assertIn(fake_query, self.manager.queries) - self.assertIn(fake_infohashes[0], self.manager.infohash_to_queries) - self.assertIn(fake_infohashes[1], self.manager.infohash_to_queries) - self.assertIn(fake_query, self.manager.infohash_to_queries[fake_infohashes[0]]) - self.assertIn(fake_query, self.manager.infohash_to_queries[fake_infohashes[1]]) - - async def test_notify_local_query_results_overflow(self) -> None: - """ - Test that local query notifications do not go beyond the maximum history. - Old information should be purged. However, infohashes should not be purged if they are still in use. - """ - self.manager.max_query_history = 1 - - fake_infohashes = [InfoHash(bytes([i]) * 20) for i in range(2)] - fake_torrent_metadata = [{"infohash": fake_infohashes[i]} for i in range(2)] - fake_query_1 = "test query 1" - fake_query_2 = "test query 2" - - self.session.notifier.notify(Notification.local_query_results, - query=fake_query_1, results=fake_torrent_metadata) - await sleep(0) - self.session.notifier.notify(Notification.local_query_results, - query=fake_query_2, results=fake_torrent_metadata[:1]) - await sleep(0) - - self.assertNotIn(fake_query_1, self.manager.queries) - self.assertIn(fake_query_2, self.manager.queries) - self.assertIn(fake_infohashes[0], self.manager.infohash_to_queries) - self.assertNotIn(fake_infohashes[1], self.manager.infohash_to_queries) - self.assertNotIn(fake_query_1, self.manager.infohash_to_queries[fake_infohashes[0]]) - self.assertIn(fake_query_2, self.manager.infohash_to_queries[fake_infohashes[0]]) - self.assertNotIn(fake_query_1, self.manager.infohash_to_queries[fake_infohashes[1]]) - self.assertNotIn(fake_query_2, self.manager.infohash_to_queries[fake_infohashes[1]]) - - async def test_notify_remote_query_results_overflow(self) -> None: - """ - Test that remote query notifications do not go beyond the maximum history. - Old information should be purged. However, infohashes should not be purged if they are still in use. - """ - self.manager.max_query_history = 1 - - fake_infohashes = [InfoHash(bytes([i]) * 20) for i in range(2)] - fake_torrent_metadata = [{"infohash": fake_infohashes[i]} for i in range(2)] - fake_query_1 = "test query 1" - fake_query_2 = "test query 2" - - self.session.notifier.notify(Notification.remote_query_results, - query=fake_query_1, results=fake_torrent_metadata, uuid='123', peer=[]) - await sleep(0) - self.session.notifier.notify(Notification.remote_query_results, - query=fake_query_2, results=fake_torrent_metadata[:1], uuid='123', peer=[]) - await sleep(0) - - self.assertNotIn(fake_query_1, self.manager.queries) - self.assertIn(fake_query_2, self.manager.queries) - self.assertIn(fake_infohashes[0], self.manager.infohash_to_queries) - self.assertNotIn(fake_infohashes[1], self.manager.infohash_to_queries) - self.assertNotIn(fake_query_1, self.manager.infohash_to_queries[fake_infohashes[0]]) - self.assertIn(fake_query_2, self.manager.infohash_to_queries[fake_infohashes[0]]) - self.assertNotIn(fake_query_1, self.manager.infohash_to_queries[fake_infohashes[1]]) - self.assertNotIn(fake_query_2, self.manager.infohash_to_queries[fake_infohashes[1]]) - - async def test_notify_finished_untracked(self) -> None: - """ - Test that an untracked infohash does not lead to any information being stored. - """ - fake_infohash = InfoHash(b'\x00' * 20) - untracked_fake_infohash = InfoHash(b'\x01' * 20) - fake_query = "test query" - self.manager.queries[fake_query] = {fake_infohash} - self.manager.infohash_to_queries[fake_infohash] = [fake_query] - - self.session.notifier.notify(Notification.torrent_finished, - infohash=hexlify(untracked_fake_infohash).decode(), - name="test torrent", - hidden=False) - await sleep(0) - - self.assertFalse(self.manager.task_manager.is_pending_task_active("Store query")) - self.assertEqual(None, self.manager.database_manager.store.call_args) - - async def test_notify_finished_tracked(self) -> None: - """ - Test that a tracked infohash leads to information being stored. - """ - fake_infohash = InfoHash(b'\x00' * 20) - fake_query = "test query" - self.manager.queries[fake_query] = {fake_infohash} - self.manager.infohash_to_queries[fake_infohash] = [fake_query] - - self.session.notifier.notify(Notification.torrent_finished, - infohash=hexlify(fake_infohash).decode(), name="test torrent", hidden=False) - await sleep(0) - await self.manager.task_manager.wait_for_tasks() - - self.assertEqual(call(fake_query, fake_infohash, set()), self.manager.database_manager.store.call_args) - - async def test_check_preferable_zero(self) -> None: - """ - Test that checking without available random torrents leads to no checks. - """ - self.manager.database_manager.get_preferable_to_random = Mock(return_value={}) - - self.manager.check_preferable() - await sleep(0) - - self.assertEqual(None, self.manager.torrent_checker.check_torrent_health.call_args) - - async def test_check_preferable_one(self) -> None: - """ - Test that checking with one available random torrent leads to one check. - """ - fake_infohash = InfoHash(b'\x00' * 20) - self.manager.database_manager.get_preferable_to_random = Mock(return_value={fake_infohash}) - - self.manager.check_preferable() - await sleep(0) - - self.assertEqual(call(fake_infohash), self.manager.torrent_checker.check_torrent_health.call_args) - - async def test_check_preferable_multiple(self) -> None: - """ - Test that checking with multiple available random torrents leads to as many checks. - """ - fake_infohashes = {InfoHash(bytes([i]) * 20) for i in range(10)} - self.manager.database_manager.get_preferable_to_random = Mock(return_value=fake_infohashes) - - self.manager.check_preferable() - await sleep(0) - - self.assertEqual(10, self.manager.torrent_checker.check_torrent_health.call_count) - - async def test_check(self) -> None: - """ - Test that checking an infohash schedules a check. - """ - fake_infohash = InfoHash(b'\x00' * 20) - - self.manager.check(fake_infohash) - await sleep(0) - - self.assertEqual(call(fake_infohash), self.manager.torrent_checker.check_torrent_health.call_args) diff --git a/src/tribler/tribler_config.py b/src/tribler/tribler_config.py index 99b9fd90976..57c0ba11a6f 100644 --- a/src/tribler/tribler_config.py +++ b/src/tribler/tribler_config.py @@ -136,16 +136,6 @@ class TunnelCommunityConfig(TypedDict): max_circuits: int -class UserActivityConfig(TypedDict): - """ - Settings for the user activity component. - """ - - enabled: bool - max_query_history: int - health_check_interval: float - - class TriblerConfig(TypedDict): """ The main Tribler settings and all of its components' sub-settings. @@ -163,7 +153,6 @@ class TriblerConfig(TypedDict): rendezvous: RendezvousConfig torrent_checker: TorrentCheckerConfig tunnel_community: TunnelCommunityConfig - user_activity: UserActivityConfig versioning: VersioningConfig state_dir: str @@ -220,7 +209,6 @@ class TriblerConfig(TypedDict): "rendezvous": RendezvousConfig(enabled=True), "torrent_checker": TorrentCheckerConfig(enabled=True), "tunnel_community": TunnelCommunityConfig(enabled=True, min_circuits=3, max_circuits=8), - "user_activity": UserActivityConfig(enabled=True, max_query_history=500, health_check_interval=5.0), "versioning": VersioningConfig(enabled=True), "state_dir": str((Path(os.environ.get("APPDATA", "~")) / ".Tribler").expanduser().absolute()), diff --git a/src/tribler/ui/src/models/settings.model.tsx b/src/tribler/ui/src/models/settings.model.tsx index b3434a73212..3aa101bad8b 100644 --- a/src/tribler/ui/src/models/settings.model.tsx +++ b/src/tribler/ui/src/models/settings.model.tsx @@ -89,11 +89,6 @@ export interface Settings { min_circuits: number; max_circuits: number; }, - user_activity: { - enabled: boolean; - max_query_history: number; - health_check_interval: number; - }, state_dir: string; memory_db: boolean; ui: GuiSettings;