Skip to content

Commit

Permalink
Added Rendezvous Certificates
Browse files Browse the repository at this point in the history
Add initial rendezvous design

Finalize rendezvous design

Add rendezvous tests

Move tests to seperate class

Use db name const + move tests

Revert "Upgrade PyQt, Yarl, and LibTorrent dependencies"

This reverts commit 91d360a.

Remove unnecessary override

Address requested changes and comments
  • Loading branch information
InvictusRMC authored and qstokkink committed Oct 13, 2023
1 parent 46085b6 commit e99dba8
Show file tree
Hide file tree
Showing 13 changed files with 435 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@
from typing import List, TYPE_CHECKING

from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.interfaces.udp.endpoint import UDPv4Address, UDPv4LANAddress
from ipv8.messaging.serialization import PackError
from pony.orm import db_session

from tribler.core.components.metadata_store.remote_query_community.remote_query_community import RemoteQueryCommunity
from tribler.core.components.popularity.community.payload import PopularTorrentsRequest, TorrentsHealthPayload
from tribler.core.components.popularity.community.version_community_mixin import VersionCommunityMixin
from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase
from tribler.core.components.popularity.rendezvous.rendezvous import RendezvousRequestPayload, \
RendezvousResponsePayload, RawRendezvousResponsePayload, \
RendezvousChallenge, RendezvousSignature
from tribler.core.components.popularity.rendezvous.rendezvous_cache import RendezvousCache, EMPTY_PEER_CHALLENGE
from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
from tribler.core.utilities.pony_utils import run_threaded
from tribler.core.utilities.unicode import hexlify
Expand All @@ -35,22 +42,97 @@ class PopularityCommunity(RemoteQueryCommunity, VersionCommunityMixin):
GOSSIP_POPULAR_TORRENT_COUNT = 10
GOSSIP_RANDOM_TORRENT_COUNT = 10

PING_INTERVAL_RENDEZVOUS = 60 # seconds

community_id = unhexlify('9aca62f878969c437da9844cba29a134917e1648')

def __init__(self, *args, torrent_checker=None, **kwargs):
def __init__(self, *args, torrent_checker=None, rendezvous_db=None, **kwargs):
# Creating a separate instance of Network for this community to find more peers
super().__init__(*args, **kwargs)

self.rdb: RendezvousDatabase = rendezvous_db
self.torrent_checker: TorrentChecker = torrent_checker

self.add_message_handler(TorrentsHealthPayload, self.on_torrents_health)
self.add_message_handler(PopularTorrentsRequest, self.on_popular_torrents_request)

self.add_message_handler(RendezvousRequestPayload, self.on_rendezvous_request)
self.add_message_handler(RendezvousResponsePayload, self.on_rendezvous_response)

self.logger.info('Popularity Community initialized (peer mid %s)', hexlify(self.my_peer.mid))
self.register_task("gossip_random_torrents", self.gossip_random_torrents_health,
interval=PopularityCommunity.GOSSIP_INTERVAL_FOR_RANDOM_TORRENTS)
interval=self.GOSSIP_INTERVAL_FOR_RANDOM_TORRENTS)
self.register_task("ping_rendezvous", self.ping_rendezvous,
interval=self.PING_INTERVAL_RENDEZVOUS)

# Init version community message handlers
self.init_version_community()
self.rendezvous_cache = RendezvousCache()

def send_introduction_request(self, peer):
rendezvous_request = self._create_rendezvous_request()
extra_payload = self.serializer.pack_serializable(rendezvous_request)
self.logger.debug("Piggy-backing Rendezvous to %s:%d", peer.address[0], peer.address[1])
packet = self.create_introduction_request(peer.address, extra_bytes=extra_payload,
new_style=peer.new_style_intro)
self.endpoint.send(peer.address, packet)
self.rendezvous_cache.add_peer(peer, rendezvous_request.challenge.nonce)

# We override this method to add the rendezvous certificate to the introduction request
def on_introduction_request(self, peer, dist, payload):
if 0 <= self.max_peers < len(self.get_peers()):
self.logger.debug("Dropping introduction request from (%s, %d): too many peers!",
peer.address[0], peer.address[1])
return

