diff --git a/doc/requirements.txt b/doc/requirements.txt index c106361bb25..03b8fa0db8b 100644 --- a/doc/requirements.txt +++ b/doc/requirements.txt @@ -9,5 +9,5 @@ sphinxcontrib-openapi==0.7.0 configobj==5.0.6 mistune==0.8.4 # sphinxcontrib-openapi==0.7.0 cannot work with the latest mistune version (2.0.0) MarkupSafe==2.0.1 # used by jinja2; 2.1.0 version removes soft_unicode and breaks jinja2-2.11.3 -pyipv8==2.8.0 +pyipv8==2.11.0 setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability diff --git a/src/tribler/core/components/ipv8/ipv8_component.py b/src/tribler/core/components/ipv8/ipv8_component.py index 7b07d4a332e..97e22761ab7 100644 --- a/src/tribler/core/components/ipv8/ipv8_component.py +++ b/src/tribler/core/components/ipv8/ipv8_component.py @@ -15,7 +15,10 @@ from ipv8_service import IPv8 from tribler.core.components.component import Component +from tribler.core.components.ipv8.rendezvous.db.database import RendezvousDatabase +from tribler.core.components.ipv8.rendezvous.rendezvous_hook import RendezvousHook from tribler.core.components.key.key_component import KeyComponent +from tribler.core.utilities.simpledefs import STATEDIR_DB_DIR INFINITE = -1 @@ -30,6 +33,10 @@ class Ipv8Component(Component): _task_manager: TaskManager _peer_discovery_community: Optional[DiscoveryCommunity] = None + RENDEZVOUS_DB_NAME = 'rendezvous.db' + rendezvous_db: Optional[RendezvousDatabase] = None + rendevous_hook: Optional[RendezvousHook] = None + async def run(self): await super().run() @@ -37,6 +44,11 @@ async def run(self): self._task_manager = TaskManager() + if config.ipv8.rendezvous_stats: + self.rendezvous_db = RendezvousDatabase( + db_path=self.session.config.state_dir / STATEDIR_DB_DIR / self.RENDEZVOUS_DB_NAME) + self.rendevous_hook = RendezvousHook(self.rendezvous_db) + port = config.ipv8.port address = config.ipv8.address self.logger.info('Starting ipv8') @@ -52,14 +64,12 @@ async def run(self): if config.gui_test_mode: endpoint = DispatcherEndpoint([]) else: - # IPv8 includes IPv6 support by default. - # We only load IPv4 to not kill all Tribler overlays (currently, it would instantly crash all users). - # If you want to test IPv6 in Tribler you can set ``endpoint = None`` here. - endpoint = DispatcherEndpoint(["UDPIPv4"], UDPIPv4={'port': port, - 'ip': address}) + endpoint = DispatcherEndpoint(["UDPIPv4"], UDPIPv4={'port': port, 'ip': address}) ipv8 = IPv8(ipv8_config_builder.finalize(), enable_statistics=config.ipv8.statistics and not config.gui_test_mode, endpoint_override=endpoint) + if config.ipv8.rendezvous_stats: + ipv8.network.add_peer_observer(self.rendevous_hook) await ipv8.start() self.ipv8 = ipv8 @@ -135,5 +145,7 @@ async def shutdown(self): if overlay: await self.ipv8.unload_overlay(overlay) + if self.rendevous_hook is not None: + self.rendevous_hook.shutdown(self.ipv8.network) await self._task_manager.shutdown_task_manager() await self.ipv8.stop(stop_loop=False) diff --git a/src/tribler/core/components/popularity/rendezvous/__init__.py b/src/tribler/core/components/ipv8/rendezvous/__init__.py similarity index 100% rename from src/tribler/core/components/popularity/rendezvous/__init__.py rename to src/tribler/core/components/ipv8/rendezvous/__init__.py diff --git a/src/tribler/core/components/popularity/rendezvous/db/__init__.py b/src/tribler/core/components/ipv8/rendezvous/db/__init__.py similarity index 100% rename from src/tribler/core/components/popularity/rendezvous/db/__init__.py rename to src/tribler/core/components/ipv8/rendezvous/db/__init__.py diff --git a/src/tribler/core/components/ipv8/rendezvous/db/database.py b/src/tribler/core/components/ipv8/rendezvous/db/database.py new file mode 100644 index 00000000000..8b5ac82e5e6 --- /dev/null +++ b/src/tribler/core/components/ipv8/rendezvous/db/database.py @@ -0,0 +1,39 @@ +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING, Union + +from pony.orm import Database, db_session, select + +from ipv8.peer import Peer +from tribler.core.components.ipv8.rendezvous.db.orm_bindings import certificate +from tribler.core.utilities.utilities import MEMORY_DB + +if TYPE_CHECKING: + from tribler.core.components.ipv8.rendezvous.db.orm_bindings.certificate import RendezvousCertificate + + +class RendezvousDatabase: + + def __init__(self, db_path: Union[Path, type(MEMORY_DB)]) -> None: + create_db = db_path is MEMORY_DB or not db_path.is_file() + db_path_string = ":memory:" if db_path is MEMORY_DB else str(db_path) + + self.database = Database() + self.Certificate = certificate.define_binding(self.database) + self.database.bind(provider='sqlite', filename=db_path_string, create_db=create_db, timeout=120.0) + self.database.generate_mapping(create_tables=create_db) + + def add(self, peer: Peer, start_timestamp: float, stop_timestamp: float) -> None: + with db_session(immediate=True): + self.Certificate(public_key=peer.public_key.key_to_bin(), + start=start_timestamp, + stop=stop_timestamp) + + def get(self, peer: Peer) -> list[RendezvousCertificate]: + with db_session(): + return select(certificate for certificate in self.Certificate + if certificate.public_key == peer.public_key.key_to_bin()).fetch() + + def shutdown(self) -> None: + self.database.disconnect() diff --git a/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/__init__.py b/src/tribler/core/components/ipv8/rendezvous/db/orm_bindings/__init__.py similarity index 100% rename from src/tribler/core/components/popularity/rendezvous/db/orm_bindings/__init__.py rename to src/tribler/core/components/ipv8/rendezvous/db/orm_bindings/__init__.py diff --git a/src/tribler/core/components/ipv8/rendezvous/db/orm_bindings/certificate.py b/src/tribler/core/components/ipv8/rendezvous/db/orm_bindings/certificate.py new file mode 100644 index 00000000000..fd483ec39b8 --- /dev/null +++ b/src/tribler/core/components/ipv8/rendezvous/db/orm_bindings/certificate.py @@ -0,0 +1,20 @@ +import dataclasses +from typing import TYPE_CHECKING + +from pony.orm import Required + +if TYPE_CHECKING: + @dataclasses.dataclass + class RendezvousCertificate: + public_key: bytes + start: float + stop: float + + +def define_binding(db): + class RendezvousCertificate(db.Entity): + public_key = Required(bytes, index=True) + start = Required(float) + stop = Required(float) + + return RendezvousCertificate diff --git a/src/tribler/core/components/popularity/rendezvous/tests/__init__.py b/src/tribler/core/components/ipv8/rendezvous/db/tests/__init__.py similarity index 100% rename from src/tribler/core/components/popularity/rendezvous/tests/__init__.py rename to src/tribler/core/components/ipv8/rendezvous/db/tests/__init__.py diff --git a/src/tribler/core/components/ipv8/rendezvous/db/tests/test_database.py b/src/tribler/core/components/ipv8/rendezvous/db/tests/test_database.py new file mode 100644 index 00000000000..8db49139531 --- /dev/null +++ b/src/tribler/core/components/ipv8/rendezvous/db/tests/test_database.py @@ -0,0 +1,71 @@ +from typing import Generator + +import pytest + +from ipv8.keyvault.crypto import default_eccrypto +from ipv8.peer import Peer +from tribler.core.components.ipv8.rendezvous.db.database import RendezvousDatabase +from tribler.core.utilities.utilities import MEMORY_DB + + +@pytest.fixture(name="memdb", scope="function") +def fixture_memory_database() -> Generator[RendezvousDatabase, None, None]: + db = RendezvousDatabase(MEMORY_DB) + + yield db + + db.shutdown() + + +def generate_peer() -> Peer: + public_key = default_eccrypto.generate_key("curve25519").pub() + return Peer(public_key) + + +@pytest.fixture(name="peer", scope="module") +def fixture_peer() -> Generator[Peer, None, None]: + yield generate_peer() + + +@pytest.fixture(name="peer2", scope="function") +def fixture_peer2() -> Generator[Peer, None, None]: + yield generate_peer() + + +def test_retrieve_no_certificates(peer: Peer, memdb: RendezvousDatabase) -> None: + retrieved = memdb.get(peer) + + assert len(retrieved) == 0 + + +def test_retrieve_single_certificate(peer: Peer, memdb: RendezvousDatabase) -> None: + start_timestamp, stop_timestamp = range(1, 3) + memdb.add(peer, start_timestamp, stop_timestamp) + + retrieved = memdb.get(peer) + + assert len(retrieved) == 1 + assert retrieved[0].start, retrieved[0].stop == (start_timestamp, stop_timestamp) + + +def test_retrieve_multiple_certificates(peer: Peer, memdb: RendezvousDatabase) -> None: + start_timestamp1, stop_timestamp1, start_timestamp2, stop_timestamp2 = range(1, 5) + memdb.add(peer, start_timestamp1, stop_timestamp1) + memdb.add(peer, start_timestamp2, stop_timestamp2) + + retrieved = memdb.get(peer) + + assert len(retrieved) == 2 + assert retrieved[0].start, retrieved[0].stop == (start_timestamp1, stop_timestamp1) + assert retrieved[1].start, retrieved[1].stop == (start_timestamp2, stop_timestamp2) + + +def test_retrieve_filter_certificates(peer: Peer, peer2: Peer, memdb: RendezvousDatabase) -> None: + start_timestamp1, stop_timestamp1, start_timestamp2, stop_timestamp2 = range(1, 5) + memdb.add(peer, start_timestamp1, stop_timestamp1) + memdb.add(peer2, start_timestamp2, stop_timestamp2) + + retrieved = memdb.get(peer) + + assert len(retrieved) == 1 + assert retrieved[0].start, retrieved[0].stop == (start_timestamp1, stop_timestamp1) diff --git a/src/tribler/core/components/ipv8/rendezvous/rendezvous_hook.py b/src/tribler/core/components/ipv8/rendezvous/rendezvous_hook.py new file mode 100644 index 00000000000..b4579c0a170 --- /dev/null +++ b/src/tribler/core/components/ipv8/rendezvous/rendezvous_hook.py @@ -0,0 +1,31 @@ +import logging +import time + +from ipv8.peerdiscovery.network import Network, PeerObserver +from ipv8.types import Peer +from tribler.core.components.ipv8.rendezvous.db.database import RendezvousDatabase + + +class RendezvousHook(PeerObserver): + + def __init__(self, rendezvous_db: RendezvousDatabase) -> None: + self.rendezvous_db = rendezvous_db + + def shutdown(self, network: Network) -> None: + for peer in network.verified_peers: + self.on_peer_removed(peer) + if self.rendezvous_db: + self.rendezvous_db.shutdown() + + @property + def current_time(self) -> float: + return time.time() + + def on_peer_added(self, peer: Peer) -> None: + pass + + def on_peer_removed(self, peer: Peer) -> None: + if self.current_time >= peer.creation_time: + self.rendezvous_db.add(peer, peer.creation_time, self.current_time) + else: + logging.exception("%s was first seen in the future! Something is seriously wrong!", peer) diff --git a/src/tribler/core/components/ipv8/rendezvous/tests/__init__.py b/src/tribler/core/components/ipv8/rendezvous/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/src/tribler/core/components/ipv8/rendezvous/tests/test_rendezvous_hook.py b/src/tribler/core/components/ipv8/rendezvous/tests/test_rendezvous_hook.py new file mode 100644 index 00000000000..bea126f025a --- /dev/null +++ b/src/tribler/core/components/ipv8/rendezvous/tests/test_rendezvous_hook.py @@ -0,0 +1,90 @@ +from __future__ import annotations + +from typing import Generator + +import pytest + +from ipv8.keyvault.crypto import default_eccrypto +from ipv8.peer import Peer +from ipv8.peerdiscovery.network import Network +from tribler.core.components.ipv8.rendezvous.db.database import RendezvousDatabase +from tribler.core.components.ipv8.rendezvous.rendezvous_hook import RendezvousHook +from tribler.core.utilities.utilities import MEMORY_DB + + +class MockedRendezvousHook(RendezvousHook): + + def __init__(self, rendezvous_db: RendezvousDatabase, mocked_time: float | None = None) -> None: + super().__init__(rendezvous_db) + self.mocked_time = mocked_time + + @property + def current_time(self) -> float: + if self.mocked_time is None: + return super().current_time + return self.mocked_time + + +@pytest.fixture(name="memdb", scope="function") +def fixture_memory_database() -> Generator[RendezvousDatabase, None, None]: + db = RendezvousDatabase(MEMORY_DB) + + yield db + + db.shutdown() + + +@pytest.fixture(name="hook", scope="function") +def fixture_hook(memdb: RendezvousDatabase) -> Generator[MockedRendezvousHook, None, None]: + hook = MockedRendezvousHook(memdb) + + yield hook + + hook.shutdown(Network()) + + +@pytest.fixture(name="peer", scope="module") +def fixture_peer() -> Generator[Peer, None, None]: + public_key = default_eccrypto.generate_key("curve25519").pub() + yield Peer(public_key) + + +def test_peer_added(peer: Peer, hook: MockedRendezvousHook, memdb: RendezvousDatabase) -> None: + hook.on_peer_added(peer) + + retrieved = memdb.get(peer) + assert len(retrieved) == 0 + + +def test_peer_removed(peer: Peer, hook: MockedRendezvousHook, memdb: RendezvousDatabase) -> None: + hook.on_peer_added(peer) + + hook.mocked_time = peer.creation_time + 1.0 + hook.on_peer_removed(peer) + + retrieved = memdb.get(peer) + assert len(retrieved) == 1 + assert retrieved[0].start, retrieved[0].stop == (peer.creation_time, hook.mocked_time) + + +def test_peer_store_on_shutdown(peer: Peer, hook: MockedRendezvousHook, memdb: RendezvousDatabase) -> None: + network = Network() + network.add_verified_peer(peer) + hook.on_peer_added(peer) + hook.mocked_time = peer.creation_time + 1.0 + + hook.shutdown(network) + + retrieved = memdb.get(peer) + assert len(retrieved) == 1 + assert retrieved[0].start, retrieved[0].stop == (peer.creation_time, hook.mocked_time) + + +def test_peer_ignore_future(peer: Peer, hook: MockedRendezvousHook, memdb: RendezvousDatabase) -> None: + hook.on_peer_added(peer) + + hook.mocked_time = peer.creation_time - 1.0 + hook.on_peer_removed(peer) + + retrieved = memdb.get(peer) + assert len(retrieved) == 0 diff --git a/src/tribler/core/components/ipv8/settings.py b/src/tribler/core/components/ipv8/settings.py index 69055d82a91..7f7aeb770f2 100644 --- a/src/tribler/core/components/ipv8/settings.py +++ b/src/tribler/core/components/ipv8/settings.py @@ -21,6 +21,7 @@ class Ipv8Settings(TriblerConfigSection): address: str = '0.0.0.0' bootstrap_override: Optional[str] = None statistics: bool = False + rendezvous_stats: bool = False walk_interval: float = 0.5 walk_scaling_enabled: bool = True walk_scaling_upper_limit: float = 3.0 diff --git a/src/tribler/core/components/ipv8/tests/test_ipv8_component.py b/src/tribler/core/components/ipv8/tests/test_ipv8_component.py index 9d34ad47286..3f176de2cef 100644 --- a/src/tribler/core/components/ipv8/tests/test_ipv8_component.py +++ b/src/tribler/core/components/ipv8/tests/test_ipv8_component.py @@ -40,3 +40,12 @@ async def test_ipv8_component_statistics_enabled(tribler_config): async with Session(tribler_config, [KeyComponent(), Ipv8Component()]) as session: comp = session.get_instance(Ipv8Component) assert comp.dht_discovery_community.get_prefix() in comp.ipv8.endpoint.statistics + + +async def test_ipv8_rendezvous_enabled(tribler_config): + tribler_config.ipv8.rendezvous_stats = True + async with Session(tribler_config, [KeyComponent(), Ipv8Component()]) as session: + comp = session.get_instance(Ipv8Component) + assert comp.rendezvous_db is not None + assert comp.rendevous_hook is not None + diff --git a/src/tribler/core/components/popularity/community/popularity_community.py b/src/tribler/core/components/popularity/community/popularity_community.py index bf31d9c1b67..4e7e31c90c4 100644 --- a/src/tribler/core/components/popularity/community/popularity_community.py +++ b/src/tribler/core/components/popularity/community/popularity_community.py @@ -5,18 +5,11 @@ from typing import List, TYPE_CHECKING from ipv8.lazy_community import lazy_wrapper -from ipv8.messaging.interfaces.udp.endpoint import UDPv4Address, UDPv4LANAddress -from ipv8.messaging.serialization import PackError from pony.orm import db_session from tribler.core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity from tribler.core.components.popularity.community.payload import PopularTorrentsRequest, TorrentsHealthPayload from tribler.core.components.popularity.community.version_community_mixin import VersionCommunityMixin -from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase -from tribler.core.components.popularity.rendezvous.rendezvous import RendezvousRequestPayload, \ - RendezvousResponsePayload, RawRendezvousResponsePayload, \ - RendezvousChallenge, RendezvousSignature -from tribler.core.components.popularity.rendezvous.rendezvous_cache import RendezvousCache, EMPTY_PEER_CHALLENGE from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo from tribler.core.utilities.pony_utils import run_threaded from tribler.core.utilities.unicode import hexlify @@ -42,97 +35,22 @@ class PopularityCommunity(RemoteQueryCommunity, VersionCommunityMixin): GOSSIP_POPULAR_TORRENT_COUNT = 10 GOSSIP_RANDOM_TORRENT_COUNT = 10 - PING_INTERVAL_RENDEZVOUS = 60 # seconds - community_id = unhexlify('9aca62f878969c437da9844cba29a134917e1648') - def __init__(self, *args, torrent_checker=None, rendezvous_db=None, **kwargs): + def __init__(self, *args, torrent_checker=None, **kwargs): # Creating a separate instance of Network for this community to find more peers super().__init__(*args, **kwargs) - - self.rdb: RendezvousDatabase = rendezvous_db self.torrent_checker: TorrentChecker = torrent_checker self.add_message_handler(TorrentsHealthPayload, self.on_torrents_health) self.add_message_handler(PopularTorrentsRequest, self.on_popular_torrents_request) - self.add_message_handler(RendezvousRequestPayload, self.on_rendezvous_request) - self.add_message_handler(RendezvousResponsePayload, self.on_rendezvous_response) - self.logger.info('Popularity Community initialized (peer mid %s)', hexlify(self.my_peer.mid)) self.register_task("gossip_random_torrents", self.gossip_random_torrents_health, - interval=self.GOSSIP_INTERVAL_FOR_RANDOM_TORRENTS) - self.register_task("ping_rendezvous", self.ping_rendezvous, - interval=self.PING_INTERVAL_RENDEZVOUS) + interval=PopularityCommunity.GOSSIP_INTERVAL_FOR_RANDOM_TORRENTS) # Init version community message handlers self.init_version_community() - self.rendezvous_cache = RendezvousCache() - - def send_introduction_request(self, peer): - rendezvous_request = self._create_rendezvous_request() - extra_payload = self.serializer.pack_serializable(rendezvous_request) - self.logger.debug("Piggy-backing Rendezvous to %s:%d", peer.address[0], peer.address[1]) - packet = self.create_introduction_request(peer.address, extra_bytes=extra_payload, - new_style=peer.new_style_intro) - self.endpoint.send(peer.address, packet) - self.rendezvous_cache.add_peer(peer, rendezvous_request.challenge.nonce) - - # We override this method to add the rendezvous certificate to the introduction request - def on_introduction_request(self, peer, dist, payload): - if 0 <= self.max_peers < len(self.get_peers()): - self.logger.debug("Dropping introduction request from (%s, %d): too many peers!", - peer.address[0], peer.address[1]) - return - - extra_payload = b'' - if payload.extra_bytes: - self.logger.debug("Received introduction request with extra bytes") - try: - rendezvous_request, _ = self.serializer.unpack_serializable(RendezvousRequestPayload, - payload.extra_bytes) - rendezvous_response = self._create_rendezvous_response(rendezvous_request.challenge) - # As we are sending the rendezvous response, we know this peer is interested in rendezvous. - self.rendezvous_cache.add_peer(peer) - extra_payload = self.serializer.pack_serializable(rendezvous_response) - except PackError as e: - self.logger.warning("Failed to unpack RendezvousRequestPayload: %s", e) - - if isinstance(payload.source_lan_address, UDPv4Address): - peer.address = UDPv4LANAddress(*payload.source_lan_address) - self.network.add_verified_peer(peer) - self.network.discover_services(peer, [self.community_id, ]) - - packet = self.create_introduction_response(payload.destination_address, peer.address, payload.identifier, - extra_bytes=extra_payload, new_style=peer.new_style_intro) - - self.endpoint.send(peer.address, packet) - self.introduction_request_callback(peer, dist, payload) - - @lazy_wrapper(RendezvousRequestPayload) - def on_rendezvous_request(self, peer, payload: RendezvousRequestPayload): - self.logger.debug("Received rendezvous request from %s:%d", peer.address[0], peer.address[1]) - # As we are sending the rendezvous response, we know this peer is interested in rendezvous. - self.rendezvous_cache.add_peer(peer) - rendezvous_response = self._create_rendezvous_response(payload.challenge) - self.ez_send(peer, rendezvous_response) - - @lazy_wrapper(RawRendezvousResponsePayload) - def on_rendezvous_response(self, peer, payload: RawRendezvousResponsePayload): - self.logger.debug("Received rendezvous response from %s:%d", peer.address[0], peer.address[1]) - self._handle_rendezvous_response(peer, payload) - - def introduction_response_callback(self, peer, dist, payload): - super().introduction_response_callback(peer, dist, payload) - if payload.extra_bytes: - self.logger.debug("Received introduction response with extra bytes") - try: - raw_rendezvous_response, _ = self.serializer.unpack_serializable(RawRendezvousResponsePayload, - payload.extra_bytes) - self._handle_rendezvous_response(peer, raw_rendezvous_response) - - except PackError as e: - self.logger.warning("Failed to unpack RendezvousResponsePayload: %s", e) def introduction_request_callback(self, peer, dist, payload): super().introduction_request_callback(peer, dist, payload) @@ -159,15 +77,6 @@ def gossip_random_torrents_health(self): self.ez_send(random_peer, TorrentsHealthPayload.create(random_torrents, {})) - def ping_rendezvous(self): - # Remove peers that haven't replied in a while. - self.rendezvous_cache.clear_inactive_peers() - - for peer in self.rendezvous_cache.get_rendezvous_peers(): - payload = self._create_rendezvous_request() - self.rendezvous_cache.set_rendezvous_challenge(peer, payload.challenge.nonce) - self.ez_send(peer, payload) - @lazy_wrapper(TorrentsHealthPayload) async def on_torrents_health(self, peer, payload): self.logger.debug(f"Received torrent health information for " @@ -232,36 +141,3 @@ def get_random_torrents(self) -> List[HealthInfo]: random_torrents = random.sample(checked_and_alive, num_torrents_to_send) return random_torrents - - def _create_rendezvous_request(self) -> RendezvousRequestPayload: - challenge = RendezvousChallenge.create() - payload = RendezvousRequestPayload(challenge) - return payload - - def _create_rendezvous_response(self, challenge: RendezvousChallenge) -> RendezvousResponsePayload: - signature = challenge.sign(self.my_peer.key) - payload = RendezvousResponsePayload(challenge, RendezvousSignature(signature)) - return payload - - def _handle_rendezvous_response(self, peer, raw_payload: RawRendezvousResponsePayload): - signature, _ = self.serializer.unpack_serializable(RendezvousSignature, raw_payload.signature) - challenge, _ = self.serializer.unpack_serializable(RendezvousChallenge, raw_payload.challenge) - - expected_nonce = self.rendezvous_cache.get_rendezvous_challenge(peer) or EMPTY_PEER_CHALLENGE - if expected_nonce == EMPTY_PEER_CHALLENGE or expected_nonce != challenge.nonce: - self.logger.warning(f"Received invalid rendezvous response from {peer.mid}") - return - - if not self.crypto.is_valid_signature(peer.key, raw_payload.challenge, signature.signature): - self.logger.warning(f"Received invalid signature from {peer.mid}") - return - - # This nonce has been burned. - self.rendezvous_cache.clear_peer_challenge(peer) - - self.logger.debug(f"Received valid rendezvous response from {peer.mid}") - with db_session(immediate=True): - certificate = self.rdb.Certificate.get(public_key=peer.mid) - if not certificate: - certificate = self.rdb.Certificate(public_key=peer.mid, counter=0) - certificate.counter += 1 diff --git a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py index a71fcf2936f..78415f61ed8 100644 --- a/src/tribler/core/components/popularity/community/tests/test_popularity_community.py +++ b/src/tribler/core/components/popularity/community/tests/test_popularity_community.py @@ -11,8 +11,6 @@ from tribler.core.components.metadata_store.db.store import MetadataStore from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings from tribler.core.components.popularity.community.popularity_community import PopularityCommunity -from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase -from tribler.core.components.popularity.rendezvous.rendezvous import RendezvousChallenge from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import HealthInfo from tribler.core.tests.tools.base_test import MockObject from tribler.core.utilities.path_util import Path @@ -212,102 +210,3 @@ async def test_skip_torrent_query_back_for_known_torrent(self): await self.init_first_node_and_gossip( HealthInfo(infohash, seeders=200, leechers=0)) self.nodes[1].overlay.send_remote_select.assert_not_called() - - -class TestRendezvousLogic(TestBase): - NUM_NODES = 3 - - def setUp(self): - super().setUp() - self.count = 0 - self.initialize(PopularityCommunity, self.NUM_NODES) - - def create_node(self, *args, **kwargs): - rdb = RendezvousDatabase(Path(self.temporary_directory()) / f"{self.count + 1}") - torrent_checker = MockObject() - torrent_checker.torrents_checked = {} - mds = MetadataStore(Path(self.temporary_directory()) / f"{self.count}", - Path(self.temporary_directory()), - default_eccrypto.generate_key("curve25519")) - rqc_settings = RemoteQueryCommunitySettings() - - self.count += 1 - return MockIPv8("curve25519", PopularityCommunity, metadata_store=mds, - rendezvous_db=rdb, - torrent_checker=torrent_checker, - rqc_settings=rqc_settings - ) - - async def test_introduction_rendezvous_payload(self): - await self.introduce_nodes() - await self.deliver_messages() - - self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer) - await self.deliver_messages() - - self.nodes[0].overlay.send_introduction_request(self.nodes[2].my_peer) - await self.deliver_messages() - - with db_session: - assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid).counter == 1 - assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[2].my_peer.mid).counter == 1 - - # Check if the rendezvous cache is updated - rendezvous_peers = list(self.nodes[1].overlay.rendezvous_cache.get_rendezvous_peers()) - assert rendezvous_peers[0] == self.nodes[0].my_peer - rendezvous_peers = list(self.nodes[2].overlay.rendezvous_cache.get_rendezvous_peers()) - assert rendezvous_peers[0] == self.nodes[0].my_peer - - async def test_rendezvous_payloads(self): - await self.introduce_nodes() - await self.deliver_messages() - - self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer) - self.nodes[0].overlay.send_introduction_request(self.nodes[2].my_peer) - await self.deliver_messages() - - number_of_rendezvous = 4 - for _ in range(number_of_rendezvous): - for j in range(self.count): - self.nodes[j].overlay.ping_rendezvous() - await self.deliver_messages() - - with db_session: - # Peer 0 should have a counter of 1 more - assert self.nodes[0].overlay.rdb.Certificate.get( - public_key=self.nodes[1].my_peer.mid).counter == number_of_rendezvous + 1 - assert self.nodes[1].overlay.rdb.Certificate.get( - public_key=self.nodes[0].my_peer.mid).counter == number_of_rendezvous - assert self.nodes[2].overlay.rdb.Certificate.get( - public_key=self.nodes[0].my_peer.mid).counter == number_of_rendezvous - - async def test_invalid_nonce(self): - await self.introduce_nodes() - await self.deliver_messages() - - self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, b'1' * 16) - - payload = self.nodes[1].overlay._create_rendezvous_response(RendezvousChallenge(b'2' * 16)) - self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload) - await self.deliver_messages() - - with db_session: - assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None - - async def test_invalid_signature(self): - await self.introduce_nodes() - await self.deliver_messages() - - challenge_1 = RendezvousChallenge(b'1' * 16) - challenge_2 = RendezvousChallenge(b'2' * 16) - - self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, challenge_1.nonce) - - payload = self.nodes[1].overlay._create_rendezvous_response(challenge_2) - payload.challenge = challenge_1 - - self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload) - await self.deliver_messages() - - with db_session: - assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None diff --git a/src/tribler/core/components/popularity/popularity_component.py b/src/tribler/core/components/popularity/popularity_component.py index 1e1d86e95aa..bbc54a5ac86 100644 --- a/src/tribler/core/components/popularity/popularity_component.py +++ b/src/tribler/core/components/popularity/popularity_component.py @@ -5,17 +5,13 @@ from tribler.core.components.ipv8.ipv8_component import INFINITE, Ipv8Component from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent from tribler.core.components.popularity.community.popularity_community import PopularityCommunity -from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase from tribler.core.components.reporter.reporter_component import ReporterComponent from tribler.core.components.torrent_checker.torrent_checker_component import TorrentCheckerComponent -from tribler.core.utilities.simpledefs import STATEDIR_DB_DIR class PopularityComponent(Component): - RENDEZVOUS_DB_NAME = 'rendezvous.db' - community: PopularityCommunity = None - rendezvous_db: RendezvousDatabase = None + _ipv8_component: Ipv8Component = None async def run(self): @@ -26,9 +22,6 @@ async def run(self): metadata_store_component = await self.require_component(MetadataStoreComponent) torrent_checker_component = await self.require_component(TorrentCheckerComponent) - self.rendezvous_db = RendezvousDatabase( - db_path=self.session.config.state_dir / STATEDIR_DB_DIR / self.RENDEZVOUS_DB_NAME) - config = self.session.config community = PopularityCommunity(self._ipv8_component.peer, self._ipv8_component.ipv8.endpoint, @@ -36,7 +29,6 @@ async def run(self): settings=config.popularity_community, rqc_settings=config.remote_query_community, metadata_store=metadata_store_component.mds, - rendezvous_db=self.rendezvous_db, torrent_checker=torrent_checker_component.torrent_checker) self.community = community @@ -47,5 +39,3 @@ async def shutdown(self): await super().shutdown() if self._ipv8_component and self.community: await self._ipv8_component.unload_community(self.community) - if self.rendezvous_db: - self.rendezvous_db.shutdown() diff --git a/src/tribler/core/components/popularity/rendezvous/db/database.py b/src/tribler/core/components/popularity/rendezvous/db/database.py deleted file mode 100644 index db3a7003bc1..00000000000 --- a/src/tribler/core/components/popularity/rendezvous/db/database.py +++ /dev/null @@ -1,36 +0,0 @@ -from pathlib import Path -from typing import Union - -from pony.orm import Database, db_session - -from tribler.core.components.metadata_store.db.orm_bindings import misc -from tribler.core.components.popularity.rendezvous.db.orm_bindings import certificate -from tribler.core.utilities.utilities import MEMORY_DB - - -class RendezvousDatabase: - DB_VERSION = 0 - - def __init__(self, db_path: Union[Path, type(MEMORY_DB)]): - - self.database = Database() - - self.MiscData = misc.define_binding(self.database) - self.Certificate = certificate.define_binding(self.database) - - if db_path is MEMORY_DB: - create_db = True - db_path_string = ":memory:" - else: - create_db = not db_path.is_file() - db_path_string = str(db_path) - - self.database.bind(provider='sqlite', filename=db_path_string, create_db=create_db, timeout=120.0) - self.database.generate_mapping(create_tables=create_db) - - if create_db: - with db_session: - self.MiscData(name="db_version", value=str(self.DB_VERSION)) - - def shutdown(self) -> None: - self.database.disconnect() diff --git a/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py b/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py deleted file mode 100644 index 4f6821c5868..00000000000 --- a/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/certificate.py +++ /dev/null @@ -1,9 +0,0 @@ -from pony.orm import Required, db_session - - -def define_binding(db): - class RendezvousCertificate(db.Entity): - public_key = Required(bytes, index=True) - counter = Required(int) - - return RendezvousCertificate diff --git a/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/misc.py b/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/misc.py deleted file mode 100644 index e5434d09d1b..00000000000 --- a/src/tribler/core/components/popularity/rendezvous/db/orm_bindings/misc.py +++ /dev/null @@ -1,9 +0,0 @@ -from pony.orm import Optional, PrimaryKey - - -def define_binding(db): - class MiscData(db.Entity): - name = PrimaryKey(str) - value = Optional(str) - - return MiscData diff --git a/src/tribler/core/components/popularity/rendezvous/rendezvous.py b/src/tribler/core/components/popularity/rendezvous/rendezvous.py deleted file mode 100644 index 497d8899f3d..00000000000 --- a/src/tribler/core/components/popularity/rendezvous/rendezvous.py +++ /dev/null @@ -1,61 +0,0 @@ -import dataclasses -import secrets -from dataclasses import dataclass, fields - -from ipv8.keyvault.crypto import default_eccrypto -from ipv8.messaging.payload_dataclass import overwrite_dataclass, type_from_format -from ipv8.messaging.serialization import default_serializer - -dataclass = overwrite_dataclass(dataclass) - - -@dataclass -class RendezvousChallenge: - nonce: bytes - - def __str__(self): - return f"RendezvousRequest(public_key_b={self.nonce})" - - def sign(self, sk, crypto=default_eccrypto) -> bytes: - serialized = default_serializer.pack_serializable(self) - return crypto.create_signature(sk, serialized) - - @staticmethod - def create(): - return RendezvousChallenge(secrets.token_bytes(32)) - - -@dataclass -class RendezvousSignature: - signature: type_from_format('64s') - - def __str__(self): - return f"RendezvousSignature(signature={self.signature})" - - -@dataclass(msg_id=3) -class RendezvousRequestPayload: - challenge: RendezvousChallenge - - def __str__(self): - return f"RendezvousCertificateRequestPayload(certificate={self.challenge})" - - -@dataclass(msg_id=4) -class RawRendezvousResponsePayload: - challenge: type_from_format('varlenH') - signature: type_from_format('varlenH') - - def __str__(self): - return f"RendezvousCertificatePayload(rendezvous_certificate={self.challenge}, " \ - f"signature={self.signature})" - - -@dataclass(msg_id=4) -class RendezvousResponsePayload: - challenge: RendezvousChallenge - signature: RendezvousSignature - - def __str__(self): - return f"RendezvousCertificatePayload(rendezvous_certificate={self.challenge}, " \ - f"signature={self.signature})" diff --git a/src/tribler/core/components/popularity/rendezvous/rendezvous_cache.py b/src/tribler/core/components/popularity/rendezvous/rendezvous_cache.py deleted file mode 100644 index ee30c0f1e1e..00000000000 --- a/src/tribler/core/components/popularity/rendezvous/rendezvous_cache.py +++ /dev/null @@ -1,40 +0,0 @@ -import threading -from collections import defaultdict -from time import time - -from ipv8.peer import Peer - -EMPTY_PEER_CHALLENGE = b'0' * 16 -RENDEZVOUS_TIMEOUT = 60 - - -class RendezvousCache: - - def __init__(self): - self._cache = {} - self._rendezvous_lock = threading.Lock() - - def add_peer(self, peer, peer_challenge=EMPTY_PEER_CHALLENGE): - with self._rendezvous_lock: - self._cache[peer] = (peer_challenge, time()) - - def get_rendezvous_peers(self): - return self._cache.keys() - - def get_rendezvous_challenge(self, peer): - return self._cache[peer][0] - - def set_rendezvous_challenge(self, peer, challenge): - return self.add_peer(peer, challenge) - - def clear_inactive_peers(self, timeout=RENDEZVOUS_TIMEOUT): - with self._rendezvous_lock: - to_remove = [] - for peer, (peer_challenge, timestamp) in self._cache.items(): - if time() - timestamp > timeout: - to_remove.append(peer) - [self._cache.pop(peer) for peer in to_remove] - - def clear_peer_challenge(self, peer): - with self._rendezvous_lock: - self._cache[peer] = (EMPTY_PEER_CHALLENGE, time()) diff --git a/src/tribler/core/components/popularity/rendezvous/tests/test_rendezvous_cache.py b/src/tribler/core/components/popularity/rendezvous/tests/test_rendezvous_cache.py deleted file mode 100644 index b23162e19f0..00000000000 --- a/src/tribler/core/components/popularity/rendezvous/tests/test_rendezvous_cache.py +++ /dev/null @@ -1,42 +0,0 @@ -import time - -from ipv8.keyvault.crypto import default_eccrypto -from ipv8.peer import Peer -from ipv8.test.base import TestBase - -from tribler.core.components.popularity.rendezvous.rendezvous_cache import RendezvousCache, EMPTY_PEER_CHALLENGE - - -class TestRendezvousCache(TestBase): - NUM_NODES = 3 - - def setUp(self): - super().setUp() - self.peers = [Peer(default_eccrypto.generate_key(u"low")) for _ in range(self.NUM_NODES)] - self._cache = RendezvousCache() - - def test_add_peer(self): - self._cache.add_peer(self.peers[0]) - self._cache.add_peer(self.peers[1]) - self.assertEqual(len(self._cache.get_rendezvous_peers()), 2) - - def test_set_rendezvous_challenge(self): - self._cache.add_peer(self.peers[0]) - self._cache.set_rendezvous_challenge(self.peers[0], b'1234') - self.assertEqual(self._cache.get_rendezvous_challenge(self.peers[0]), b'1234') - - def test_clear_inactive_peers(self): - self._cache.add_peer(self.peers[0]) - self._cache.add_peer(self.peers[1]) - self._cache.add_peer(self.peers[2]) - time.sleep(1) - - self._cache.set_rendezvous_challenge(self.peers[0], b'1234') - self._cache.clear_inactive_peers(1) - - self.assertEqual(len(self._cache.get_rendezvous_peers()), 1) - - def test_clear_peer_challenge(self): - self._cache.add_peer(self.peers[0], b'1234') - self._cache.clear_peer_challenge(self.peers[0]) - assert self._cache.get_rendezvous_challenge(self.peers[0]) == EMPTY_PEER_CHALLENGE