diff --git a/scripts/exit_node/run_exit_node.py b/scripts/exit_node/run_exit_node.py index 2c802883122..c7c220337ce 100644 --- a/scripts/exit_node/run_exit_node.py +++ b/scripts/exit_node/run_exit_node.py @@ -58,7 +58,7 @@ def make_config(options) -> TriblerConfig: config.ipv8.address = options.ipv8_address config.dht.enabled = True config.tunnel_community.exitnode_enabled = bool(options.exit) - config.popularity_community.enabled = False + config.content_discovery_community.enabled = False config.tunnel_community.testnet = bool(options.testnet) config.chant.enabled = False config.bootstrap.enabled = False diff --git a/src/tribler/core/components/conftest.py b/src/tribler/core/components/conftest.py index 0eb9a89fcb6..1f6fadea4d2 100644 --- a/src/tribler/core/components/conftest.py +++ b/src/tribler/core/components/conftest.py @@ -36,7 +36,7 @@ def tribler_config(tmp_path) -> TriblerConfig: config.libtorrent.enabled = False config.libtorrent.dht_readiness_timeout = 0 config.tunnel_community.enabled = False - config.popularity_community.enabled = False + config.content_discovery_community.enabled = False config.dht.enabled = False config.libtorrent.dht = False config.chant.enabled = False diff --git a/src/tribler/core/components/metadata_store/remote_query_community/__init__.py b/src/tribler/core/components/content_discovery/__init__.py similarity index 100% rename from src/tribler/core/components/metadata_store/remote_query_community/__init__.py rename to src/tribler/core/components/content_discovery/__init__.py diff --git a/src/tribler/core/components/metadata_store/remote_query_community/tests/__init__.py b/src/tribler/core/components/content_discovery/community/__init__.py similarity index 100% rename from src/tribler/core/components/metadata_store/remote_query_community/tests/__init__.py rename to src/tribler/core/components/content_discovery/community/__init__.py diff --git a/src/tribler/core/components/content_discovery/community/cache.py b/src/tribler/core/components/content_discovery/community/cache.py new file mode 100644 index 00000000000..abbaca12857 --- /dev/null +++ b/src/tribler/core/components/content_discovery/community/cache.py @@ -0,0 +1,36 @@ +from asyncio import Future + +from ipv8.requestcache import RandomNumberCache +from tribler.core.components.metadata_store.utils import RequestTimeoutException + + +class SelectRequest(RandomNumberCache): + def __init__(self, request_cache, prefix, request_kwargs, peer, processing_callback=None, timeout_callback=None): + super().__init__(request_cache, prefix) + self.request_kwargs = request_kwargs + # The callback to call on results of processing of the response payload + self.processing_callback = processing_callback + # The maximum number of packets to receive from any given peer from a single request. + # This limit is imposed as a safety precaution to prevent spam/flooding + self.packets_limit = 10 + + self.peer = peer + # Indicate if at least a single packet was returned by the queried peer. + self.peer_responded = False + + self.timeout_callback = timeout_callback + + def on_timeout(self): + if self.timeout_callback is not None: + self.timeout_callback(self) + + +class EvaSelectRequest(SelectRequest): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # For EVA transfer it is meaningless to send more than one message + self.packets_limit = 1 + + self.processing_results = Future() + self.register_future(self.processing_results, on_timeout=RequestTimeoutException()) diff --git a/src/tribler/core/components/content_discovery/community/content_discovery_community.py b/src/tribler/core/components/content_discovery/community/content_discovery_community.py new file mode 100644 index 00000000000..cfb2f0648b2 --- /dev/null +++ b/src/tribler/core/components/content_discovery/community/content_discovery_community.py @@ -0,0 +1,421 @@ +from __future__ import annotations + +import json +import random +import struct +import sys +import time +import uuid +from binascii import unhexlify +from itertools import count +from typing import Any, Dict, List, Optional, Set + +from ipv8.types import Peer +from pony.orm import OperationalError, db_session + +from ipv8.community import Community +from ipv8.lazy_community import lazy_wrapper +from ipv8.requestcache import RequestCache +from tribler.core import notifications +from tribler.core.components.content_discovery.community.cache import EvaSelectRequest, SelectRequest +from tribler.core.components.content_discovery.community.payload import ( + PopularTorrentsRequest, + RemoteSelectPayload, + RemoteSelectPayloadEva, + SelectResponsePayload, + TorrentsHealthPayload, + VersionRequest, + VersionResponse +) +from tribler.core.components.content_discovery.community.settings import ContentDiscoverySettings +from tribler.core.components.ipv8.eva.protocol import EVAProtocol +from tribler.core.components.ipv8.eva.result import TransferResult +from tribler.core.components.knowledge.community.knowledge_validator import is_valid_resource +from tribler.core.components.metadata_store.db.orm_bindings.torrent_metadata import LZ4_EMPTY_ARCHIVE, entries_to_chunk +from tribler.core.components.metadata_store.db.store import ObjState +from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo +from tribler.core.upgrade.tags_to_knowledge.previous_dbs.knowledge_db import ResourceType +from tribler.core.utilities.pony_utils import run_threaded +from tribler.core.utilities.unicode import hexlify +from tribler.core.utilities.utilities import get_normally_distributed_positive_integers +from tribler.core.version import version_id + + +class ContentDiscoveryCommunity(Community): + """ + Community for disseminating the content across the network. + + Push: + - Every 5 seconds it gossips 10 random torrents to a random peer. + Pull: + - Every time it receives an introduction request, it sends a request + to return their popular torrents. + + Gossiping is for checked torrents only. + """ + community_id = unhexlify('9aca62f878969c437da9844cba29a134917e1648') + settings_class = ContentDiscoverySettings + + def __init__(self, settings: ContentDiscoverySettings): + super().__init__(settings) + self.composition = settings + + self.add_message_handler(TorrentsHealthPayload, self.on_torrents_health) + self.add_message_handler(PopularTorrentsRequest, self.on_popular_torrents_request) + self.add_message_handler(VersionRequest, self.on_version_request) + self.add_message_handler(VersionResponse, self.on_version_response) + self.add_message_handler(RemoteSelectPayload, self.on_remote_select) + self.add_message_handler(RemoteSelectPayloadEva, self.on_remote_select_eva) + self.add_message_handler(SelectResponsePayload, self.on_remote_select_response) + + self.request_cache = RequestCache() + + self.eva = EVAProtocol(self, self.on_receive, self.on_send_complete, self.on_error) + self.remote_queries_in_progress = 0 + self.next_remote_query_num = count().__next__ # generator of sequential numbers, for logging & debug purposes + + self.logger.info('Content Discovery Community initialized (peer mid %s)', hexlify(self.my_peer.mid)) + self.register_task("gossip_random_torrents", self.gossip_random_torrents_health, + interval=self.composition.random_torrent_interval) + + async def unload(self): + await self.eva.shutdown() + await self.request_cache.shutdown() + await super().unload() + + def sanitize_dict(self, parameters: dict[str, Any], decode=True) -> None: + for field in self.composition.binary_fields: + value = parameters.get(field) + if value is not None: + parameters[field] = unhexlify(value) if decode else hexlify(value) + + def sanitize_query(self, query_dict: Dict[str, Any], cap=100) -> Dict[str, Any]: + sanitized_dict = dict(query_dict) + + # We impose a cap on max numbers of returned entries to prevent DDOS-like attacks + first = sanitized_dict.get("first", None) or 0 + last = sanitized_dict.get("last", None) + last = last if (last is not None and last <= (first + cap)) else (first + cap) + sanitized_dict.update({"first": first, "last": last}) + + # convert hex fields to binary + self.sanitize_dict(sanitized_dict, decode=True) + + return sanitized_dict + + def convert_to_json(self, parameters): + sanitized = dict(parameters) + # Convert frozenset to string + if "metadata_type" in sanitized: + sanitized["metadata_type"] = [int(mt) for mt in sanitized["metadata_type"] if mt] + + self.sanitize_dict(sanitized, decode=False) + + if "origin_id" in parameters: + sanitized["origin_id"] = int(parameters["origin_id"]) + + return json.dumps(sanitized) + + def get_alive_checked_torrents(self) -> List[HealthInfo]: + if not self.composition.torrent_checker: + return [] + + # Filter torrents that have seeders + return [health for health in self.composition.torrent_checker.torrents_checked.values() if + health.seeders > 0 and health.leechers >= 0] + + def gossip_random_torrents_health(self): + """ + Gossip random torrent health information to another peer. + """ + if not self.get_peers() or not self.composition.torrent_checker: + return + + self.ez_send(random.choice(self.get_peers()), TorrentsHealthPayload.create(self.get_random_torrents(), {})) + + @lazy_wrapper(TorrentsHealthPayload) + async def on_torrents_health(self, peer, payload): + self.logger.debug(f"Received torrent health information for " + f"{len(payload.torrents_checked)} popular torrents and" + f" {len(payload.random_torrents)} random torrents") + + health_tuples = payload.random_torrents + payload.torrents_checked + health_list = [HealthInfo(infohash, last_check=last_check, seeders=seeders, leechers=leechers) + for infohash, seeders, leechers, last_check in health_tuples] + + for infohash in await run_threaded(self.composition.metadata_store.db, self.process_torrents_health, + health_list): + # Get a single result per infohash to avoid duplicates + self.send_remote_select(peer=peer, infohash=infohash, last=1) + + @db_session + def process_torrents_health(self, health_list: List[HealthInfo]): + infohashes_to_resolve = set() + for health in health_list: + added = self.composition.metadata_store.process_torrent_health(health) + if added: + infohashes_to_resolve.add(health.infohash) + return infohashes_to_resolve + + @lazy_wrapper(PopularTorrentsRequest) + async def on_popular_torrents_request(self, peer, payload): + self.logger.debug("Received popular torrents health request") + popular_torrents = self.get_likely_popular_torrents() + self.ez_send(peer, TorrentsHealthPayload.create({}, popular_torrents)) + + def get_likely_popular_torrents(self) -> List[HealthInfo]: + checked_and_alive = self.get_alive_checked_torrents() + if not checked_and_alive: + return [] + + num_torrents = len(checked_and_alive) + num_torrents_to_send = min(self.composition.random_torrent_count, num_torrents) + likely_popular_indices = self._get_likely_popular_indices(num_torrents_to_send, num_torrents) + + sorted_torrents = sorted(list(checked_and_alive), key=lambda health: -health.seeders) + likely_popular_torrents = [sorted_torrents[i] for i in likely_popular_indices] + return likely_popular_torrents + + def _get_likely_popular_indices(self, size, limit) -> List[int]: + """ + Returns a list of indices favoring the lower value numbers. + + Assuming lower indices being more popular than higher value indices, the returned list + favors the lower indexed popular values. + @param size: Number of indices to return + @param limit: Max number of indices that can be returned. + @return: List of non-repeated positive indices. + """ + return get_normally_distributed_positive_integers(size=size, upper_limit=limit) + + def get_random_torrents(self) -> List[HealthInfo]: + checked_and_alive = list(self.get_alive_checked_torrents()) + if not checked_and_alive: + return [] + + num_torrents = len(checked_and_alive) + num_torrents_to_send = min(self.composition.random_torrent_count, num_torrents) + + random_torrents = random.sample(checked_and_alive, num_torrents_to_send) + return random_torrents + + def get_random_peers(self, sample_size=None): + # Randomly sample sample_size peers from the complete list of our peers + all_peers = self.get_peers() + return random.sample(all_peers, min(sample_size or len(all_peers), len(all_peers))) + + def send_search_request(self, **kwargs): + # Send a remote query request to multiple random peers to search for some terms + request_uuid = uuid.uuid4() + + def notify_gui(request, processing_results): + results = [ + r.md_obj.to_simple_dict() + for r in processing_results + if r.obj_state == ObjState.NEW_OBJECT + ] + if self.composition.notifier: + self.composition.notifier[notifications.remote_query_results]( + {"results": results, "uuid": str(request_uuid), "peer": hexlify(request.peer.mid)}) + + peers_to_query = self.get_random_peers(self.composition.max_query_peers) + + for p in peers_to_query: + self.send_remote_select(p, **kwargs, processing_callback=notify_gui) + + return request_uuid, peers_to_query + + def send_version_request(self, peer): + self.logger.info(f"Sending version request to {peer.address}") + self.ez_send(peer, VersionRequest()) + + @lazy_wrapper(VersionRequest) + async def on_version_request(self, peer, _): + self.logger.info(f"Received version request from {peer.address}") + version_response = VersionResponse(version_id, sys.platform) + self.ez_send(peer, version_response) + + @lazy_wrapper(VersionResponse) + async def on_version_response(self, peer, payload): + self.logger.info(f"Received version response from {peer.address}") + self.process_version_response(peer, payload.version, payload.platform) + + def process_version_response(self, peer, version, platform): + """ + This is the method the implementation community or the experiment will implement + to process the version and platform information. + """ + + async def on_receive(self, result: TransferResult): + self.logger.debug(f"EVA data received: peer {hexlify(result.peer.mid)}, info {result.info}") + packet = (result.peer.address, result.data) + self.on_packet(packet) + + async def on_send_complete(self, result: TransferResult): + self.logger.debug(f"EVA outgoing transfer complete: peer {hexlify(result.peer.mid)}, info {result.info}") + + async def on_error(self, peer, exception): + self.logger.warning(f"EVA transfer error:{exception.__class__.__name__}:{exception}, Peer: {hexlify(peer.mid)}") + + def send_remote_select(self, peer, processing_callback=None, force_eva_response=False, **kwargs): + request_class = EvaSelectRequest if force_eva_response else SelectRequest + request = request_class( + self.request_cache, + hexlify(peer.mid), + kwargs, + peer, + processing_callback=processing_callback, + timeout_callback=self._on_query_timeout, + ) + self.request_cache.add(request) + + self.logger.debug(f"Select to {hexlify(peer.mid)} with ({kwargs})") + args = (request.number, self.convert_to_json(kwargs).encode('utf8')) + if force_eva_response: + self.ez_send(peer, RemoteSelectPayloadEva(*args)) + else: + self.ez_send(peer, RemoteSelectPayload(*args)) + return request + + def should_limit_rate_for_query(self, sanitized_parameters: Dict[str, Any]) -> bool: + return 'txt_filter' in sanitized_parameters + + async def process_rpc_query_rate_limited(self, sanitized_parameters: Dict[str, Any]) -> List: + query_num = self.next_remote_query_num() + if self.remote_queries_in_progress and self.should_limit_rate_for_query(sanitized_parameters): + self.logger.warning(f'Ignore remote query {query_num} as another one is already processing. ' + f'The ignored query: {sanitized_parameters}') + return [] + + self.logger.info(f'Process remote query {query_num}: {sanitized_parameters}') + self.remote_queries_in_progress += 1 + t = time.time() + try: + return await self.process_rpc_query(sanitized_parameters) + finally: + self.remote_queries_in_progress -= 1 + self.logger.info(f'Remote query {query_num} processed in {time.time() - t} seconds: {sanitized_parameters}') + + async def process_rpc_query(self, sanitized_parameters: Dict[str, Any]) -> List: + """ + Retrieve the result of a database query from a third party, encoded as raw JSON bytes (through `dumps`). + :raises TypeError: if the JSON contains invalid keys. + :raises ValueError: if no JSON could be decoded. + :raises pony.orm.dbapiprovider.OperationalError: if an illegal query was performed. + """ + if self.composition.tribler_db: + # tags should be extracted because `get_entries_threaded` doesn't expect them as a parameter + tags = sanitized_parameters.pop('tags', None) + + infohash_set = await run_threaded(self.composition.tribler_db.instance, self.search_for_tags, tags) + if infohash_set: + sanitized_parameters['infohash_set'] = {bytes.fromhex(s) for s in infohash_set} + + return await self.composition.metadata_store.get_entries_threaded(**sanitized_parameters) + + @db_session + def search_for_tags(self, tags: Optional[List[str]]) -> Optional[Set[str]]: + if not tags or not self.composition.tribler_db: + return None + valid_tags = {tag for tag in tags if is_valid_resource(tag)} + result = self.composition.tribler_db.knowledge.get_subjects_intersection( + subjects_type=ResourceType.TORRENT, + objects=valid_tags, + predicate=ResourceType.TAG, + case_sensitive=False + ) + return result + + def send_db_results(self, peer, request_payload_id, db_results, force_eva_response=False): + + # Special case of empty results list - sending empty lz4 archive + if len(db_results) == 0: + self.ez_send(peer, SelectResponsePayload(request_payload_id, LZ4_EMPTY_ARCHIVE)) + return + + index = 0 + while index < len(db_results): + transfer_size = ( + self.eva.settings.binary_size_limit if force_eva_response else self.composition.maximum_payload_size + ) + data, index = entries_to_chunk(db_results, transfer_size, start_index=index, include_health=True) + payload = SelectResponsePayload(request_payload_id, data) + if force_eva_response or (len(data) > self.composition.maximum_payload_size): + self.eva.send_binary(peer, struct.pack('>i', request_payload_id), + self.ezr_pack(payload.msg_id, payload)) + else: + self.ez_send(peer, payload) + + @lazy_wrapper(RemoteSelectPayloadEva) + async def on_remote_select_eva(self, peer, request_payload): + await self._on_remote_select_basic(peer, request_payload, force_eva_response=True) + + @lazy_wrapper(RemoteSelectPayload) + async def on_remote_select(self, peer, request_payload): + await self._on_remote_select_basic(peer, request_payload) + + def parse_parameters(self, json_bytes: bytes) -> Dict[str, Any]: + return self.sanitize_query(json.loads(json_bytes), self.composition.max_response_size) + + async def _on_remote_select_basic(self, peer, request_payload, force_eva_response=False): + try: + sanitized_parameters = self.parse_parameters(request_payload.json) + # Drop selects with deprecated queries + if any(param in sanitized_parameters for param in self.composition.deprecated_parameters): + self.logger.warning(f"Remote select with deprecated parameters: {sanitized_parameters}") + self.ez_send(peer, SelectResponsePayload(request_payload.id, LZ4_EMPTY_ARCHIVE)) + return + db_results = await self.process_rpc_query_rate_limited(sanitized_parameters) + + self.send_db_results(peer, request_payload.id, db_results, force_eva_response) + except (OperationalError, TypeError, ValueError) as error: + self.logger.error(f"Remote select. The error occurred: {error}") + + @lazy_wrapper(SelectResponsePayload) + async def on_remote_select_response(self, peer, response_payload): + """ + Match the response that we received from the network to a query cache + and process it by adding the corresponding entries to the MetadataStore database. + This processes both direct responses and pushback (updates) responses + """ + self.logger.debug(f"Response from {hexlify(peer.mid)}") + + request: SelectRequest | None = self.request_cache.get(hexlify(peer.mid), response_payload.id) + if request is None: + return + + # Check for limit on the number of packets per request + if request.packets_limit > 1: + request.packets_limit -= 1 + else: + self.request_cache.pop(hexlify(peer.mid), response_payload.id) + + processing_results = await self.composition.metadata_store.process_compressed_mdblob_threaded( + response_payload.raw_blob + ) + self.logger.debug(f"Response result: {processing_results}") + + if isinstance(request, EvaSelectRequest) and not request.processing_results.done(): + request.processing_results.set_result(processing_results) + + if isinstance(request, SelectRequest) and request.processing_callback: + request.processing_callback(request, processing_results) + + # Remember that at least a single packet was received from the queried peer. + if isinstance(request, SelectRequest): + request.peer_responded = True + + return processing_results + + def _on_query_timeout(self, request_cache): + if not request_cache.peer_responded: + self.logger.debug( + "Remote query timeout, deleting peer: %s %s %s", + str(request_cache.peer.address), + hexlify(request_cache.peer.mid), + str(request_cache.request_kwargs), + ) + self.network.remove_peer(request_cache.peer) + + def send_ping(self, peer: Peer) -> None: + self.send_introduction_request(peer) diff --git a/src/tribler/core/components/popularity/community/payload.py b/src/tribler/core/components/content_discovery/community/payload.py similarity index 70% rename from src/tribler/core/components/popularity/community/payload.py rename to src/tribler/core/components/content_discovery/community/payload.py index 74956f9f075..8a54bba9e6f 100644 --- a/src/tribler/core/components/popularity/community/payload.py +++ b/src/tribler/core/components/content_discovery/community/payload.py @@ -52,6 +52,51 @@ def create(cls, random_torrents_checked: List[HealthInfo], popular_torrents_chec random_torrent_tuples, popular_torrent_tuples) -@dataclass(msg_id=2) -class PopularTorrentsRequest: - pass +@vp_compile +class PopularTorrentsRequest(VariablePayload): + msg_id=2 + + +@vp_compile +class VersionRequest(VariablePayload): + msg_id = 101 + + +@vp_compile +class VersionResponse(VariablePayload): + msg_id = 102 + format_list = ['varlenI', 'varlenI'] + names = ['version', 'platform'] + + def fix_pack_version(self, value): + return value.encode('utf-8') + + def fix_pack_platform(self, value): + return value.encode('utf-8') + + @classmethod + def fix_unpack_version(cls, value): + return value.decode('utf-8') + + @classmethod + def fix_unpack_platform(cls, value): + return value.decode('utf-8') + + +@vp_compile +class RemoteSelectPayload(VariablePayload): + msg_id = 201 + format_list = ['I', 'varlenH'] + names = ['id', 'json'] + + +@vp_compile +class RemoteSelectPayloadEva(RemoteSelectPayload): + msg_id = 209 + + +@vp_compile +class SelectResponsePayload(VariablePayload): + msg_id = 202 + format_list = ['I', 'raw'] + names = ['id', 'raw_blob'] diff --git a/src/tribler/core/components/content_discovery/community/settings.py b/src/tribler/core/components/content_discovery/community/settings.py new file mode 100644 index 00000000000..922bdbe3f8b --- /dev/null +++ b/src/tribler/core/components/content_discovery/community/settings.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from typing import Sequence + +from ipv8.community import CommunitySettings +from tribler.core.components.database.db.tribler_database import TriblerDatabase +from tribler.core.components.metadata_store.db.store import MetadataStore +from tribler.core.components.torrent_checker.torrent_checker.torrent_checker import TorrentChecker +from tribler.core.utilities.notifier import Notifier + + +class ContentDiscoverySettings(CommunitySettings): + random_torrent_interval: float = 5 # seconds + random_torrent_count: int = 10 + max_query_peers: int = 20 + maximum_payload_size: int = 1300 + max_response_size: int = 100 # Max number of entries returned by SQL query + + binary_fields: Sequence[str] = ("infohash", "channel_pk") + deprecated_parameters: Sequence[str] = ('subscribed', 'attribute_ranges', 'complete_channel') + + metadata_store: MetadataStore + torrent_checker: TorrentChecker + tribler_db: TriblerDatabase | None = None + notifier: Notifier | None = None diff --git a/src/tribler/core/components/popularity/__init__.py b/src/tribler/core/components/content_discovery/community/tests/__init__.py similarity index 100% rename from src/tribler/core/components/popularity/__init__.py rename to src/tribler/core/components/content_discovery/community/tests/__init__.py diff --git a/src/tribler/core/components/content_discovery/community/tests/test_content_discovery_community.py b/src/tribler/core/components/content_discovery/community/tests/test_content_discovery_community.py new file mode 100644 index 00000000000..7fdbcda4c9d --- /dev/null +++ b/src/tribler/core/components/content_discovery/community/tests/test_content_discovery_community.py @@ -0,0 +1,791 @@ +from __future__ import annotations + +import asyncio +import os +import string +import sys +import time +from asyncio import Future +from binascii import hexlify +from operator import attrgetter +from random import choices, randint, random +from typing import List +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from ipv8.keyvault.crypto import default_eccrypto +from ipv8.messaging.payload import IntroductionRequestPayload +from pony.orm import OperationalError, db_session + +from ipv8.messaging.serialization import default_serializer +from ipv8.test.base import TestBase +from ipv8.test.mocking.ipv8 import MockIPv8 +from tribler.core import notifications +from tribler.core.components.content_discovery.community.payload import SelectResponsePayload, VersionResponse, \ + TorrentsHealthPayload, PopularTorrentsRequest +from tribler.core.components.content_discovery.community.settings import ContentDiscoverySettings +from tribler.core.components.database.db.layers.knowledge_data_access_layer import KnowledgeDataAccessLayer, \ + ResourceType, SHOW_THRESHOLD +from tribler.core.components.database.db.layers.tests.test_knowledge_data_access_layer_base import \ + Resource, TestKnowledgeAccessLayerBase +from tribler.core.components.database.db.tribler_database import TriblerDatabase +from tribler.core.components.metadata_store.db.orm_bindings.torrent_metadata import LZ4_EMPTY_ARCHIVE, NEW +from tribler.core.components.metadata_store.db.serialization import CHANNEL_THUMBNAIL, NULL_KEY, REGULAR_TORRENT +from tribler.core.components.metadata_store.db.store import MetadataStore +from tribler.core.components.content_discovery.community.content_discovery_community import ContentDiscoveryCommunity +from tribler.core.components.torrent_checker.torrent_checker.torrent_checker import TorrentChecker +from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import HealthInfo +from tribler.core.tests.tools.base_test import MockObject +from tribler.core.utilities.path_util import Path +from tribler.core.utilities.utilities import random_infohash +from tribler.core.version import version_id + + +def _generate_single_checked_torrent(status: str = None) -> HealthInfo: + """ + Assumptions + DEAD -> peers: 0 + POPULAR -> Peers: [101, 1000] + DEFAULT -> peers: [1, 100] # alive + """ + + def get_peers_for(health_status): + if health_status == 'DEAD': + return 0 + if health_status == 'POPULAR': + return randint(101, 1000) + return randint(1, 100) + + return HealthInfo(random_infohash(), seeders=get_peers_for(status), leechers=get_peers_for(status)) + + +def _generate_checked_torrents(count: int, status: str = None) -> List[HealthInfo]: + return [_generate_single_checked_torrent(status) for _ in range(count)] + + +def random_string(): + return ''.join(choices(string.ascii_uppercase + string.digits, k=100)) + + +def add_random_torrent(metadata_cls, name="test", seeders=None, leechers=None, last_check=None): + d = {"infohash": random_infohash(), "public_key": NULL_KEY, "title": name, "tags": "", "size": 1234, "status": NEW} + torrent_metadata = metadata_cls.from_dict(d) + if seeders: + torrent_metadata.health.seeders = seeders + if leechers: + torrent_metadata.health.leechers = leechers + if last_check: + torrent_metadata.health.last_check = last_check + return torrent_metadata + + +class TestContentDiscoveryCommunity(TestBase[ContentDiscoveryCommunity]): + NUM_NODES = 2 + + def setUp(self): + super().setUp() + self.count = 0 + self.metadata_store_set = set() + self.initialize(ContentDiscoveryCommunity, self.NUM_NODES) + + async def tearDown(self): + for metadata_store in self.metadata_store_set: + metadata_store.shutdown() + await super().tearDown() + + def create_node(self, settings: ContentDiscoverySettings | None = None, create_dht: bool = False, + enable_statistics: bool = False): + mds = MetadataStore(Path(self.temporary_directory()) / f"{self.count}", + Path(self.temporary_directory()), + default_eccrypto.generate_key("curve25519")) + self.metadata_store_set.add(mds) + torrent_checker = MockObject() + torrent_checker.torrents_checked = {} + + tribler_db = TriblerDatabase(str(Path(self.temporary_directory()) / "tags.db")) + + self.count += 1 + + return MockIPv8("curve25519", ContentDiscoveryCommunity, + ContentDiscoverySettings(metadata_store=mds, + torrent_checker=torrent_checker, + tribler_db=tribler_db)) + + @db_session + def fill_database(self, metadata_store, last_check_now=False): + for torrent_ind in range(5): + last_check = int(time.time()) if last_check_now else 0 + metadata_store.TorrentState( + infohash=str(torrent_ind).encode() * 20, seeders=torrent_ind + 1, last_check=last_check) + + async def init_first_node_and_gossip(self, checked_torrent_info: HealthInfo, deliver_timeout: float = 0.1): + self.torrent_checker(0).torrents_checked[checked_torrent_info.infohash] = checked_torrent_info + + await self.introduce_nodes() + + self.overlay(0).gossip_random_torrents_health() + + await self.deliver_messages(timeout=deliver_timeout) + + def metadata_store(self, i: int) -> MetadataStore: + return self.overlay(i).composition.metadata_store + + def torrent_checker(self, i: int) -> TorrentChecker: + return self.overlay(i).composition.torrent_checker + + def torrent_metadata(self, i): + return self.metadata_store(i).TorrentMetadata + + async def test_torrents_health_gossip(self): + """ + Test whether torrent health information is correctly gossiped around + """ + checked_torrent_info = HealthInfo(b'a' * 20, seeders=200, leechers=0) + + with db_session: + assert self.metadata_store(0).TorrentState.select().count() == 0 + assert self.metadata_store(1).TorrentState.select().count() == 0 + + await self.init_first_node_and_gossip(checked_torrent_info) + + # Check whether node 1 has new torrent health information + with db_session: + torrent = self.metadata_store(1).TorrentState.select().first() + assert torrent.infohash == checked_torrent_info.infohash + assert torrent.seeders == checked_torrent_info.seeders + assert torrent.leechers == checked_torrent_info.leechers + assert torrent.last_check == checked_torrent_info.last_check + + async def test_torrents_health_gossip_no_checker(self): + """ + Test whether no torrent health information is spread without a torrent checker. + """ + self.overlay(0).composition.torrent_checker = None + + with self.assertReceivedBy(1, [], message_filter = [TorrentsHealthPayload]): + self.overlay(0).gossip_random_torrents_health() + await self.deliver_messages() + + async def test_torrents_health_gossip_no_live(self): + """ + Test whether torrent health information is spread when no live torrents are known + """ + with self.assertReceivedBy(1, [TorrentsHealthPayload], message_filter = [TorrentsHealthPayload]) as received: + self.overlay(0).gossip_random_torrents_health() + await self.deliver_messages() + message, = received + + assert message.random_torrents_length == 0 + assert message.torrents_checked_length == 0 + + def test_get_alive_torrents(self): + dead_torrents = _generate_checked_torrents(100, 'DEAD') + popular_torrents = _generate_checked_torrents(100, 'POPULAR') + alive_torrents = _generate_checked_torrents(100) + + all_checked_torrents = dead_torrents + alive_torrents + popular_torrents + self.torrent_checker(0).torrents_checked.update( + {health.infohash: health for health in all_checked_torrents}) + + actual_alive_torrents = self.overlay(0).get_alive_checked_torrents() + assert len(actual_alive_torrents) == len(alive_torrents + popular_torrents) + + def test_get_alive_torrents_no_checker(self): + self.overlay(0).composition.torrent_checker = None + + assert [] == self.overlay(0).get_alive_checked_torrents() + + async def test_torrents_health_gossip_multiple(self): + """ + Test whether torrent health information is correctly gossiped around + """ + dead_torrents = _generate_checked_torrents(100, 'DEAD') + popular_torrents = _generate_checked_torrents(100, 'POPULAR') + alive_torrents = _generate_checked_torrents(100) + + all_checked_torrents = dead_torrents + alive_torrents + popular_torrents + + # Given, initially there are no torrents in the database + with db_session: + node0_count = self.metadata_store(0).TorrentState.select().count() + node1_count = self.metadata_store(1).TorrentState.select().count() + assert node0_count == 0 + assert node1_count == 0 + + # Setup, node 0 checks some torrents, both dead and alive (including popular ones). + self.torrent_checker(0).torrents_checked.update({health.infohash: health for health in all_checked_torrents}) + self.overlay(0).gossip_random_torrents_health() + await self.deliver_messages() + + # Since on introduction request callback, node asks for popular torrents, we expect that + # popular torrents are shared by node 0 to node 1. + with db_session: + node0_count = self.metadata_store(0).TorrentState.select().count() + node1_count = self.metadata_store(1).TorrentState.select().count() + + assert node0_count == 0 # Nothing received from Node 1 because it hasn't checked anything to share. + assert node1_count == self.overlay(1).composition.random_torrent_count + + node1_db_last_count = node1_count + + # Now, assuming Node 0 gossips random torrents to Node 1 multiple times to simulate periodic nature + for _ in range(10): + self.overlay(0).gossip_random_torrents_health() + await self.deliver_messages(timeout=0.1) + + # After gossip, Node 1 should have received some random torrents from Node 0. + # Note that random torrents can also include popular torrents sent during introduction + # and random torrents sent in earlier gossip since no state is maintained. + with db_session: + node0_count = self.metadata_store(0).TorrentState.select().count() + node1_count = self.metadata_store(1).TorrentState.select().count() + + assert node0_count == 0 # Still nothing received from Node 1 because it hasn't checked torrents + assert node1_count >= node1_db_last_count + + node1_db_last_count = node1_count + + async def test_torrents_health_update(self): + """ + Test updating the local torrent health information from network + """ + self.fill_database(self.metadata_store(1)) + + checked_torrent_info = HealthInfo(b'0' * 20, seeders=200, leechers=0) + await self.init_first_node_and_gossip(checked_torrent_info, deliver_timeout=0.5) + + # Check whether node 1 has new torrent health information + with db_session: + state = self.metadata_store(1).TorrentState.get(infohash=b'0' * 20) + self.assertIsNot(state.last_check, 0) + + async def test_unknown_torrent_query_back(self): + """ + Test querying sender for metadata upon receiving an unknown torrent + """ + + infohash = b'1' * 20 + with db_session: + self.metadata_store(0).TorrentMetadata(infohash=infohash) + await self.init_first_node_and_gossip( + HealthInfo(infohash, seeders=200, leechers=0)) + with db_session: + assert self.metadata_store(1).TorrentMetadata.get() + + async def test_skip_torrent_query_back_for_known_torrent(self): + # Test that we _don't_ send the query if we already know about the infohash + infohash = b'1' * 20 + with db_session: + self.metadata_store(0).TorrentMetadata(infohash=infohash) + self.metadata_store(1).TorrentMetadata(infohash=infohash) + self.overlay(1).send_remote_select = Mock() + + await self.init_first_node_and_gossip(HealthInfo(infohash, seeders=200, leechers=0)) + + self.overlay(1).send_remote_select.assert_not_called() + + async def test_popularity_search(self): + """ + Test searching several nodes for metadata entries based on title text + """ + with db_session: + # Add test metadata to node ID2 + self.torrent_metadata(1)(title="ubuntu torrent", infohash=random_infohash()) + self.torrent_metadata(1)(title="debian torrent", infohash=random_infohash()) + + notifier = Mock() + self.overlay(0).composition.notifier = {notifications.remote_query_results: notifier} + self.overlay(0).send_search_request(**{"txt_filter": "ubuntu*"}) + + await self.deliver_messages() + + notifier.assert_called() + + def test_version_response_payload(self): + """ + Check if the version response is correctly serialized. + """ + version = "v7.10.0" + platform = "linux" + + version_response = VersionResponse(version, platform) + serialized = default_serializer.pack_serializable(version_response) + deserialized, _ = default_serializer.unpack_serializable(VersionResponse, serialized) + + self.assertEqual(version_response.version, version) + self.assertEqual(version_response.platform, platform) + self.assertEqual(deserialized.version, version) + self.assertEqual(deserialized.platform, platform) + + async def test_request_for_version(self): + """ + Test whether version request is responded well. + """ + await self.introduce_nodes() + + on_process_version_response_called = Future() + + def on_process_version_response(peer, version, platform): + self.assertEqual(peer, self.peer(1)) + self.assertEqual(version, version_id) + self.assertEqual(platform, sys.platform) + on_process_version_response_called.set_result(True) + + self.overlay(0).process_version_response = on_process_version_response + self.overlay(0).send_version_request(self.peer(1)) + + return await on_process_version_response_called + + def test_search_for_tags_no_db(self): + # test that in case of missed `tribler_db`, function `search_for_tags` returns None + self.overlay(0).composition.tribler_db = None + assert self.overlay(0).search_for_tags(tags=['tag']) is None + + @patch.object(KnowledgeDataAccessLayer, 'get_subjects_intersection') + def test_search_for_tags_only_valid_tags(self, mocked_get_subjects_intersection: Mock): + # test that function `search_for_tags` uses only valid tags + self.overlay(0).search_for_tags(tags=['invalid_tag' * 50, 'valid_tag']) + mocked_get_subjects_intersection.assert_called_with( + subjects_type=ResourceType.TORRENT, + objects={'valid_tag'}, + predicate=ResourceType.TAG, + case_sensitive=False + ) + + @patch.object(MetadataStore, 'get_entries_threaded', new_callable=AsyncMock) + async def test_process_rpc_query_no_tags(self, mocked_get_entries_threaded: AsyncMock): + # test that in case of missed tags, the remote search works like normal remote search + parameters = {'first': 0, 'infohash_set': None, 'last': 100} + await self.overlay(0).process_rpc_query(parameters) + + expected_parameters = {'infohash_set': None} + expected_parameters.update(parameters) + mocked_get_entries_threaded.assert_called_with(**expected_parameters) + + async def test_process_rpc_query_with_tags(self): + # This is full test that checked whether search by tags works or not + # + # Test assumes that two databases were filled by the following data (TagsDatabase and MDS): + infohash1 = os.urandom(20) + infohash2 = os.urandom(20) + infohash3 = os.urandom(20) + + @db_session + def fill_tags_database(): + TestKnowledgeAccessLayerBase.add_operation_set( + self.overlay(0).composition.tribler_db, + { + hexlify(infohash1).decode(): [ + Resource(predicate=ResourceType.TAG, name='tag1', count=SHOW_THRESHOLD), + ], + hexlify(infohash2).decode(): [ + Resource(predicate=ResourceType.TAG, name='tag1', count=SHOW_THRESHOLD - 1), + ] + } + ) + + @db_session + def fill_mds(): + with db_session: + def _add(infohash): + torrent = {"infohash": infohash, "title": 'title', "tags": "", "size": 1, "status": NEW} + self.metadata_store(0).TorrentMetadata.from_dict(torrent) + + _add(infohash1) + _add(infohash2) + _add(infohash3) + + fill_tags_database() + fill_mds() + + # Then we try to query search for three tags: 'tag1', 'tag2', 'tag3' + parameters = {'first': 0, 'infohash_set': None, 'last': 100, 'tags': ['tag1']} + with db_session: + query_results = [r.to_dict() for r in await self.overlay(0).process_rpc_query(parameters)] + + # Expected results: only one infohash (b'infohash1') should be returned. + result_infohash_list = [r['infohash'] for r in query_results] + assert result_infohash_list == [infohash1] + + async def test_remote_select(self): + """ + Test querying metadata entries from a remote machine + """ + + # Fill Node 0 DB with channels and torrents + with db_session: + for i in range(20): + add_random_torrent( + self.torrent_metadata(0), + name=f"ubuntu {i}", + seeders=2 * i, + leechers=i, + last_check=int(time.time()) + i, + ) + + kwargs_dict = {"txt_filter": "ubuntu*", "metadata_type": [REGULAR_TORRENT]} + callback = Mock() + self.nodes[1].overlay.send_remote_select(self.nodes[0].my_peer, **kwargs_dict, processing_callback=callback) + + await self.deliver_messages(timeout=0.5) + # Test optional response processing callback + callback.assert_called() + + # All the matching torrent entries should have been sent to Node 1 + with db_session: + torrents0 = sorted(self.metadata_store(0).get_entries(**kwargs_dict), key=attrgetter('infohash')) + torrents1 = sorted(self.metadata_store(1).get_entries(**kwargs_dict), key=attrgetter('infohash')) + self.assertEqual(len(torrents0), len(torrents1)) + self.assertEqual(len(torrents0), 20) + for t0, t1 in zip(torrents0, torrents1): + assert t0.health.seeders == t1.health.seeders + assert t0.health.leechers == t1.health.leechers + assert t0.health.last_check == t1.health.last_check + + # Test getting empty response for a query + kwargs_dict = {"txt_filter": "ubuntu*", "origin_id": 352127} + callback = Mock() + self.overlay(1).send_remote_select(self.peer(0), **kwargs_dict, processing_callback=callback) + await self.deliver_messages(timeout=0.5) + callback.assert_called() + + async def test_remote_select_deprecated(self): + """ + Test deprecated search keys receiving an empty archive response. + """ + with self.assertReceivedBy(0, [SelectResponsePayload]) as responses: + self.overlay(0).send_remote_select(self.peer(1), subscribed=1) + await self.deliver_messages() + response, = responses + + assert response.raw_blob == LZ4_EMPTY_ARCHIVE + + @pytest.mark.timeout(10) + async def test_remote_select_torrents(self): + """ + Test dropping packets that go over the response limit for a remote select. + """ + with db_session: + torrent_infohash = random_infohash() + self.torrent_metadata(0)(infohash=torrent_infohash, public_key=NULL_KEY, title='title1') + + callback_called = asyncio.Event() + processing_results = [] + + def callback(_, results): + processing_results.extend(results) + callback_called.set() + + self.overlay(1).send_remote_select( + self.peer(0), metadata_type=[REGULAR_TORRENT], infohash=torrent_infohash, processing_callback=callback + ) + + await callback_called.wait() + + assert len(processing_results) == 1 + obj = processing_results[0].md_obj + assert isinstance(obj, self.metadata_store(1).TorrentMetadata) + assert obj.title == 'title1' + assert obj.health.seeders == 0 + + + async def test_remote_select_packets_limit(self): + """ + Test dropping packets that go over the response limit for a remote select. + """ + with db_session: + for _ in range(0, 100): + add_random_torrent(self.torrent_metadata(0), name=random_string()) + + kwargs_dict = {"metadata_type": [REGULAR_TORRENT]} + + def add_result(request, processing_results): + add_result.result_count += 1 + add_result.result_count = 0 + + expected = [SelectResponsePayload] + with self.assertReceivedBy(1, expected, message_filter=[SelectResponsePayload]) as received: + self.overlay(1).send_remote_select(self.peer(0), **kwargs_dict, processing_callback=add_result) + while len(received) < 11: # Packet limit + 1 + await self.deliver_messages() + if len(received) > len(expected): + expected.extend([SelectResponsePayload] * (len(received) - len(expected))) + + # Give asyncio some breathing room to process all the packets + while add_result.result_count < 10: + await asyncio.sleep(0.1) + + assert [] == self.overlay(1).request_cache.get_tasks() # The list of outstanding requests should be empty + assert add_result.result_count == 10 # The packet limit is 10 + + def test_sanitize_query(self): + req_response_list = [ + ({"first": None, "last": None}, {"first": 0, "last": 100}), + ({"first": 123, "last": None}, {"first": 123, "last": 223}), + ({"first": None, "last": 1000}, {"first": 0, "last": 100}), + ({"first": 100, "last": None}, {"first": 100, "last": 200}), + ({"first": 123}, {"first": 123, "last": 223}), + ({"last": 123}, {"first": 0, "last": 100}), + ({}, {"first": 0, "last": 100}), + ] + for req, resp in req_response_list: + assert self.overlay(0).sanitize_query(req) == resp + + def test_sanitize_query_binary_fields(self): + for field in ("infohash", "channel_pk"): + field_in_b = b'0' * 20 + field_in_hex = hexlify(field_in_b) + assert self.overlay(0).sanitize_query({field: field_in_hex})[field] == field_in_b + + async def test_unknown_query_attribute(self): + rqc_node1 = self.nodes[0].overlay + rqc_node2 = self.nodes[1].overlay + + # only the new attribute + rqc_node2.send_remote_select(rqc_node1.my_peer, **{'new_attribute': 'some_value'}) + await self.deliver_messages(timeout=0.1) + + # mixed: the old and a new attribute + rqc_node2.send_remote_select(rqc_node1.my_peer, **{'infohash': b'0' * 20, 'foo': 'bar'}) + await self.deliver_messages(timeout=0.1) + + async def test_process_rpc_query_match_many(self): + """ + Check if a correct query with a match in our database returns a result. + """ + with db_session: + add_random_torrent(self.torrent_metadata(0), name="torrent1") + add_random_torrent(self.torrent_metadata(0), name="torrent2") + + results = await self.overlay(0).process_rpc_query({}) + self.assertEqual(2, len(results)) + + torrent1_md, torrent2_md = results if results[0].title == "torrent1" else results[::-1] + self.assertEqual("torrent1", torrent1_md.title) + self.assertEqual("torrent2", torrent2_md.title) + + async def test_process_rpc_query_match_one(self): + """ + Check if a correct query with one match in our database returns one result. + """ + with db_session: + add_random_torrent(self.torrent_metadata(0), name="a torrent") + + results = await self.overlay(0).process_rpc_query({}) + self.assertEqual(1, len(results)) + + (torrent_md,) = results + self.assertEqual("a torrent", torrent_md.title) + + async def test_process_rpc_query_match_none(self): + """ + Check if a correct query with no match in our database returns no result. + """ + results = await self.overlay(0).process_rpc_query({}) + self.assertEqual(0, len(results)) + + def test_parse_parameters_match_empty_json(self): + """ + Check if processing an empty request causes a ValueError (JSONDecodeError) to be raised. + """ + with self.assertRaises(ValueError): + self.overlay(0).parse_parameters(b'') + + def test_parse_parameters_match_illegal_json(self): + """ + Check if processing a request with illegal JSON causes a UnicodeDecodeError to be raised. + """ + with self.assertRaises(UnicodeDecodeError): + self.overlay(0).parse_parameters(b'{"akey":\x80}') + + async def test_process_rpc_query_match_invalid_json(self): + """ + Check if processing a request with invalid JSON causes a ValueError to be raised. + """ + query = b'{"id_":' + b'\x31' * 200 + b'}' + with self.assertRaises(ValueError): + parameters = self.overlay(0).parse_parameters(query) + await self.overlay(0).process_rpc_query(parameters) + + async def test_process_rpc_query_match_invalid_key(self): + """ + Check if processing a request with invalid flags causes a UnicodeDecodeError to be raised. + """ + with self.assertRaises(TypeError): + parameters = self.overlay(0).parse_parameters(b'{"bla":":("}') + await self.overlay(0).process_rpc_query(parameters) + + async def test_process_rpc_query_no_column(self): + """ + Check if processing a request with no database columns causes an OperationalError. + """ + with self.assertRaises(OperationalError): + parameters = self.overlay(0).parse_parameters(b'{"txt_filter":{"key":"bla"}}') + await self.overlay(0).process_rpc_query(parameters) + + async def test_remote_query_big_response(self): + value = os.urandom(10000) + with db_session: + add_random_torrent(self.metadata_store(1).TorrentMetadata, name=hexlify(value).decode()) + + kwargs_dict = {"metadata_type": [CHANNEL_THUMBNAIL]} + callback = Mock() + self.overlay(0).send_remote_select(self.peer(1), **kwargs_dict, processing_callback=callback) + + await self.deliver_messages(timeout=0.5) + # Test optional response processing callback + callback.assert_called() + + # All the matching torrent entries should have been sent to Node 1 + with db_session: + torrents0 = self.metadata_store(0).get_entries(**kwargs_dict) + torrents1 = self.metadata_store(1).get_entries(**kwargs_dict) + self.assertEqual(len(torrents0), len(torrents1)) + + async def test_drop_silent_peer(self): + kwargs_dict = {"txt_filter": "ubuntu*"} + + with self.overlay(1).request_cache.passthrough(): + # Stop peer 0 from responding + self.network(1).remove_peer = Mock() + self.overlay(1).send_remote_select(self.nodes[0].my_peer, **kwargs_dict) + await asyncio.sleep(0.0) + # node 0 must have called remove_peer because of the timeout + self.network(1).remove_peer.assert_called() + + async def test_dont_drop_silent_peer_on_empty_response(self): + # Test that even in the case of an empty response packet, remove_peer is not called on timeout + + was_called = [] + + async def mock_on_remote_select_response(*_, **__): + was_called.append(True) + return [] + + kwargs_dict = {"txt_filter": "ubuntu*"} + self.network(1).remove_peer = Mock() + self.metadata_store(1).process_compressed_mdblob_threaded = mock_on_remote_select_response + self.overlay(1).send_remote_select(self.peer(0), **kwargs_dict) + await self.deliver_messages() + assert was_called # Positive check to prevent always passing test + self.network(1).remove_peer.assert_not_called() + + async def test_remote_select_force_eva(self): + """ + Test requesting usage of EVA for sending multiple smaller entries. + """ + with db_session: + for _ in range(0, 10): + add_random_torrent(self.torrent_metadata(1), name=hexlify(os.urandom(250)).decode()) + + kwargs_dict = {"metadata_type": [REGULAR_TORRENT]} + + callback = AsyncMock() + self.overlay(0).composition.metadata_store.process_compressed_mdblob_threaded = callback + + self.overlay(0).send_remote_select(self.peer(1), **kwargs_dict, force_eva_response=True) + + await self.deliver_messages() + + callback.assert_called() + + async def test_remote_select_force_eva_error(self): + """ + Test handling of EVA errors. + """ + with db_session: + for _ in range(0, 10): + add_random_torrent(self.torrent_metadata(1), name=hexlify(os.urandom(250)).decode()) + + kwargs_dict = {"metadata_type": [REGULAR_TORRENT]} + + callback = AsyncMock() + self.overlay(0).composition.metadata_store.process_compressed_mdblob_threaded = callback + await self.overlay(0).eva.shutdown() + + self.overlay(0).send_remote_select(self.peer(1), **kwargs_dict, force_eva_response=True) + + await self.deliver_messages() + + assert not callback.called + + async def test_multiple_parallel_request(self): + # Peer A has two torrents "foo" and "bar" + with db_session: + add_random_torrent(self.torrent_metadata(0), name="foo") + add_random_torrent(self.torrent_metadata(0), name="bar") + + # Peer B sends two parallel full-text search queries, only one of them should be processed + callback1 = Mock() + kwargs1 = {"txt_filter": "foo", "metadata_type": [REGULAR_TORRENT]} + self.overlay(1).send_remote_select(self.peer(0), **kwargs1, processing_callback=callback1) + + callback2 = Mock() + kwargs2 = {"txt_filter": "bar", "metadata_type": [REGULAR_TORRENT]} + self.overlay(1).send_remote_select(self.peer(0), **kwargs2, processing_callback=callback2) + + original_get_entries = MetadataStore.get_entries + + # Add a delay to ensure that the first query is still being processed when the second one arrives + # (the mds.get_entries() method is a synchronous one and is called from a worker thread) + + def slow_get_entries(self, *args, **kwargs): + time.sleep(0.1) + return original_get_entries(self, *args, **kwargs) + + with patch.object(self.overlay(0), 'logger') as logger,\ + patch.object(MetadataStore, 'get_entries', slow_get_entries): + await self.deliver_messages(timeout=0.5) + + torrents1 = list(self.metadata_store(1).get_entries(**kwargs1)) + torrents2 = list(self.metadata_store(1).get_entries(**kwargs2)) + + # Both remote queries should return results to the peer B... + assert callback1.called and callback2.called + # ...but one of them should return an empty list, as the database query was not actually executed + assert bool(torrents1) != bool(torrents2) + + # Check that on peer A there is exactly one warning about an ignored remote query + warnings = [call.args[0] for call in logger.warning.call_args_list] + assert len([msg for msg in warnings if msg.startswith('Ignore remote query')]) == 1 + + async def test_ping(self): + """ + Test if the keep-alive message works. + """ + with self.assertReceivedBy(1, [IntroductionRequestPayload]): + self.overlay(0).send_ping(self.peer(1)) + await self.deliver_messages() + + async def test_deprecated_popular_torrents_request_no_live(self): + """ + The new protocol no longer uses PopularTorrentsRequest but still supports it. + """ + with self.assertReceivedBy(0, [TorrentsHealthPayload]) as received: + self.overlay(0).ez_send(self.peer(1), PopularTorrentsRequest()) + await self.deliver_messages() + message, = received + + assert message.random_torrents_length == 0 + assert message.torrents_checked_length == 0 + assert message.random_torrents == [] + assert message.torrents_checked == [] + + + async def test_deprecated_popular_torrents_request_live(self): + """ + The new protocol no longer uses PopularTorrentsRequest but still supports it. + """ + checked_torrent_info = HealthInfo(b'0' * 20, seeders=200, leechers=0) + self.torrent_checker(1).torrents_checked[checked_torrent_info.infohash] = checked_torrent_info + + with self.assertReceivedBy(0, [TorrentsHealthPayload], message_filter=[TorrentsHealthPayload]) as received: + self.overlay(0).ez_send(self.peer(1), PopularTorrentsRequest()) + await self.deliver_messages() + message, = received + + assert message.random_torrents_length == 0 + assert message.torrents_checked_length == 1 + assert message.random_torrents == [] + assert message.torrents_checked[0] == (b'00000000000000000000', 200, 0, message.torrents_checked[0][3]) \ No newline at end of file diff --git a/src/tribler/core/components/content_discovery/content_discovery_component.py b/src/tribler/core/components/content_discovery/content_discovery_component.py new file mode 100644 index 00000000000..d9a9d22728e --- /dev/null +++ b/src/tribler/core/components/content_discovery/content_discovery_component.py @@ -0,0 +1,39 @@ +from ipv8.peerdiscovery.churn import RandomChurn +from tribler.core.components.component import Component +from tribler.core.components.content_discovery.community.content_discovery_community import ContentDiscoveryCommunity +from tribler.core.components.ipv8.ipv8_component import INFINITE, Ipv8Component +from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent +from tribler.core.components.reporter.reporter_component import ReporterComponent +from tribler.core.components.torrent_checker.torrent_checker_component import TorrentCheckerComponent + + +class ContentDiscoveryComponent(Component): + community: ContentDiscoveryCommunity = None + + _ipv8_component: Ipv8Component = None + + async def run(self): + await super().run() + await self.get_component(ReporterComponent) + + self._ipv8_component = await self.require_component(Ipv8Component) + metadata_store_component = await self.require_component(MetadataStoreComponent) + torrent_checker_component = await self.require_component(TorrentCheckerComponent) + + self.community = ContentDiscoveryCommunity(ContentDiscoveryCommunity.settings_class( + my_peer = self._ipv8_component.peer, + endpoint = self._ipv8_component.ipv8.endpoint, + network = self._ipv8_component.ipv8.network, + maximum_payload_size = self.session.config.content_discovery_community.maximum_payload_size, + metadata_store=metadata_store_component.mds, + torrent_checker=torrent_checker_component.torrent_checker, + notifier=self.session.notifier + )) + + self._ipv8_component.initialise_community_by_default(self.community, default_random_walk_max_peers=30) + self._ipv8_component.ipv8.add_strategy(self.community, RandomChurn(self.community), INFINITE) + + async def shutdown(self): + await super().shutdown() + if self._ipv8_component and self.community: + await self._ipv8_component.unload_community(self.community) diff --git a/src/tribler/core/components/content_discovery/settings.py b/src/tribler/core/components/content_discovery/settings.py new file mode 100644 index 00000000000..e5f980e7ae7 --- /dev/null +++ b/src/tribler/core/components/content_discovery/settings.py @@ -0,0 +1,9 @@ +from pydantic import Field + +from tribler.core.config.tribler_config_section import TriblerConfigSection + + +class ContentDiscoveryComponentConfig(TriblerConfigSection): + enabled: bool = True + testnet: bool = Field(default=False, env='CHANT_TESTNET') + maximum_payload_size: int = 1300 diff --git a/src/tribler/core/components/popularity/tests/test_popularity_component.py b/src/tribler/core/components/content_discovery/tests/test_content_discovery_component.py similarity index 80% rename from src/tribler/core/components/popularity/tests/test_popularity_component.py rename to src/tribler/core/components/content_discovery/tests/test_content_discovery_component.py index 64f7309699d..5afb0b4bee8 100644 --- a/src/tribler/core/components/popularity/tests/test_popularity_component.py +++ b/src/tribler/core/components/content_discovery/tests/test_content_discovery_component.py @@ -4,7 +4,7 @@ from tribler.core.components.knowledge.knowledge_component import KnowledgeComponent from tribler.core.components.libtorrent.libtorrent_component import LibtorrentComponent from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent -from tribler.core.components.popularity.popularity_component import PopularityComponent +from tribler.core.components.content_discovery.content_discovery_component import ContentDiscoveryComponent from tribler.core.components.session import Session from tribler.core.components.socks_servers.socks_servers_component import SocksServersComponent from tribler.core.components.torrent_checker.torrent_checker_component import TorrentCheckerComponent @@ -13,11 +13,11 @@ # pylint: disable=protected-access -async def test_popularity_component(tribler_config): +async def test_content_discovery_component(tribler_config): components = [DatabaseComponent(), SocksServersComponent(), LibtorrentComponent(), TorrentCheckerComponent(), KnowledgeComponent(), MetadataStoreComponent(), KeyComponent(), Ipv8Component(), - PopularityComponent()] + ContentDiscoveryComponent()] async with Session(tribler_config, components) as session: - comp = session.get_instance(PopularityComponent) + comp = session.get_instance(ContentDiscoveryComponent) assert comp.community assert comp._ipv8_component diff --git a/src/tribler/core/components/metadata_store/remote_query_community/remote_query_community.py b/src/tribler/core/components/metadata_store/remote_query_community/remote_query_community.py deleted file mode 100644 index 0128765043c..00000000000 --- a/src/tribler/core/components/metadata_store/remote_query_community/remote_query_community.py +++ /dev/null @@ -1,339 +0,0 @@ -import json -import logging -import struct -import time -from asyncio import Future -from binascii import unhexlify -from itertools import count -from typing import Any, Dict, List, Optional, Set - -from ipv8.lazy_community import lazy_wrapper -from ipv8.messaging.lazy_payload import VariablePayload, vp_compile -from ipv8.requestcache import NumberCache, RandomNumberCache, RequestCache -from pony.orm import db_session -from pony.orm.dbapiprovider import OperationalError - -from tribler.core.components.database.db.layers.knowledge_data_access_layer import ResourceType -from tribler.core.components.ipv8.eva.protocol import EVAProtocol -from tribler.core.components.ipv8.eva.result import TransferResult -from tribler.core.components.ipv8.tribler_community import TriblerCommunity -from tribler.core.components.knowledge.community.knowledge_validator import is_valid_resource -from tribler.core.components.metadata_store.db.orm_bindings.torrent_metadata import LZ4_EMPTY_ARCHIVE, entries_to_chunk -from tribler.core.components.metadata_store.db.store import MetadataStore -from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings -from tribler.core.components.metadata_store.utils import RequestTimeoutException -from tribler.core.utilities.pony_utils import run_threaded -from tribler.core.utilities.unicode import hexlify - -BINARY_FIELDS = ("infohash", "channel_pk") -DEPRECATED_PARAMETERS = ['subscribed', 'attribute_ranges', 'complete_channel'] - - -def sanitize_query(query_dict: Dict[str, Any], cap=100) -> Dict[str, Any]: - sanitized_dict = dict(query_dict) - - # We impose a cap on max numbers of returned entries to prevent DDOS-like attacks - first = sanitized_dict.get("first", None) - last = sanitized_dict.get("last", None) - first = first or 0 - last = last if (last is not None and last <= (first + cap)) else (first + cap) - sanitized_dict.update({"first": first, "last": last}) - - # convert hex fields to binary - for field in BINARY_FIELDS: - value = sanitized_dict.get(field) - if value is not None: - sanitized_dict[field] = unhexlify(value) - - return sanitized_dict - - -def convert_to_json(parameters): - sanitized = dict(parameters) - # Convert frozenset to string - if "metadata_type" in sanitized: - sanitized["metadata_type"] = [int(mt) for mt in sanitized["metadata_type"] if mt] - - for field in BINARY_FIELDS: - value = parameters.get(field) - if value is not None: - sanitized[field] = hexlify(value) - - if "origin_id" in parameters: - sanitized["origin_id"] = int(parameters["origin_id"]) - - return json.dumps(sanitized) - - -@vp_compile -class RemoteSelectPayload(VariablePayload): - msg_id = 201 - format_list = ['I', 'varlenH'] - names = ['id', 'json'] - - -@vp_compile -class RemoteSelectPayloadEva(RemoteSelectPayload): - msg_id = 209 - - -@vp_compile -class SelectResponsePayload(VariablePayload): - msg_id = 202 - format_list = ['I', 'raw'] - names = ['id', 'raw_blob'] - - -class SelectRequest(RandomNumberCache): - def __init__(self, request_cache, prefix, request_kwargs, peer, processing_callback=None, timeout_callback=None): - super().__init__(request_cache, prefix) - self.request_kwargs = request_kwargs - # The callback to call on results of processing of the response payload - self.processing_callback = processing_callback - # The maximum number of packets to receive from any given peer from a single request. - # This limit is imposed as a safety precaution to prevent spam/flooding - self.packets_limit = 10 - - self.peer = peer - # Indicate if at least a single packet was returned by the queried peer. - self.peer_responded = False - - self.timeout_callback = timeout_callback - - def on_timeout(self): - if self.timeout_callback is not None: - self.timeout_callback(self) - - -class EvaSelectRequest(SelectRequest): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - - # For EVA transfer it is meaningless to send more than one message - self.packets_limit = 1 - - self.processing_results = Future() - self.register_future(self.processing_results, on_timeout=RequestTimeoutException()) - - -class PushbackWindow(NumberCache): - def __init__(self, request_cache, prefix, original_request_id): - super().__init__(request_cache, prefix, original_request_id) - - # The maximum number of packets to receive from any given peer from a single request. - # This limit is imposed as a safety precaution to prevent spam/flooding - self.packets_limit = 10 - - def on_timeout(self): - pass - - -class RemoteQueryCommunity(TriblerCommunity): - """ - Community for general purpose SELECT-like queries into remote Channels database - """ - - def __init__(self, my_peer, endpoint, network, - rqc_settings: RemoteQueryCommunitySettings = None, - metadata_store=None, - tribler_db=None, - **kwargs): - super().__init__(my_peer, endpoint, network=network, **kwargs) - - self.rqc_settings = rqc_settings - self.mds: MetadataStore = metadata_store - self.tribler_db = tribler_db - # This object stores requests for "select" queries that we sent to other hosts. - # We keep track of peers we actually requested for data so people can't randomly push spam at us. - # Also, this keeps track of hosts we responded to. There is a possibility that - # those hosts will push back updates at us, so we need to allow it. - self.request_cache = RequestCache() - - self.add_message_handler(RemoteSelectPayload, self.on_remote_select) - self.add_message_handler(RemoteSelectPayloadEva, self.on_remote_select_eva) - self.add_message_handler(SelectResponsePayload, self.on_remote_select_response) - - self.eva = EVAProtocol(self, self.on_receive, self.on_send_complete, self.on_error) - self.remote_queries_in_progress = 0 - self.next_remote_query_num = count().__next__ # generator of sequential numbers, for logging & debug purposes - - async def on_receive(self, result: TransferResult): - self.logger.debug(f"EVA data received: peer {hexlify(result.peer.mid)}, info {result.info}") - packet = (result.peer.address, result.data) - self.on_packet(packet) - - async def on_send_complete(self, result: TransferResult): - self.logger.debug(f"EVA outgoing transfer complete: peer {hexlify(result.peer.mid)}, info {result.info}") - - async def on_error(self, peer, exception): - self.logger.warning(f"EVA transfer error:{exception.__class__.__name__}:{exception}, Peer: {hexlify(peer.mid)}") - - def send_remote_select(self, peer, processing_callback=None, force_eva_response=False, **kwargs): - request_class = EvaSelectRequest if force_eva_response else SelectRequest - request = request_class( - self.request_cache, - hexlify(peer.mid), - kwargs, - peer, - processing_callback=processing_callback, - timeout_callback=self._on_query_timeout, - ) - self.request_cache.add(request) - - self.logger.debug(f"Select to {hexlify(peer.mid)} with ({kwargs})") - args = (request.number, convert_to_json(kwargs).encode('utf8')) - if force_eva_response: - self.ez_send(peer, RemoteSelectPayloadEva(*args)) - else: - self.ez_send(peer, RemoteSelectPayload(*args)) - return request - - def should_limit_rate_for_query(self, sanitized_parameters: Dict[str, Any]) -> bool: - return 'txt_filter' in sanitized_parameters - - async def process_rpc_query_rate_limited(self, sanitized_parameters: Dict[str, Any]) -> List: - query_num = self.next_remote_query_num() - if self.remote_queries_in_progress and self.should_limit_rate_for_query(sanitized_parameters): - self.logger.warning(f'Ignore remote query {query_num} as another one is already processing. ' - f'The ignored query: {sanitized_parameters}') - return [] - - self.logger.info(f'Process remote query {query_num}: {sanitized_parameters}') - self.remote_queries_in_progress += 1 - t = time.time() - try: - return await self.process_rpc_query(sanitized_parameters) - finally: - self.remote_queries_in_progress -= 1 - self.logger.info(f'Remote query {query_num} processed in {time.time() - t} seconds: {sanitized_parameters}') - - async def process_rpc_query(self, sanitized_parameters: Dict[str, Any]) -> List: - """ - Retrieve the result of a database query from a third party, encoded as raw JSON bytes (through `dumps`). - :raises TypeError: if the JSON contains invalid keys. - :raises ValueError: if no JSON could be decoded. - :raises pony.orm.dbapiprovider.OperationalError: if an illegal query was performed. - """ - if self.tribler_db: - # tags should be extracted because `get_entries_threaded` doesn't expect them as a parameter - tags = sanitized_parameters.pop('tags', None) - - infohash_set = await run_threaded(self.tribler_db.instance, self.search_for_tags, tags) - if infohash_set: - sanitized_parameters['infohash_set'] = {bytes.fromhex(s) for s in infohash_set} - - return await self.mds.get_entries_threaded(**sanitized_parameters) - - @db_session - def search_for_tags(self, tags: Optional[List[str]]) -> Optional[Set[str]]: - if not tags or not self.tribler_db: - return None - valid_tags = {tag for tag in tags if is_valid_resource(tag)} - result = self.tribler_db.knowledge.get_subjects_intersection( - subjects_type=ResourceType.TORRENT, - objects=valid_tags, - predicate=ResourceType.TAG, - case_sensitive=False - ) - return result - - def send_db_results(self, peer, request_payload_id, db_results, force_eva_response=False): - - # Special case of empty results list - sending empty lz4 archive - if len(db_results) == 0: - self.ez_send(peer, SelectResponsePayload(request_payload_id, LZ4_EMPTY_ARCHIVE)) - return - - index = 0 - while index < len(db_results): - transfer_size = ( - self.eva.settings.binary_size_limit if force_eva_response else self.rqc_settings.maximum_payload_size - ) - data, index = entries_to_chunk(db_results, transfer_size, start_index=index, include_health=True) - payload = SelectResponsePayload(request_payload_id, data) - if force_eva_response or (len(data) > self.rqc_settings.maximum_payload_size): - self.eva.send_binary(peer, struct.pack('>i', request_payload_id), - self.ezr_pack(payload.msg_id, payload)) - else: - self.ez_send(peer, payload) - - @lazy_wrapper(RemoteSelectPayloadEva) - async def on_remote_select_eva(self, peer, request_payload): - await self._on_remote_select_basic(peer, request_payload, force_eva_response=True) - - @lazy_wrapper(RemoteSelectPayload) - async def on_remote_select(self, peer, request_payload): - await self._on_remote_select_basic(peer, request_payload) - - def parse_parameters(self, json_bytes: bytes) -> Dict[str, Any]: - parameters = json.loads(json_bytes) - return sanitize_query(parameters, self.rqc_settings.max_response_size) - - async def _on_remote_select_basic(self, peer, request_payload, force_eva_response=False): - try: - sanitized_parameters = self.parse_parameters(request_payload.json) - # Drop selects with deprecated queries - if any(param in sanitized_parameters for param in DEPRECATED_PARAMETERS): - self.logger.warning(f"Remote select with deprecated parameters: {sanitized_parameters}") - self.ez_send(peer, SelectResponsePayload(request_payload.id, LZ4_EMPTY_ARCHIVE)) - return - db_results = await self.process_rpc_query_rate_limited(sanitized_parameters) - - # When we send our response to a host, we open a window of opportunity - # for it to push back updates - if db_results and not self.request_cache.has(hexlify(peer.mid), request_payload.id): - self.request_cache.add(PushbackWindow(self.request_cache, hexlify(peer.mid), request_payload.id)) - - self.send_db_results(peer, request_payload.id, db_results, force_eva_response) - except (OperationalError, TypeError, ValueError) as error: - self.logger.error(f"Remote select. The error occurred: {error}") - - @lazy_wrapper(SelectResponsePayload) - async def on_remote_select_response(self, peer, response_payload): - """ - Match the response that we received from the network to a query cache - and process it by adding the corresponding entries to the MetadataStore database. - This processes both direct responses and pushback (updates) responses - """ - self.logger.debug(f"Response from {hexlify(peer.mid)}") - - # ACHTUNG! the returned request cache can be any one of SelectRequest, PushbackWindow - request = self.request_cache.get(hexlify(peer.mid), response_payload.id) - if request is None: - return - - # Check for limit on the number of packets per request - if request.packets_limit > 1: - request.packets_limit -= 1 - else: - self.request_cache.pop(hexlify(peer.mid), response_payload.id) - - processing_results = await self.mds.process_compressed_mdblob_threaded(response_payload.raw_blob) - self.logger.debug(f"Response result: {processing_results}") - - if isinstance(request, EvaSelectRequest) and not request.processing_results.done(): - request.processing_results.set_result(processing_results) - - if isinstance(request, SelectRequest) and request.processing_callback: - request.processing_callback(request, processing_results) - - # Remember that at least a single packet was received was received from the queried peer. - if isinstance(request, SelectRequest): - request.peer_responded = True - - return processing_results - - def _on_query_timeout(self, request_cache): - if not request_cache.peer_responded: - self.logger.debug( - "Remote query timeout, deleting peer: %s %s %s", - str(request_cache.peer.address), - hexlify(request_cache.peer.mid), - str(request_cache.request_kwargs), - ) - self.network.remove_peer(request_cache.peer) - - async def unload(self): - await self.eva.shutdown() - await self.request_cache.shutdown() - await super().unload() diff --git a/src/tribler/core/components/metadata_store/remote_query_community/settings.py b/src/tribler/core/components/metadata_store/remote_query_community/settings.py deleted file mode 100644 index ae1c1e49dfa..00000000000 --- a/src/tribler/core/components/metadata_store/remote_query_community/settings.py +++ /dev/null @@ -1,19 +0,0 @@ -from tribler.core.config.tribler_config_section import TriblerConfigSection - - -class RemoteQueryCommunitySettings(TriblerConfigSection): - minimal_blob_size: int = 200 - maximum_payload_size: int = 1300 - max_entries: int = maximum_payload_size // minimal_blob_size - - # The next option is currently used by GigaChannelCommunity only. We probably should move it to the - # GigaChannelCommunity settings or to a dedicated search-related section. The value of the option is corresponding - # with the TARGET_PEERS_NUMBER of src/tribler/core/components/gigachannel/community/sync_strategy.py, that is, to - # the number of peers that GigaChannelCommunity will have after a long run (initially, the number of peers in - # GigaChannelCommunity can rise up to several hundred due to DiscoveryBooster). The number of parallel remote - # requests should be not too small (to have various results from remote peers) and not too big (to avoid flooding - # the network with exceedingly high number of queries). TARGET_PEERS_NUMBER looks like a good middle ground here. - max_query_peers: int = 20 - - max_response_size: int = 100 # Max number of entries returned by SQL query - push_updates_back_enabled = True diff --git a/src/tribler/core/components/metadata_store/remote_query_community/tests/test_remote_query_community.py b/src/tribler/core/components/metadata_store/remote_query_community/tests/test_remote_query_community.py deleted file mode 100644 index a174eccaf15..00000000000 --- a/src/tribler/core/components/metadata_store/remote_query_community/tests/test_remote_query_community.py +++ /dev/null @@ -1,433 +0,0 @@ -import asyncio -import random -import string -import time -from asyncio import sleep -from binascii import hexlify, unhexlify -from operator import attrgetter -from os import urandom -from unittest.mock import Mock, patch - -import pytest -from ipv8.keyvault.crypto import default_eccrypto -from pony.orm import db_session -from pony.orm.dbapiprovider import OperationalError - -from tribler.core.components.ipv8.adapters_tests import TriblerTestBase -from tribler.core.components.metadata_store.db.orm_bindings.torrent_metadata import NEW, LZ4_EMPTY_ARCHIVE -from tribler.core.components.metadata_store.db.serialization import CHANNEL_THUMBNAIL, REGULAR_TORRENT, \ - NULL_KEY -from tribler.core.components.metadata_store.db.store import MetadataStore -from tribler.core.components.metadata_store.remote_query_community.remote_query_community import ( - RemoteQueryCommunity, - sanitize_query, SelectResponsePayload, -) -from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings -from tribler.core.utilities.path_util import Path -from tribler.core.utilities.unicode import hexlify -from tribler.core.utilities.utilities import random_infohash - - -# pylint: disable=protected-access - - -def random_string(): - return ''.join(random.choices(string.ascii_uppercase + string.digits, k=100)) - - -def add_random_torrent(metadata_cls, name="test", seeders=None, leechers=None, last_check=None): - d = {"infohash": random_infohash(), "public_key": NULL_KEY, "title": name, "tags": "", "size": 1234, "status": NEW} - torrent_metadata = metadata_cls.from_dict(d) - if seeders: - torrent_metadata.health.seeders = seeders - if leechers: - torrent_metadata.health.leechers = leechers - if last_check: - torrent_metadata.health.last_check = last_check - return torrent_metadata - - -class BasicRemoteQueryCommunity(RemoteQueryCommunity): - community_id = unhexlify('eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee') - - -class TestRemoteQueryCommunity(TriblerTestBase): - """ - Unit tests for the base RemoteQueryCommunity which do not need a real Session. - """ - - def __init__(self, methodName='runTest'): - random.seed(123) - super().__init__(methodName) - - def setUp(self): - random.seed(456) - super().setUp() - self.count = 0 - self.metadata_store_set = set() - self.initialize(BasicRemoteQueryCommunity, 2) - - async def tearDown(self): - for metadata_store in self.metadata_store_set: - metadata_store.shutdown() - await super().tearDown() - - def create_node(self, *args, **kwargs): - metadata_store = MetadataStore( - Path(self.temporary_directory()) / f"{self.count}.db", - Path(self.temporary_directory()), - default_eccrypto.generate_key("curve25519"), - disable_sync=True, - ) - self.metadata_store_set.add(metadata_store) - kwargs['metadata_store'] = metadata_store - kwargs['rqc_settings'] = RemoteQueryCommunitySettings() - node = super().create_node(*args, **kwargs) - self.count += 1 - return node - - def torrent_metadata(self, i): - return self.nodes[i].overlay.mds.TorrentMetadata - - async def test_remote_select(self): - """ - Test querying metadata entries from a remote machine - """ - mds0 = self.nodes[0].overlay.mds - mds1 = self.nodes[1].overlay.mds - - # Fill Node 0 DB with channels and torrents - with db_session: - for i in range(20): - add_random_torrent( - mds0.TorrentMetadata, - name=f"ubuntu {i}", - seeders=2 * i, - leechers=i, - last_check=int(time.time()) + i, - ) - - kwargs_dict = {"txt_filter": "ubuntu*", "metadata_type": [REGULAR_TORRENT]} - callback = Mock() - self.nodes[1].overlay.send_remote_select(self.nodes[0].my_peer, **kwargs_dict, processing_callback=callback) - - await self.deliver_messages(timeout=0.5) - # Test optional response processing callback - callback.assert_called() - - # All the matching torrent entries should have been sent to Node 1 - with db_session: - torrents0 = sorted(mds0.get_entries(**kwargs_dict), key=attrgetter('infohash')) - torrents1 = sorted(mds1.get_entries(**kwargs_dict), key=attrgetter('infohash')) - self.assertEqual(len(torrents0), len(torrents1)) - self.assertEqual(len(torrents0), 20) - for t0, t1 in zip(torrents0, torrents1): - assert t0.health.seeders == t1.health.seeders - assert t0.health.leechers == t1.health.leechers - assert t0.health.last_check == t1.health.last_check - - # Test getting empty response for a query - kwargs_dict = {"txt_filter": "ubuntu*", "origin_id": 352127} - callback = Mock() - self.nodes[1].overlay.send_remote_select(self.nodes[0].my_peer, **kwargs_dict, processing_callback=callback) - await self.deliver_messages(timeout=0.5) - callback.assert_called() - - async def test_remote_select_deprecated(self): - """ - Test deprecated search keys receiving an empty archive response. - """ - with self.assertReceivedBy(0, [SelectResponsePayload]) as responses: - self.overlay(0).send_remote_select(self.peer(1), subscribed=1) - await self.deliver_messages() - response, = responses - - assert response.raw_blob == LZ4_EMPTY_ARCHIVE - - async def test_push_entry_update(self): - """ - Test if sending back information on updated version of a metadata entry works - """ - - @pytest.mark.timeout(10) - async def test_remote_select_torrents(self): - """ - Test dropping packets that go over the response limit for a remote select. - - """ - peer = self.nodes[0].my_peer - mds0 = self.nodes[0].overlay.mds - mds1 = self.nodes[1].overlay.mds - - with db_session: - torrent_infohash = random_infohash() - mds0.TorrentMetadata(infohash=torrent_infohash, public_key=NULL_KEY, title='title1') - - callback_called = asyncio.Event() - processing_results = [] - - def callback(_, results): - processing_results.extend(results) - callback_called.set() - - self.nodes[1].overlay.send_remote_select( - peer, metadata_type=[REGULAR_TORRENT], infohash=torrent_infohash, processing_callback=callback - ) - - await callback_called.wait() - - assert len(processing_results) == 1 - obj = processing_results[0].md_obj - assert isinstance(obj, mds1.TorrentMetadata) - assert obj.title == 'title1' - assert obj.health.seeders == 0 - - - async def test_remote_select_packets_limit(self): - """ - Test dropping packets that go over the response limit for a remote select. - - """ - mds0 = self.nodes[0].overlay.mds - mds1 = self.nodes[1].overlay.mds - - with db_session: - for _ in range(0, 100): - md = add_random_torrent(mds0.TorrentMetadata, name=random_string()) - key = default_eccrypto.generate_key("curve25519") - md.public_key = key.pub().key_to_bin()[10:] - md.signature = md.serialized(key)[-64:] - - peer = self.nodes[0].my_peer - kwargs_dict = {"metadata_type": [REGULAR_TORRENT]} - self.nodes[1].overlay.send_remote_select(peer, **kwargs_dict) - # There should be an outstanding request in the list - self.assertTrue(self.nodes[1].overlay.request_cache._identifiers) # pylint: disable=protected-access - - await self.deliver_messages(timeout=1.5) - - with db_session: - received_torrents = list(mds1.TorrentMetadata.select()) - # We should receive less than 6 packets, so all the channels should not fit there. - received_torrents_count = len(received_torrents) - assert 40 <= received_torrents_count < 60 - - # The list of outstanding requests should be empty - self.assertFalse(self.nodes[1].overlay.request_cache._identifiers) # pylint: disable=protected-access - - def test_sanitize_query(self): - req_response_list = [ - ({"first": None, "last": None}, {"first": 0, "last": 100}), - ({"first": 123, "last": None}, {"first": 123, "last": 223}), - ({"first": None, "last": 1000}, {"first": 0, "last": 100}), - ({"first": 100, "last": None}, {"first": 100, "last": 200}), - ({"first": 123}, {"first": 123, "last": 223}), - ({"last": 123}, {"first": 0, "last": 100}), - ({}, {"first": 0, "last": 100}), - ] - for req, resp in req_response_list: - assert sanitize_query(req) == resp - - def test_sanitize_query_binary_fields(self): - for field in ("infohash", "channel_pk"): - field_in_b = b'0' * 20 - field_in_hex = hexlify(field_in_b) - assert sanitize_query({field: field_in_hex})[field] == field_in_b - - async def test_unknown_query_attribute(self): - rqc_node1 = self.nodes[0].overlay - rqc_node2 = self.nodes[1].overlay - - # only the new attribute - rqc_node2.send_remote_select(rqc_node1.my_peer, **{'new_attribute': 'some_value'}) - await self.deliver_messages(timeout=0.1) - - # mixed: the old and a new attribute - rqc_node2.send_remote_select(rqc_node1.my_peer, **{'infohash': b'0' * 20, 'foo': 'bar'}) - await self.deliver_messages(timeout=0.1) - - async def test_process_rpc_query_match_many(self): - """ - Check if a correct query with a match in our database returns a result. - """ - with db_session: - add_random_torrent(self.torrent_metadata(0), name="torrent1") - add_random_torrent(self.torrent_metadata(0), name="torrent2") - - results = await self.overlay(0).process_rpc_query({}) - self.assertEqual(2, len(results)) - - torrent1_md, torrent2_md = results if results[0].title == "torrent1" else results[::-1] - self.assertEqual("torrent1", torrent1_md.title) - self.assertEqual("torrent2", torrent2_md.title) - - async def test_process_rpc_query_match_one(self): - """ - Check if a correct query with one match in our database returns one result. - """ - with db_session: - add_random_torrent(self.torrent_metadata(0), name="a torrent") - - results = await self.overlay(0).process_rpc_query({}) - self.assertEqual(1, len(results)) - - (torrent_md,) = results - self.assertEqual("a torrent", torrent_md.title) - - async def test_process_rpc_query_match_none(self): - """ - Check if a correct query with no match in our database returns no result. - """ - results = await self.overlay(0).process_rpc_query({}) - self.assertEqual(0, len(results)) - - def test_parse_parameters_match_empty_json(self): - """ - Check if processing an empty request causes a ValueError (JSONDecodeError) to be raised. - """ - with self.assertRaises(ValueError): - self.overlay(0).parse_parameters(b'') - - def test_parse_parameters_match_illegal_json(self): - """ - Check if processing a request with illegal JSON causes a UnicodeDecodeError to be raised. - """ - with self.assertRaises(UnicodeDecodeError): - self.overlay(0).parse_parameters(b'{"akey":\x80}') - - async def test_process_rpc_query_match_invalid_json(self): - """ - Check if processing a request with invalid JSON causes a ValueError to be raised. - """ - query = b'{"id_":' + b'\x31' * 200 + b'}' - with self.assertRaises(ValueError): - parameters = self.overlay(0).parse_parameters(query) - await self.overlay(0).process_rpc_query(parameters) - - async def test_process_rpc_query_match_invalid_key(self): - """ - Check if processing a request with invalid flags causes a UnicodeDecodeError to be raised. - """ - with self.assertRaises(TypeError): - parameters = self.overlay(0).parse_parameters(b'{"bla":":("}') - await self.overlay(0).process_rpc_query(parameters) - - async def test_process_rpc_query_no_column(self): - """ - Check if processing a request with no database columns causes an OperationalError. - """ - with self.assertRaises(OperationalError): - parameters = self.overlay(0).parse_parameters(b'{"txt_filter":{"key":"bla"}}') - await self.overlay(0).process_rpc_query(parameters) - - async def test_remote_query_big_response(self): - - mds0 = self.nodes[0].overlay.mds - mds1 = self.nodes[1].overlay.mds - - value = urandom(10000) - with db_session: - add_random_torrent(mds1.TorrentMetadata, name=hexlify(value)) - - kwargs_dict = {"metadata_type": [CHANNEL_THUMBNAIL]} - callback = Mock() - self.nodes[0].overlay.send_remote_select(self.nodes[1].my_peer, **kwargs_dict, processing_callback=callback) - - await self.deliver_messages(timeout=0.5) - # Test optional response processing callback - callback.assert_called() - - # All the matching torrent entries should have been sent to Node 1 - with db_session: - torrents0 = mds0.get_entries(**kwargs_dict) - torrents1 = mds1.get_entries(**kwargs_dict) - self.assertEqual(len(torrents0), len(torrents1)) - - async def test_drop_silent_peer(self): - kwargs_dict = {"txt_filter": "ubuntu*"} - - basic_path = 'tribler.core.components.metadata_store.remote_query_community.remote_query_community' - - with self.overlay(1).request_cache.passthrough(): - # Stop peer 0 from responding - with patch(basic_path + '.RemoteQueryCommunity._on_remote_select_basic'): - self.nodes[1].overlay.network.remove_peer = Mock() - self.nodes[1].overlay.send_remote_select(self.nodes[0].my_peer, **kwargs_dict) - await sleep(0.0) - # node 0 must have called remove_peer because of the timeout - self.nodes[1].overlay.network.remove_peer.assert_called() - - async def test_dont_drop_silent_peer_on_empty_response(self): - # Test that even in the case of an empty response packet, remove_peer is not called on timeout - - was_called = [] - - async def mock_on_remote_select_response(*_, **__): - was_called.append(True) - return [] - - kwargs_dict = {"txt_filter": "ubuntu*"} - self.nodes[1].overlay.network.remove_peer = Mock() - self.nodes[1].overlay.mds.process_compressed_mdblob_threaded = mock_on_remote_select_response - self.nodes[1].overlay.send_remote_select(self.nodes[0].my_peer, **kwargs_dict) - await self.deliver_messages() - assert was_called # Positive check to prevent always passing test - self.nodes[1].overlay.network.remove_peer.assert_not_called() - - async def test_remote_select_force_eva(self): - # Test requesting usage of EVA for sending multiple smaller entries - with db_session: - for _ in range(0, 10): - add_random_torrent(self.nodes[1].overlay.mds.TorrentMetadata, name=hexlify(urandom(250))) - - kwargs_dict = {"metadata_type": [REGULAR_TORRENT]} - - self.nodes[1].overlay.eva.send_binary = Mock() - self.nodes[0].overlay.send_remote_select(self.nodes[1].my_peer, **kwargs_dict, force_eva_response=True) - - await self.deliver_messages(timeout=0.5) - - self.nodes[1].overlay.eva.send_binary.assert_called_once() - - async def test_multiple_parallel_request(self): - peer_a = self.nodes[0].my_peer - a = self.nodes[0].overlay - b = self.nodes[1].overlay - - # Peer A has two torrents "foo" and "bar" - with db_session: - add_random_torrent(a.mds.TorrentMetadata, name="foo") - add_random_torrent(a.mds.TorrentMetadata, name="bar") - - # Peer B sends two parallel full-text search queries, only one of them should be processed - callback1 = Mock() - kwargs1 = {"txt_filter": "foo", "metadata_type": [REGULAR_TORRENT]} - b.send_remote_select(peer_a, **kwargs1, processing_callback=callback1) - - callback2 = Mock() - kwargs2 = {"txt_filter": "bar", "metadata_type": [REGULAR_TORRENT]} - b.send_remote_select(peer_a, **kwargs2, processing_callback=callback2) - - original_get_entries = MetadataStore.get_entries - - # Add a delay to ensure that the first query is still being processed when the second one arrives - # (the mds.get_entries() method is a synchronous one and is called from a worker thread) - - def slow_get_entries(self, *args, **kwargs): - time.sleep(0.1) - return original_get_entries(self, *args, **kwargs) - - with patch.object(a, 'logger') as logger, patch.object(MetadataStore, 'get_entries', slow_get_entries): - await self.deliver_messages(timeout=0.5) - - torrents1 = list(b.mds.get_entries(**kwargs1)) - torrents2 = list(b.mds.get_entries(**kwargs2)) - - # Both remote queries should return results to the peer B... - assert callback1.called and callback2.called - # ...but one of them should return an empty list, as the database query was not actually executed - assert bool(torrents1) != bool(torrents2) - - # Check that on peer A there is exactly one warning about an ignored remote query - warnings = [call.args[0] for call in logger.warning.call_args_list] - assert len([msg for msg in warnings if msg.startswith('Ignore remote query')]) == 1 diff --git a/src/tribler/core/components/metadata_store/remote_query_community/tests/test_remote_search_by_tags.py b/src/tribler/core/components/metadata_store/remote_query_community/tests/test_remote_search_by_tags.py deleted file mode 100644 index ef55fbfd485..00000000000 --- a/src/tribler/core/components/metadata_store/remote_query_community/tests/test_remote_search_by_tags.py +++ /dev/null @@ -1,130 +0,0 @@ -import os -from unittest.mock import AsyncMock, Mock, PropertyMock, patch - -from ipv8.keyvault.crypto import default_eccrypto -from pony.orm import db_session - -from tribler.core.components.database.db.layers.knowledge_data_access_layer import KnowledgeDataAccessLayer, \ - ResourceType, SHOW_THRESHOLD -from tribler.core.components.database.db.layers.tests.test_knowledge_data_access_layer_base import Resource, \ - TestKnowledgeAccessLayerBase -from tribler.core.components.database.db.tribler_database import TriblerDatabase -from tribler.core.components.ipv8.adapters_tests import TriblerTestBase -from tribler.core.components.metadata_store.db.orm_bindings.torrent_metadata import NEW -from tribler.core.components.metadata_store.db.store import MetadataStore -from tribler.core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity -from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings -from tribler.core.components.metadata_store.remote_query_community.tests.test_remote_query_community import ( - BasicRemoteQueryCommunity, -) -from tribler.core.utilities.path_util import Path -from tribler.core.utilities.unicode import hexlify - - -class TestRemoteSearchByTags(TriblerTestBase): - """ In this test set we will use only one node's instance as it is sufficient - for testing remote search by tags - """ - - def setUp(self): - super().setUp() - self.metadata_store = None - self.tribler_db = None - self.initialize(BasicRemoteQueryCommunity, 1) - - async def tearDown(self): - if self.metadata_store: - self.metadata_store.shutdown() - if self.tribler_db: - self.tribler_db.shutdown() - - await super().tearDown() - - def create_node(self, *args, **kwargs): - self.metadata_store = MetadataStore( - Path(self.temporary_directory()) / "mds.db", - Path(self.temporary_directory()), - default_eccrypto.generate_key("curve25519"), - disable_sync=True, - ) - self.tribler_db = TriblerDatabase(str(Path(self.temporary_directory()) / "tags.db")) - - kwargs['metadata_store'] = self.metadata_store - kwargs['tribler_db'] = self.tribler_db - kwargs['rqc_settings'] = RemoteQueryCommunitySettings() - return super().create_node(*args, **kwargs) - - @property - def rqc(self) -> RemoteQueryCommunity: - return self.overlay(0) - - @patch.object(RemoteQueryCommunity, 'tribler_db', new=PropertyMock(return_value=None), create=True) - def test_search_for_tags_no_db(self): - # test that in case of missed `tribler_db`, function `search_for_tags` returns None - assert self.rqc.search_for_tags(tags=['tag']) is None - - @patch.object(KnowledgeDataAccessLayer, 'get_subjects_intersection') - def test_search_for_tags_only_valid_tags(self, mocked_get_subjects_intersection: Mock): - # test that function `search_for_tags` uses only valid tags - self.rqc.search_for_tags(tags=['invalid_tag' * 50, 'valid_tag']) - mocked_get_subjects_intersection.assert_called_with( - subjects_type=ResourceType.TORRENT, - objects={'valid_tag'}, - predicate=ResourceType.TAG, - case_sensitive=False - ) - - @patch.object(MetadataStore, 'get_entries_threaded', new_callable=AsyncMock) - async def test_process_rpc_query_no_tags(self, mocked_get_entries_threaded: AsyncMock): - # test that in case of missed tags, the remote search works like normal remote search - parameters = {'first': 0, 'infohash_set': None, 'last': 100} - await self.rqc.process_rpc_query(parameters) - - expected_parameters = {'infohash_set': None} - expected_parameters.update(parameters) - mocked_get_entries_threaded.assert_called_with(**expected_parameters) - - async def test_process_rpc_query_with_tags(self): - # This is full test that checked whether search by tags works or not - # - # Test assumes that two databases were filled by the following data (TagsDatabase and MDS): - infohash1 = os.urandom(20) - infohash2 = os.urandom(20) - infohash3 = os.urandom(20) - - @db_session - def fill_tags_database(): - TestKnowledgeAccessLayerBase.add_operation_set( - self.rqc.tribler_db, - { - hexlify(infohash1): [ - Resource(predicate=ResourceType.TAG, name='tag1', count=SHOW_THRESHOLD), - ], - hexlify(infohash2): [ - Resource(predicate=ResourceType.TAG, name='tag1', count=SHOW_THRESHOLD - 1), - ] - } - ) - - @db_session - def fill_mds(): - with db_session: - def _add(infohash): - torrent = {"infohash": infohash, "title": 'title', "tags": "", "size": 1, "status": NEW} - self.rqc.mds.TorrentMetadata.from_dict(torrent) - - _add(infohash1) - _add(infohash2) - _add(infohash3) - - fill_tags_database() - fill_mds() - - # Then we try to query search for three tags: 'tag1', 'tag2', 'tag3' - parameters = {'first': 0, 'infohash_set': None, 'last': 100, 'tags': ['tag1']} - with db_session: - query_results = [r.to_dict() for r in await self.rqc.process_rpc_query(parameters)] - - # Expected results: only one infohash (b'infohash1') should be returned. - result_infohash_list = [r['infohash'] for r in query_results] - assert result_infohash_list == [infohash1] diff --git a/src/tribler/core/components/metadata_store/restapi/search_endpoint.py b/src/tribler/core/components/metadata_store/restapi/search_endpoint.py index bd04fde58f8..a1570aefef9 100644 --- a/src/tribler/core/components/metadata_store/restapi/search_endpoint.py +++ b/src/tribler/core/components/metadata_store/restapi/search_endpoint.py @@ -15,7 +15,7 @@ from tribler.core.components.metadata_store.restapi.metadata_endpoint import MetadataEndpointBase from tribler.core.components.metadata_store.restapi.metadata_schema import SearchMetadataParameters, MetadataSchema, \ RemoteQueryParameters -from tribler.core.components.popularity.community.popularity_community import PopularityCommunity +from tribler.core.components.content_discovery.community.content_discovery_community import ContentDiscoveryCommunity from tribler.core.components.restapi.rest.rest_endpoint import HTTP_BAD_REQUEST, RESTResponse from tribler.core.utilities.pony_utils import run_threaded from tribler.core.utilities.utilities import froze_it @@ -31,7 +31,7 @@ class SearchEndpoint(MetadataEndpointBase): """ path = '/search' - def __init__(self, popularity_community: PopularityCommunity, *args, **kwargs): + def __init__(self, popularity_community: ContentDiscoveryCommunity, *args, **kwargs): MetadataEndpointBase.__init__(self, *args, **kwargs) self.popularity_community = popularity_community diff --git a/src/tribler/core/components/metadata_store/restapi/tests/test_search_endpoint.py b/src/tribler/core/components/metadata_store/restapi/tests/test_search_endpoint.py index d149d6c90f7..5a584d3ae88 100644 --- a/src/tribler/core/components/metadata_store/restapi/tests/test_search_endpoint.py +++ b/src/tribler/core/components/metadata_store/restapi/tests/test_search_endpoint.py @@ -30,13 +30,13 @@ def needle_in_haystack_mds(metadata_store): @pytest.fixture -def mock_popularity_community(): +def mock_content_discovery_community(): return Mock() @pytest.fixture -def endpoint(mock_popularity_community, needle_in_haystack_mds, tribler_db): - return SearchEndpoint(mock_popularity_community, needle_in_haystack_mds, tribler_db=tribler_db) +def endpoint(mock_content_discovery_community, needle_in_haystack_mds, tribler_db): + return SearchEndpoint(mock_content_discovery_community, needle_in_haystack_mds, tribler_db=tribler_db) async def test_search_wrong_mdtype(rest_api): @@ -231,7 +231,7 @@ def test_build_snippets_no_infohash(endpoint: SearchEndpoint): assert result == search_results -async def test_create_remote_search_request(rest_api, mock_popularity_community): +async def test_create_remote_search_request(rest_api, mock_content_discovery_community): """ Test that remote search call is sent on a REST API search request """ @@ -244,7 +244,7 @@ def mock_send(**kwargs): return request_uuid, peers # Test querying for keywords - mock_popularity_community.send_search_request = mock_send + mock_content_discovery_community.send_search_request = mock_send search_txt = "foo" await do_request( rest_api, @@ -264,7 +264,7 @@ def mock_send(**kwargs): assert hexlify(sent['channel_pk']) == channel_pk -async def test_create_remote_search_request_illegal(rest_api, mock_popularity_community): +async def test_create_remote_search_request_illegal(rest_api, mock_content_discovery_community): """ Test that remote search call is sent on a REST API search request """ diff --git a/src/tribler/core/components/metadata_store/settings.py b/src/tribler/core/components/metadata_store/settings.py new file mode 100644 index 00000000000..4a2bc8202f8 --- /dev/null +++ b/src/tribler/core/components/metadata_store/settings.py @@ -0,0 +1,10 @@ +from pydantic import Field + +from tribler.core.config.tribler_config_section import TriblerConfigSection +from tribler.core.utilities.simpledefs import STATEDIR_CHANNELS_DIR + + +class ChantSettings(TriblerConfigSection): + enabled: bool = True + testnet: bool = Field(default=False, env='CHANT_TESTNET') + channels_dir: str = STATEDIR_CHANNELS_DIR diff --git a/src/tribler/core/components/popularity/community/__init__.py b/src/tribler/core/components/popularity/community/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/tribler/core/components/popularity/community/popularity_community.py b/src/tribler/core/components/popularity/community/popularity_community.py deleted file mode 100644 index 0a6453faf77..00000000000 --- a/src/tribler/core/components/popularity/community/popularity_community.py +++ /dev/null @@ -1,184 +0,0 @@ -from __future__ import annotations - -import random -import time -import uuid -from binascii import unhexlify -from typing import List, TYPE_CHECKING - -from ipv8.lazy_community import lazy_wrapper -from pony.orm import db_session - -from tribler.core import notifications -from tribler.core.components.ipv8.discovery_booster import DiscoveryBooster -from tribler.core.components.metadata_store.db.store import ObjState -from tribler.core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity -from tribler.core.components.popularity.community.payload import PopularTorrentsRequest, TorrentsHealthPayload -from tribler.core.components.popularity.community.version_community_mixin import VersionCommunityMixin -from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo -from tribler.core.utilities.notifier import Notifier -from tribler.core.utilities.pony_utils import run_threaded -from tribler.core.utilities.unicode import hexlify -from tribler.core.utilities.utilities import get_normally_distributed_positive_integers - -if TYPE_CHECKING: - from tribler.core.components.torrent_checker.torrent_checker.torrent_checker import TorrentChecker - -max_address_cache_lifetime = 5.0 # seconds - - -class PopularityCommunity(RemoteQueryCommunity, VersionCommunityMixin): - """ - Community for disseminating the content across the network. - - Push: - - Every 5 seconds it gossips 10 random torrents to a random peer. - Pull: - - Every time it receives an introduction request, it sends a request - to return their popular torrents. - - Gossiping is for checked torrents only. - """ - GOSSIP_INTERVAL_FOR_RANDOM_TORRENTS = 5 # seconds - GOSSIP_POPULAR_TORRENT_COUNT = 10 - GOSSIP_RANDOM_TORRENT_COUNT = 10 - - community_id = unhexlify('9aca62f878969c437da9844cba29a134917e1648') - - def __init__(self, *args, torrent_checker=None, notifier=None, **kwargs): - # Creating a separate instance of Network for this community to find more peers - super().__init__(*args, **kwargs) - self.torrent_checker: TorrentChecker = torrent_checker - self.notifier: Notifier = notifier - - self.add_message_handler(TorrentsHealthPayload, self.on_torrents_health) - self.add_message_handler(PopularTorrentsRequest, self.on_popular_torrents_request) - - self.logger.info('Popularity Community initialized (peer mid %s)', hexlify(self.my_peer.mid)) - self.register_task("gossip_random_torrents", self.gossip_random_torrents_health, - interval=PopularityCommunity.GOSSIP_INTERVAL_FOR_RANDOM_TORRENTS) - - # Init version community message handlers - self.init_version_community() - - self.address_cache = {} - self.address_cache_created_at = time.time() - - self.discovery_booster = DiscoveryBooster() - self.discovery_booster.apply(self) - - def introduction_request_callback(self, peer, dist, payload): - super().introduction_request_callback(peer, dist, payload) - # Send request to peer to send popular torrents - self.ez_send(peer, PopularTorrentsRequest()) - - def get_alive_checked_torrents(self) -> List[HealthInfo]: - if not self.torrent_checker: - return [] - - # Filter torrents that have seeders - return [health for health in self.torrent_checker.torrents_checked.values() if - health.seeders > 0 and health.leechers >= 0] - - def gossip_random_torrents_health(self): - """ - Gossip random torrent health information to another peer. - """ - if not self.get_peers() or not self.torrent_checker: - return - - random_torrents = self.get_random_torrents() - random_peer = random.choice(self.get_peers()) - - self.ez_send(random_peer, TorrentsHealthPayload.create(random_torrents, {})) - - @lazy_wrapper(TorrentsHealthPayload) - async def on_torrents_health(self, peer, payload): - self.logger.debug(f"Received torrent health information for " - f"{len(payload.torrents_checked)} popular torrents and" - f" {len(payload.random_torrents)} random torrents") - - health_tuples = payload.random_torrents + payload.torrents_checked - health_list = [HealthInfo(infohash, last_check=last_check, seeders=seeders, leechers=leechers) - for infohash, seeders, leechers, last_check in health_tuples] - - for infohash in await run_threaded(self.mds.db, self.process_torrents_health, health_list): - # Get a single result per infohash to avoid duplicates - self.send_remote_select(peer=peer, infohash=infohash, last=1) - - @db_session - def process_torrents_health(self, health_list: List[HealthInfo]): - infohashes_to_resolve = set() - for health in health_list: - added = self.mds.process_torrent_health(health) - if added: - infohashes_to_resolve.add(health.infohash) - return infohashes_to_resolve - - @lazy_wrapper(PopularTorrentsRequest) - async def on_popular_torrents_request(self, peer, payload): - self.logger.debug("Received popular torrents health request") - popular_torrents = self.get_likely_popular_torrents() - self.ez_send(peer, TorrentsHealthPayload.create({}, popular_torrents)) - - def get_likely_popular_torrents(self) -> List[HealthInfo]: - checked_and_alive = self.get_alive_checked_torrents() - if not checked_and_alive: - return [] - - num_torrents = len(checked_and_alive) - num_torrents_to_send = min(PopularityCommunity.GOSSIP_RANDOM_TORRENT_COUNT, num_torrents) - likely_popular_indices = self._get_likely_popular_indices(num_torrents_to_send, num_torrents) - - sorted_torrents = sorted(list(checked_and_alive), key=lambda health: -health.seeders) - likely_popular_torrents = [sorted_torrents[i] for i in likely_popular_indices] - return likely_popular_torrents - - def _get_likely_popular_indices(self, size, limit) -> List[int]: - """ - Returns a list of indices favoring the lower value numbers. - - Assuming lower indices being more popular than higher value indices, the returned list - favors the lower indexed popular values. - @param size: Number of indices to return - @param limit: Max number of indices that can be returned. - @return: List of non-repeated positive indices. - """ - return get_normally_distributed_positive_integers(size=size, upper_limit=limit) - - def get_random_torrents(self) -> List[HealthInfo]: - checked_and_alive = list(self.get_alive_checked_torrents()) - if not checked_and_alive: - return [] - - num_torrents = len(checked_and_alive) - num_torrents_to_send = min(PopularityCommunity.GOSSIP_RANDOM_TORRENT_COUNT, num_torrents) - - random_torrents = random.sample(checked_and_alive, num_torrents_to_send) - return random_torrents - - def get_random_peers(self, sample_size=None): - # Randomly sample sample_size peers from the complete list of our peers - all_peers = self.get_peers() - return random.sample(all_peers, min(sample_size or len(all_peers), len(all_peers))) - - def send_search_request(self, **kwargs): - # Send a remote query request to multiple random peers to search for some terms - request_uuid = uuid.uuid4() - - def notify_gui(request, processing_results): - results = [ - r.md_obj.to_simple_dict() - for r in processing_results - if r.obj_state == ObjState.NEW_OBJECT - ] - if self.notifier: - self.notifier[notifications.remote_query_results]( - {"results": results, "uuid": str(request_uuid), "peer": hexlify(request.peer.mid)}) - - peers_to_query = self.get_random_peers(self.rqc_settings.max_query_peers) - - for p in peers_to_query: - self.send_remote_select(p, **kwargs, processing_callback=notify_gui) - - return request_uuid, peers_to_query diff --git a/src/tribler/core/components/popularity/community/sync_strategy.py b/src/tribler/core/components/popularity/community/sync_strategy.py deleted file mode 100644 index a1ef37fb3be..00000000000 --- a/src/tribler/core/components/popularity/community/sync_strategy.py +++ /dev/null @@ -1,23 +0,0 @@ -from random import choice - -from ipv8.peerdiscovery.discovery import DiscoveryStrategy - -TARGET_PEERS_NUMBER = 20 - - -class RemovePeers(DiscoveryStrategy): - """ - Synchronization strategy for remote query community. - - Remove a random peer, if we have enough peers to walk to. - """ - - def __init__(self, overlay, target_peers_number=TARGET_PEERS_NUMBER): - super().__init__(overlay) - self.target_peers_number = target_peers_number - - def take_step(self): - with self.walk_lock: - peers = self.overlay.get_peers() - if peers and len(peers) > self.target_peers_number: - self.overlay.network.remove_peer(choice(peers)) diff --git a/src/tribler/core/components/popularity/community/tests/__init__.py b/src/tribler/core/components/popularity/community/tests/__init__.py deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py deleted file mode 100644 index f449782eccc..00000000000 --- a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py +++ /dev/null @@ -1 +0,0 @@ -import time from random import randint from typing import List from unittest.mock import Mock from ipv8.keyvault.crypto import default_eccrypto from pony.orm import db_session from tribler.core import notifications from tribler.core.components.ipv8.adapters_tests import TriblerMockIPv8, TriblerTestBase from tribler.core.components.metadata_store.db.store import MetadataStore from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings from tribler.core.components.popularity.community.popularity_community import PopularityCommunity from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import HealthInfo from tribler.core.tests.tools.base_test import MockObject from tribler.core.utilities.path_util import Path from tribler.core.utilities.utilities import random_infohash def _generate_single_checked_torrent(status: str = None) -> HealthInfo: """ Assumptions DEAD -> peers: 0 POPULAR -> Peers: [101, 1000] DEFAULT -> peers: [1, 100] # alive """ def get_peers_for(health_status): if health_status == 'DEAD': return 0 if health_status == 'POPULAR': return randint(101, 1000) return randint(1, 100) return HealthInfo(random_infohash(), seeders=get_peers_for(status), leechers=get_peers_for(status)) def _generate_checked_torrents(count: int, status: str = None) -> List[HealthInfo]: return [_generate_single_checked_torrent(status) for _ in range(count)] class TestPopularityCommunity(TriblerTestBase): NUM_NODES = 2 def setUp(self): super().setUp() self.count = 0 self.metadata_store_set = set() self.initialize(PopularityCommunity, self.NUM_NODES) async def tearDown(self): for metadata_store in self.metadata_store_set: metadata_store.shutdown() await super().tearDown() def create_node(self, *args, **kwargs): mds = MetadataStore(Path(self.temporary_directory()) / f"{self.count}", Path(self.temporary_directory()), default_eccrypto.generate_key("curve25519")) self.metadata_store_set.add(mds) torrent_checker = MockObject() torrent_checker.torrents_checked = {} self.count += 1 rqc_settings = RemoteQueryCommunitySettings() return TriblerMockIPv8("curve25519", PopularityCommunity, metadata_store=mds, torrent_checker=torrent_checker, rqc_settings=rqc_settings ) @db_session def fill_database(self, metadata_store, last_check_now=False): for torrent_ind in range(5): last_check = int(time.time()) if last_check_now else 0 metadata_store.TorrentState( infohash=str(torrent_ind).encode() * 20, seeders=torrent_ind + 1, last_check=last_check) async def init_first_node_and_gossip(self, checked_torrent_info: HealthInfo, deliver_timeout: float = 0.1): self.nodes[0].overlay.torrent_checker.torrents_checked[checked_torrent_info.infohash] = checked_torrent_info await self.introduce_nodes() self.nodes[0].overlay.gossip_random_torrents_health() await self.deliver_messages(timeout=deliver_timeout) def torrent_metadata(self, i): return self.overlay(i).mds.TorrentMetadata async def test_torrents_health_gossip(self): """ Test whether torrent health information is correctly gossiped around """ checked_torrent_info = HealthInfo(b'a' * 20, seeders=200, leechers=0) node0_db = self.nodes[0].overlay.mds.TorrentState node1_db2 = self.nodes[1].overlay.mds.TorrentState with db_session: assert node0_db.select().count() == 0 assert node1_db2.select().count() == 0 await self.init_first_node_and_gossip(checked_torrent_info) # Check whether node 1 has new torrent health information with db_session: torrent = node1_db2.select().first() assert torrent.infohash == checked_torrent_info.infohash assert torrent.seeders == checked_torrent_info.seeders assert torrent.leechers == checked_torrent_info.leechers assert torrent.last_check == checked_torrent_info.last_check def test_get_alive_torrents(self): dead_torrents = _generate_checked_torrents(100, 'DEAD') popular_torrents = _generate_checked_torrents(100, 'POPULAR') alive_torrents = _generate_checked_torrents(100) all_checked_torrents = dead_torrents + alive_torrents + popular_torrents self.nodes[0].overlay.torrent_checker.torrents_checked.update( {health.infohash: health for health in all_checked_torrents}) actual_alive_torrents = self.nodes[0].overlay.get_alive_checked_torrents() assert len(actual_alive_torrents) == len(alive_torrents + popular_torrents) async def test_torrents_health_gossip_multiple(self): """ Test whether torrent health information is correctly gossiped around """ dead_torrents = _generate_checked_torrents(100, 'DEAD') popular_torrents = _generate_checked_torrents(100, 'POPULAR') alive_torrents = _generate_checked_torrents(100) all_checked_torrents = dead_torrents + alive_torrents + popular_torrents node0_db = self.nodes[0].overlay.mds.TorrentState node1_db = self.nodes[1].overlay.mds.TorrentState # Given, initially there are no torrents in the database with db_session: node0_count = node0_db.select().count() node1_count = node1_db.select().count() assert node0_count == 0 assert node1_count == 0 # Setup, node 0 checks some torrents, both dead and alive (including popular ones). self.nodes[0].overlay.torrent_checker.torrents_checked.update( {health.infohash: health for health in all_checked_torrents}) # Nodes are introduced await self.introduce_nodes() # Since on introduction request callback, node asks for popular torrents, we expect that # popular torrents are shared by node 0 to node 1. with db_session: node0_count = node0_db.select().count() node1_count = node1_db.select().count() assert node0_count == 0 # Nothing received from Node 1 because it hasn't checked anything to share. assert node1_count == PopularityCommunity.GOSSIP_POPULAR_TORRENT_COUNT node1_db_last_count = node1_count # Now, assuming Node 0 gossips random torrents to Node 1 multiple times to simulate periodic nature for _ in range(10): self.nodes[0].overlay.gossip_random_torrents_health() await self.deliver_messages(timeout=0.1) # After gossip, Node 1 should have received some random torrents from Node 0. # Note that random torrents can also include popular torrents sent during introduction # and random torrents sent in earlier gossip since no state is maintained. with db_session: node0_count = node0_db.select().count() node1_count = node1_db.select().count() assert node0_count == 0 # Still nothing received from Node 1 because it hasn't checked torrents assert node1_count >= node1_db_last_count node1_db_last_count = node1_count async def test_torrents_health_update(self): """ Test updating the local torrent health information from network """ self.fill_database(self.nodes[1].overlay.mds) checked_torrent_info = HealthInfo(b'0' * 20, seeders=200, leechers=0) await self.init_first_node_and_gossip(checked_torrent_info, deliver_timeout=0.5) # Check whether node 1 has new torrent health information with db_session: state = self.nodes[1].overlay.mds.TorrentState.get(infohash=b'0' * 20) self.assertIsNot(state.last_check, 0) async def test_unknown_torrent_query_back(self): """ Test querying sender for metadata upon receiving an unknown torrent """ infohash = b'1' * 20 with db_session: self.nodes[0].overlay.mds.TorrentMetadata(infohash=infohash) await self.init_first_node_and_gossip( HealthInfo(infohash, seeders=200, leechers=0)) with db_session: assert self.nodes[1].overlay.mds.TorrentMetadata.get() async def test_skip_torrent_query_back_for_known_torrent(self): # Test that we _don't_ send the query if we already know about the infohash infohash = b'1' * 20 with db_session: self.nodes[0].overlay.mds.TorrentMetadata(infohash=infohash) self.nodes[1].overlay.mds.TorrentMetadata(infohash=infohash) self.nodes[1].overlay.send_remote_select = Mock() await self.init_first_node_and_gossip( HealthInfo(infohash, seeders=200, leechers=0)) self.nodes[1].overlay.send_remote_select.assert_not_called() async def test_popularity_search(self): """ Test searching several nodes for metadata entries based on title text """ with db_session: # Add test metadata to node ID2 self.torrent_metadata(1)(title="ubuntu torrent", infohash=random_infohash()) self.torrent_metadata(1)(title="debian torrent", infohash=random_infohash()) notifier = Mock() self.overlay(0).notifier = {notifications.remote_query_results: notifier} self.overlay(0).send_search_request(**{"txt_filter": "ubuntu*"}) await self.deliver_messages() notifier.assert_called() \ No newline at end of file diff --git a/src/tribler/core/components/popularity/community/tests/test_sync_strategy.py b/src/tribler/core/components/popularity/community/tests/test_sync_strategy.py deleted file mode 100644 index 7d9e27a3e2a..00000000000 --- a/src/tribler/core/components/popularity/community/tests/test_sync_strategy.py +++ /dev/null @@ -1,63 +0,0 @@ -from ipv8.keyvault.crypto import default_eccrypto -from ipv8.peer import Peer -from ipv8.peerdiscovery.network import Network - -from tribler.core.components.popularity.community.sync_strategy import RemovePeers -from tribler.core.components.ipv8.adapters_tests import TriblerTestBase - - -class MockCommunity: - def __init__(self): - self.fetch_next_called = False - self.send_random_to_called = [] - self.get_peers_return = [] - self.network = Network() - - def send_random_to(self, peer): - self.send_random_to_called.append(peer) - - def fetch_next(self): - self.fetch_next_called = True - - def get_peers(self): - return self.get_peers_return - - -class TestRemovePeers(TriblerTestBase): - def setUp(self): - self.community = MockCommunity() - self.strategy = RemovePeers(self.community) - return super().setUp() - - def test_strategy_no_peers(self): - """ - If we have no peers, nothing should happen. - """ - self.strategy.take_step() - - self.assertSetEqual(set(), self.community.network.verified_peers) - - def test_strategy_one_peer(self): - """ - If we have one peer, it should not be removed. - """ - test_peer = Peer(default_eccrypto.generate_key("very-low")) - self.community.network.add_verified_peer(test_peer) - self.community.get_peers_return.append(test_peer) - - self.strategy.take_step() - - self.assertSetEqual({test_peer}, self.community.network.verified_peers) - - def test_strategy_multi_peer(self): - """ - If we have over 20 peers, one should be removed. - """ - for _ in range(21): - test_peer = Peer(default_eccrypto.generate_key("very-low")) - self.community.network.add_verified_peer(test_peer) - self.community.get_peers_return.append(test_peer) - - self.strategy.take_step() - - self.assertEqual(20, len(self.community.network.verified_peers)) diff --git a/src/tribler/core/components/popularity/community/tests/test_version_community_mixin.py b/src/tribler/core/components/popularity/community/tests/test_version_community_mixin.py deleted file mode 100644 index 2d43a5d423d..00000000000 --- a/src/tribler/core/components/popularity/community/tests/test_version_community_mixin.py +++ /dev/null @@ -1,64 +0,0 @@ -import os -import sys -from asyncio import Future - -from ipv8.messaging.serialization import default_serializer -from tribler.core.components.ipv8.adapters_tests import TriblerMockIPv8, TriblerTestBase -from tribler.core.components.ipv8.tribler_community import TriblerCommunity - -from tribler.core.components.popularity.community.version_community_mixin import VersionCommunityMixin, VersionResponse -from tribler.core.version import version_id - - -class VersionCommunity(VersionCommunityMixin, TriblerCommunity): - community_id = os.urandom(20) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.init_version_community() - - -class TestVersionCommunity(TriblerTestBase): - NUM_NODES = 2 - - def setUp(self): - super().setUp() - self.initialize(VersionCommunity, self.NUM_NODES) - - def create_node(self, *args, **kwargs): - return TriblerMockIPv8("curve25519", VersionCommunity) - - def test_version_response_payload(self): - """ - Check if the version response is correctly serialized. - """ - version = "v7.10.0" - platform = "linux" - - version_response = VersionResponse(version, platform) - serialized = default_serializer.pack_serializable(version_response) - deserialized, _ = default_serializer.unpack_serializable(VersionResponse, serialized) - - self.assertEqual(version_response.version, version) - self.assertEqual(version_response.platform, platform) - self.assertEqual(deserialized.version, version) - self.assertEqual(deserialized.platform, platform) - - async def test_request_for_version(self): - """ - Test whether version request is responded well. - """ - await self.introduce_nodes() - - on_process_version_response_called = Future() - - def on_process_version_response(peer, version, platform): - self.assertEqual(peer, self.peer(1)) - self.assertEqual(version, version_id) - self.assertEqual(platform, sys.platform) - on_process_version_response_called.set_result(True) - - self.overlay(0).process_version_response = on_process_version_response - self.overlay(0).send_version_request(self.peer(1)) - - return await on_process_version_response_called diff --git a/src/tribler/core/components/popularity/community/version_community_mixin.py b/src/tribler/core/components/popularity/community/version_community_mixin.py deleted file mode 100644 index a6314d4b543..00000000000 --- a/src/tribler/core/components/popularity/community/version_community_mixin.py +++ /dev/null @@ -1,68 +0,0 @@ -import sys - -from ipv8.lazy_community import lazy_wrapper -from ipv8.messaging.lazy_payload import VariablePayload, vp_compile - -from tribler.core.version import version_id - - -@vp_compile -class VersionRequest(VariablePayload): - msg_id = 101 - - -@vp_compile -class VersionResponse(VariablePayload): - msg_id = 102 - format_list = ['varlenI', 'varlenI'] - names = ['version', 'platform'] - - def fix_pack_version(self, value): - return value.encode('utf-8') - - def fix_pack_platform(self, value): - return value.encode('utf-8') - - @classmethod - def fix_unpack_version(cls, value): - return value.decode('utf-8') - - @classmethod - def fix_unpack_platform(cls, value): - return value.decode('utf-8') - - -class VersionCommunityMixin: - """ - This mixin add the protocol messages to ask and receive version of Tribler and community the - peer is currently running. - - Knowing the version of Tribler or the individual community is not critical for normal operation - of Tribler but is useful in doing network experiments and monitoring of the network behavior - because of a new feature/algorithm deployment. - """ - - def init_version_community(self): - self.add_message_handler(VersionRequest, self.on_version_request) - self.add_message_handler(VersionResponse, self.on_version_response) - - def send_version_request(self, peer): - self.logger.info(f"Sending version request to {peer.address}") - self.ez_send(peer, VersionRequest()) - - @lazy_wrapper(VersionRequest) - async def on_version_request(self, peer, _): - self.logger.info(f"Received version request from {peer.address}") - version_response = VersionResponse(version_id, sys.platform) - self.ez_send(peer, version_response) - - @lazy_wrapper(VersionResponse) - async def on_version_response(self, peer, payload): - self.logger.info(f"Received version response from {peer.address}") - self.process_version_response(peer, payload.version, payload.platform) - - def process_version_response(self, peer, version, platform): - """ - This is the method the implementation community or the experiment will implement - to process the version and platform information. - """ diff --git a/src/tribler/core/components/popularity/popularity_component.py b/src/tribler/core/components/popularity/popularity_component.py deleted file mode 100644 index 0c0b00ff566..00000000000 --- a/src/tribler/core/components/popularity/popularity_component.py +++ /dev/null @@ -1,42 +0,0 @@ -from ipv8.peerdiscovery.network import Network - -from tribler.core.components.component import Component -from tribler.core.components.ipv8.ipv8_component import INFINITE, Ipv8Component -from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent -from tribler.core.components.popularity.community.popularity_community import PopularityCommunity -from tribler.core.components.popularity.community.sync_strategy import RemovePeers -from tribler.core.components.reporter.reporter_component import ReporterComponent -from tribler.core.components.torrent_checker.torrent_checker_component import TorrentCheckerComponent - - -class PopularityComponent(Component): - community: PopularityCommunity = None - - _ipv8_component: Ipv8Component = None - - async def run(self): - await super().run() - await self.get_component(ReporterComponent) - - self._ipv8_component = await self.require_component(Ipv8Component) - metadata_store_component = await self.require_component(MetadataStoreComponent) - torrent_checker_component = await self.require_component(TorrentCheckerComponent) - - config = self.session.config - community = PopularityCommunity(self._ipv8_component.peer, - self._ipv8_component.ipv8.endpoint, - Network(), - settings=config.popularity_community, - rqc_settings=config.remote_query_community, - metadata_store=metadata_store_component.mds, - torrent_checker=torrent_checker_component.torrent_checker, - notifier=self.session.notifier) - self.community = community - - self._ipv8_component.initialise_community_by_default(community, default_random_walk_max_peers=30) - self._ipv8_component.ipv8.add_strategy(community, RemovePeers(community), INFINITE) - - async def shutdown(self): - await super().shutdown() - if self._ipv8_component and self.community: - await self._ipv8_component.unload_community(self.community) diff --git a/src/tribler/core/components/popularity/settings.py b/src/tribler/core/components/popularity/settings.py deleted file mode 100644 index e305372723c..00000000000 --- a/src/tribler/core/components/popularity/settings.py +++ /dev/null @@ -1,22 +0,0 @@ -from pydantic import Field - -from tribler.core.config.tribler_config_section import TriblerConfigSection -from tribler.core.utilities.simpledefs import STATEDIR_CHANNELS_DIR - - -class PopularityCommunitySettings(TriblerConfigSection): - enabled: bool = True - cache_dir: str = 'health_cache' - - -class ChantSettings(TriblerConfigSection): - enabled: bool = True - manager_enabled: bool = True - channel_edit: bool = False - channels_dir: str = STATEDIR_CHANNELS_DIR - testnet: bool = Field(default=False, env='CHANT_TESTNET') - - queried_peers_limit: int = 1000 - # The maximum number of peers that we got from channels to peers mapping, - # that must be queried in addition to randomly queried peers - max_mapped_query_peers = 3 diff --git a/src/tribler/core/components/restapi/restapi_component.py b/src/tribler/core/components/restapi/restapi_component.py index d0ceb5ff4d4..e44bc45c6c6 100644 --- a/src/tribler/core/components/restapi/restapi_component.py +++ b/src/tribler/core/components/restapi/restapi_component.py @@ -20,7 +20,7 @@ from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent from tribler.core.components.metadata_store.restapi.metadata_endpoint import MetadataEndpoint from tribler.core.components.metadata_store.restapi.search_endpoint import SearchEndpoint -from tribler.core.components.popularity.popularity_component import PopularityComponent +from tribler.core.components.content_discovery.content_discovery_component import ContentDiscoveryComponent from tribler.core.components.reporter.exception_handler import CoreExceptionHandler, default_core_exception_handler from tribler.core.components.reporter.reported_error import ReportedError from tribler.core.components.reporter.reporter_component import ReporterComponent @@ -77,7 +77,7 @@ async def run(self): libtorrent_component = await self.maybe_component(LibtorrentComponent) resource_monitor_component = await self.maybe_component(ResourceMonitorComponent) bandwidth_accounting_component = await self.maybe_component(BandwidthAccountingComponent) - popularity_component = await self.maybe_component(PopularityComponent) + content_discovery_component = await self.maybe_component(ContentDiscoveryComponent) knowledge_component = await self.maybe_component(KnowledgeComponent) tunnel_component = await self.maybe_component(TunnelsComponent) torrent_checker_component = await self.maybe_component(TorrentCheckerComponent) @@ -108,7 +108,7 @@ async def run(self): self.maybe_add(MetadataEndpoint, libtorrent_component.download_manager, torrent_checker, metadata_store_component.mds, tribler_db=db_component.db, tag_rules_processor=knowledge_component.rules_processor) - self.maybe_add(SearchEndpoint, popularity_component.community, + self.maybe_add(SearchEndpoint, content_discovery_component.community, metadata_store_component.mds, tribler_db=db_component.db) self.maybe_add(KnowledgeEndpoint, db=db_component.db, community=knowledge_component.community) diff --git a/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py b/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py index 0125c2c08a2..c124e94e054 100644 --- a/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py +++ b/src/tribler/core/components/torrent_checker/torrent_checker/torrent_checker.py @@ -64,7 +64,7 @@ def __init__(self, self.udp_transport = None # We keep track of the results of popular torrents checked by you. - # The popularity community gossips this information around. + # The content_discovery community gossips this information around. self._torrents_checked: Optional[Dict[bytes, HealthInfo]] = None async def initialize(self): diff --git a/src/tribler/core/config/tribler_config.py b/src/tribler/core/config/tribler_config.py index e8478756276..cd3dc0ae6e4 100644 --- a/src/tribler/core/config/tribler_config.py +++ b/src/tribler/core/config/tribler_config.py @@ -18,8 +18,8 @@ ) from tribler.core.components.key.settings import TrustchainSettings from tribler.core.components.libtorrent.settings import DownloadDefaultsSettings, LibtorrentSettings -from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings -from tribler.core.components.popularity.settings import PopularityCommunitySettings, ChantSettings +from tribler.core.components.content_discovery.settings import ContentDiscoveryComponentConfig +from tribler.core.components.metadata_store.settings import ChantSettings from tribler.core.components.resource_monitor.settings import ResourceMonitorSettings from tribler.core.components.restapi.rest.settings import APISettings from tribler.core.components.torrent_checker.settings import TorrentCheckerSettings @@ -53,8 +53,7 @@ class Config: download_defaults: DownloadDefaultsSettings = DownloadDefaultsSettings() api: APISettings = APISettings() resource_monitor: ResourceMonitorSettings = ResourceMonitorSettings() - popularity_community: PopularityCommunitySettings = PopularityCommunitySettings() - remote_query_community: RemoteQueryCommunitySettings = RemoteQueryCommunitySettings() + content_discovery_community: ContentDiscoveryComponentConfig = ContentDiscoveryComponentConfig() # Special configuration options related to the operation mode of the Core upgrader_enabled: bool = True diff --git a/src/tribler/core/notifications.py b/src/tribler/core/notifications.py index 9b25a74effe..2b2c6304f50 100644 --- a/src/tribler/core/notifications.py +++ b/src/tribler/core/notifications.py @@ -64,7 +64,7 @@ def tribler_exception(error: dict): ... -def popularity_community_unknown_torrent_added(): +def content_discovery_community_unknown_torrent_added(): ... diff --git a/src/tribler/core/start_core.py b/src/tribler/core/start_core.py index 1164c6d0e9b..e737cab9d7b 100644 --- a/src/tribler/core/start_core.py +++ b/src/tribler/core/start_core.py @@ -23,7 +23,7 @@ from tribler.core.components.libtorrent.libtorrent_component import LibtorrentComponent from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent from tribler.core.components.payout.payout_component import PayoutComponent -from tribler.core.components.popularity.popularity_component import PopularityComponent +from tribler.core.components.content_discovery.content_discovery_component import ContentDiscoveryComponent from tribler.core.components.reporter.exception_handler import default_core_exception_handler from tribler.core.components.reporter.reporter_component import ReporterComponent from tribler.core.components.resource_monitor.resource_monitor_component import ResourceMonitorComponent @@ -77,8 +77,8 @@ def components_gen(config: TriblerConfig): if config.torrent_checking.enabled: yield TorrentCheckerComponent() - if config.ipv8.enabled and config.torrent_checking.enabled and config.popularity_community.enabled: - yield PopularityComponent() + if config.ipv8.enabled and config.torrent_checking.enabled and config.content_discovery_community.enabled: + yield ContentDiscoveryComponent() # The components below are skipped if config.gui_test_mode == True if config.gui_test_mode: diff --git a/src/tribler/core/tests/tools/data/state_dir_dummy/triblerd.conf b/src/tribler/core/tests/tools/data/state_dir_dummy/triblerd.conf index 6489b9a4e1b..a05e8dfcd9c 100644 --- a/src/tribler/core/tests/tools/data/state_dir_dummy/triblerd.conf +++ b/src/tribler/core/tests/tools/data/state_dir_dummy/triblerd.conf @@ -16,4 +16,4 @@ port = 8085 key = 129bbdf7199cec22c9e72a2ad965336e [resource_monitor] -[popularity_community] +[content_discovery_community]