extra_payload = b''
if payload.extra_bytes:
self.logger.debug("Received introduction request with extra bytes")
try:
rendezvous_request, _ = self.serializer.unpack_serializable(RendezvousRequestPayload,
payload.extra_bytes)
rendezvous_response = self._create_rendezvous_response(rendezvous_request.challenge)
# As we are sending the rendezvous response, we know this peer is interested in rendezvous.
self.rendezvous_cache.add_peer(peer)
extra_payload = self.serializer.pack_serializable(rendezvous_response)
except PackError as e:
self.logger.warning("Failed to unpack RendezvousRequestPayload: %s", e)

if isinstance(payload.source_lan_address, UDPv4Address):
peer.address = UDPv4LANAddress(*payload.source_lan_address)
self.network.add_verified_peer(peer)
self.network.discover_services(peer, [self.community_id, ])

packet = self.create_introduction_response(payload.destination_address, peer.address, payload.identifier,
extra_bytes=extra_payload, new_style=peer.new_style_intro)

self.endpoint.send(peer.address, packet)
self.introduction_request_callback(peer, dist, payload)

@lazy_wrapper(RendezvousRequestPayload)
def on_rendezvous_request(self, peer, payload: RendezvousRequestPayload):
self.logger.debug("Received rendezvous request from %s:%d", peer.address[0], peer.address[1])
# As we are sending the rendezvous response, we know this peer is interested in rendezvous.
self.rendezvous_cache.add_peer(peer)
rendezvous_response = self._create_rendezvous_response(payload.challenge)
self.ez_send(peer, rendezvous_response)

@lazy_wrapper(RawRendezvousResponsePayload)
def on_rendezvous_response(self, peer, payload: RawRendezvousResponsePayload):
self.logger.debug("Received rendezvous response from %s:%d", peer.address[0], peer.address[1])
self._handle_rendezvous_response(peer, payload)

def introduction_response_callback(self, peer, dist, payload):
super().introduction_response_callback(peer, dist, payload)
if payload.extra_bytes:
self.logger.debug("Received introduction response with extra bytes")
try:
raw_rendezvous_response, _ = self.serializer.unpack_serializable(RawRendezvousResponsePayload,
payload.extra_bytes)
self._handle_rendezvous_response(peer, raw_rendezvous_response)

except PackError as e:
self.logger.warning("Failed to unpack RendezvousResponsePayload: %s", e)

def introduction_request_callback(self, peer, dist, payload):
super().introduction_request_callback(peer, dist, payload)
Expand All @@ -77,6 +159,15 @@ def gossip_random_torrents_health(self):

self.ez_send(random_peer, TorrentsHealthPayload.create(random_torrents, {}))

def ping_rendezvous(self):
# Remove peers that haven't replied in a while.
self.rendezvous_cache.clear_inactive_peers()

for peer in self.rendezvous_cache.get_rendezvous_peers():
payload = self._create_rendezvous_request()
self.rendezvous_cache.set_rendezvous_challenge(peer, payload.challenge.nonce)
self.ez_send(peer, payload)

@lazy_wrapper(TorrentsHealthPayload)
async def on_torrents_health(self, peer, payload):
self.logger.debug(f"Received torrent health information for "
Expand Down Expand Up @@ -141,3 +232,36 @@ def get_random_torrents(self) -> List[HealthInfo]:

random_torrents = random.sample(checked_and_alive, num_torrents_to_send)
return random_torrents

def _create_rendezvous_request(self) -> RendezvousRequestPayload:
challenge = RendezvousChallenge.create()
payload = RendezvousRequestPayload(challenge)
return payload

def _create_rendezvous_response(self, challenge: RendezvousChallenge) -> RendezvousResponsePayload:
signature = challenge.sign(self.my_peer.key)
payload = RendezvousResponsePayload(challenge, RendezvousSignature(signature))
return payload

def _handle_rendezvous_response(self, peer, raw_payload: RawRendezvousResponsePayload):
signature, _ = self.serializer.unpack_serializable(RendezvousSignature, raw_payload.signature)
challenge, _ = self.serializer.unpack_serializable(RendezvousChallenge, raw_payload.challenge)

expected_nonce = self.rendezvous_cache.get_rendezvous_challenge(peer) or EMPTY_PEER_CHALLENGE
if expected_nonce == EMPTY_PEER_CHALLENGE or expected_nonce != challenge.nonce:
self.logger.warning(f"Received invalid rendezvous response from {peer.mid}")
return

if not self.crypto.is_valid_signature(peer.key, raw_payload.challenge, signature.signature):
self.logger.warning(f"Received invalid signature from {peer.mid}")
return

# This nonce has been burned.
self.rendezvous_cache.clear_peer_challenge(peer)

self.logger.debug(f"Received valid rendezvous response from {peer.mid}")
with db_session(immediate=True):
certificate = self.rdb.Certificate.get(public_key=peer.mid)
if not certificate:
certificate = self.rdb.Certificate(public_key=peer.mid, counter=0)
certificate.counter += 1
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from tribler.core.components.metadata_store.db.store import MetadataStore
from tribler.core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings
from tribler.core.components.popularity.community.popularity_community import PopularityCommunity
from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase
from tribler.core.components.popularity.rendezvous.rendezvous import RendezvousChallenge
from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import HealthInfo
from tribler.core.tests.tools.base_test import MockObject
from tribler.core.utilities.path_util import Path
Expand Down Expand Up @@ -210,3 +212,102 @@ async def test_skip_torrent_query_back_for_known_torrent(self):
await self.init_first_node_and_gossip(
HealthInfo(infohash, seeders=200, leechers=0))
self.nodes[1].overlay.send_remote_select.assert_not_called()


class TestRendezvousLogic(TestBase):
NUM_NODES = 3

def setUp(self):
super().setUp()
self.count = 0
self.initialize(PopularityCommunity, self.NUM_NODES)

def create_node(self, *args, **kwargs):
rdb = RendezvousDatabase(Path(self.temporary_directory()) / f"{self.count + 1}")
torrent_checker = MockObject()
torrent_checker.torrents_checked = {}
mds = MetadataStore(Path(self.temporary_directory()) / f"{self.count}",
Path(self.temporary_directory()),
default_eccrypto.generate_key("curve25519"))
rqc_settings = RemoteQueryCommunitySettings()

self.count += 1
return MockIPv8("curve25519", PopularityCommunity, metadata_store=mds,
rendezvous_db=rdb,
torrent_checker=torrent_checker,
rqc_settings=rqc_settings
)

async def test_introduction_rendezvous_payload(self):
await self.introduce_nodes()
await self.deliver_messages()

self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer)
await self.deliver_messages()

self.nodes[0].overlay.send_introduction_request(self.nodes[2].my_peer)
await self.deliver_messages()

with db_session:
assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid).counter == 1
assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[2].my_peer.mid).counter == 1

# Check if the rendezvous cache is updated
rendezvous_peers = list(self.nodes[1].overlay.rendezvous_cache.get_rendezvous_peers())
assert rendezvous_peers[0] == self.nodes[0].my_peer
rendezvous_peers = list(self.nodes[2].overlay.rendezvous_cache.get_rendezvous_peers())
assert rendezvous_peers[0] == self.nodes[0].my_peer

async def test_rendezvous_payloads(self):
await self.introduce_nodes()
await self.deliver_messages()

self.nodes[0].overlay.send_introduction_request(self.nodes[1].my_peer)
self.nodes[0].overlay.send_introduction_request(self.nodes[2].my_peer)
await self.deliver_messages()

number_of_rendezvous = 4
for _ in range(number_of_rendezvous):
for j in range(self.count):
self.nodes[j].overlay.ping_rendezvous()
await self.deliver_messages()

with db_session:
# Peer 0 should have a counter of 1 more
assert self.nodes[0].overlay.rdb.Certificate.get(
public_key=self.nodes[1].my_peer.mid).counter == number_of_rendezvous + 1
assert self.nodes[1].overlay.rdb.Certificate.get(
public_key=self.nodes[0].my_peer.mid).counter == number_of_rendezvous
assert self.nodes[2].overlay.rdb.Certificate.get(
public_key=self.nodes[0].my_peer.mid).counter == number_of_rendezvous

async def test_invalid_nonce(self):
await self.introduce_nodes()
await self.deliver_messages()

self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, b'1' * 16)

payload = self.nodes[1].overlay._create_rendezvous_response(RendezvousChallenge(b'2' * 16))
self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload)
await self.deliver_messages()

with db_session:
assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None

async def test_invalid_signature(self):
await self.introduce_nodes()
await self.deliver_messages()

challenge_1 = RendezvousChallenge(b'1' * 16)
challenge_2 = RendezvousChallenge(b'2' * 16)

self.nodes[0].overlay.rendezvous_cache.add_peer(self.nodes[1].my_peer, challenge_1.nonce)

payload = self.nodes[1].overlay._create_rendezvous_response(challenge_2)
payload.challenge = challenge_1

self.nodes[1].overlay.ez_send(self.nodes[0].my_peer, payload)
await self.deliver_messages()

with db_session:
assert self.nodes[0].overlay.rdb.Certificate.get(public_key=self.nodes[1].my_peer.mid) is None
12 changes: 11 additions & 1 deletion src/tribler/core/components/popularity/popularity_component.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
from tribler.core.components.ipv8.ipv8_component import INFINITE, Ipv8Component
from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent
from tribler.core.components.popularity.community.popularity_community import PopularityCommunity
from tribler.core.components.popularity.rendezvous.db.database import RendezvousDatabase
from tribler.core.components.reporter.reporter_component import ReporterComponent
from tribler.core.components.torrent_checker.torrent_checker_component import TorrentCheckerComponent
from tribler.core.utilities.simpledefs import STATEDIR_DB_DIR


class PopularityComponent(Component):
community: PopularityCommunity = None
RENDEZVOUS_DB_NAME = 'rendezvous.db'

community: PopularityCommunity = None
rendezvous_db: RendezvousDatabase = None
_ipv8_component: Ipv8Component = None

async def run(self):
Expand All @@ -22,13 +26,17 @@ async def run(self):
metadata_store_component = await self.require_component(MetadataStoreComponent)
torrent_checker_component = await self.require_component(TorrentCheckerComponent)

self.rendezvous_db = RendezvousDatabase(
db_path=self.session.config.state_dir / STATEDIR_DB_DIR / self.RENDEZVOUS_DB_NAME)

config = self.session.config
community = PopularityCommunity(self._ipv8_component.peer,
self._ipv8_component.ipv8.endpoint,
Network(),
settings=config.popularity_community,
rqc_settings=config.remote_query_community,
metadata_store=metadata_store_component.mds,
rendezvous_db=self.rendezvous_db,
torrent_checker=torrent_checker_component.torrent_checker)
self.community = community

Expand All @@ -39,3 +47,5 @@ async def shutdown(self):
await super().shutdown()
if self._ipv8_component and self.community:
await self._ipv8_component.unload_community(self.community)
if self.rendezvous_db:
self.rendezvous_db.shutdown()
Empty file.
Empty file.
36 changes: 36 additions & 0 deletions src/tribler/core/components/popularity/rendezvous/db/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from pathlib import Path
from typing import Union

from pony.orm import Database, db_session

from tribler.core.components.metadata_store.db.orm_bindings import misc
from tribler.core.components.popularity.rendezvous.db.orm_bindings import certificate
from tribler.core.utilities.utilities import MEMORY_DB


class RendezvousDatabase:
DB_VERSION = 0

def __init__(self, db_path: Union[Path, type(MEMORY_DB)]):

self.database = Database()

self.MiscData = misc.define_binding(self.database)
self.Certificate = certificate.define_binding(self.database)

if db_path is MEMORY_DB:
create_db = True
db_path_string = ":memory:"
else:
create_db = not db_path.is_file()
db_path_string = str(db_path)

self.database.bind(provider='sqlite', filename=db_path_string, create_db=create_db, timeout=120.0)
self.database.generate_mapping(create_tables=create_db)

if create_db:
with db_session:
self.MiscData(name="db_version", value=str(self.DB_VERSION))

def shutdown(self) -> None:
self.database.disconnect()
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from pony.orm import Required, db_session


def define_binding(db):
class RendezvousCertificate(db.Entity):
public_key = Required(bytes, index=True)
counter = Required(int)

return RendezvousCertificate
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from pony.orm import Optional, PrimaryKey


def define_binding(db):
class MiscData(db.Entity):
name = PrimaryKey(str)
value = Optional(str)

return MiscData
Loading

0 comments on commit e99dba8

Please sign in to comment.