Skip to content

Commit

Permalink
Rein in KnowledgeCommunity peer count
Browse files Browse the repository at this point in the history
  • Loading branch information
qstokkink committed Dec 4, 2024
1 parent 757cc61 commit bd0175b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/tribler/core/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def get_endpoints(self) -> list[RESTEndpoint]:
@precondition('session.config.get("knowledge_community/enabled")')
@overlay("tribler.core.knowledge.community", "KnowledgeCommunity")
@kwargs(db="session.db", key='session.ipv8.keys["secondary"].key')
class KnowledgeComponent(CommunityLauncherWEndpoints):
class KnowledgeComponent(BaseLauncher):
"""
Launch instructions for the knowledge community.
"""
Expand Down
42 changes: 38 additions & 4 deletions src/tribler/core/knowledge/community.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from __future__ import annotations

import random
from binascii import unhexlify
from typing import TYPE_CHECKING

from cryptography.exceptions import InvalidSignature
from ipv8.community import Community, CommunitySettings
from ipv8.keyvault.private.libnaclkey import LibNaCLSK
from ipv8.lazy_community import lazy_wrapper
from ipv8.types import Key, Peer
from pony.orm import db_session

from tribler.core.database.layers.knowledge import Operation, ResourceType
from tribler.core.database.tribler_database import TriblerDatabase
from tribler.core.knowledge.operations_requests import OperationsRequests, PeerValidationError
from tribler.core.knowledge.payload import (
RawStatementOperationMessage,
Expand All @@ -19,6 +19,12 @@
StatementOperationSignature,
)

if TYPE_CHECKING:
from ipv8.keyvault.private.libnaclkey import LibNaCLSK
from ipv8.types import Key, Peer

from tribler.core.database.tribler_database import TriblerDatabase

REQUESTED_OPERATIONS_COUNT = 10
CLEAR_ALL_REQUESTS_INTERVAL = 10 * 60 # 10 minutes

Expand Down Expand Up @@ -50,21 +56,41 @@ def __init__(self, settings: KnowledgeCommunitySettings) -> None:
self.key = settings.key
self.requests = OperationsRequests()

self.cool_peers: list[Peer] | None = None

self.add_message_handler(RawStatementOperationMessage, self.on_message)
self.add_message_handler(RequestStatementOperationMessage, self.on_request)

self.register_task("request_operations", self.request_operations, interval=settings.request_interval)
self.register_task("clear_requests", self.requests.clear_requests, interval=CLEAR_ALL_REQUESTS_INTERVAL)
self.logger.info("Knowledge community initialized")

def get_cool_peers(self) -> list[Peer]:
"""
We may need to freeze the peer list in this community to avoid inflating the peer count.
Peers sampled from the frozen list are "cool" peers.
"""
known_peers = self.get_peers()
if self.max_peers < 0 or len(known_peers) < self.max_peers + 5:
self.cool_peers = None
return known_peers
# We may not be frozen yet and old cool peers may have gone offline.
if self.cool_peers is None or len(self.cool_peers) <= len(known_peers) // 2:
cool_peers = known_peers[:self.max_peers]
else:
cool_peers = self.cool_peers
self.cool_peers = [p for p in cool_peers if p in known_peers]
return self.cool_peers

def request_operations(self) -> None:
"""
Contact peers to request operations.
"""
if not self.get_peers():
return

peer = random.choice(self.get_peers())
peer = random.choice(self.get_cool_peers())
self.requests.register_peer(peer, REQUESTED_OPERATIONS_COUNT)
self.logger.info("-> request %d operations from peer %s", REQUESTED_OPERATIONS_COUNT, peer.mid.hex())
self.ez_send(peer, RequestStatementOperationMessage(count=REQUESTED_OPERATIONS_COUNT))
Expand All @@ -74,6 +100,10 @@ def on_message(self, peer: Peer, raw: RawStatementOperationMessage) -> None:
"""
Callback for when a raw statement operation message is received.
"""
if peer not in self.get_cool_peers():
self.logger.debug("Dropping message from %s: peer is not cool!", str(peer))
return

operation, _ = self.serializer.unpack_serializable(StatementOperation, raw.operation)
signature, _ = self.serializer.unpack_serializable(StatementOperationSignature, raw.signature)
self.logger.debug("<- message received: %s", str(operation))
Expand Down Expand Up @@ -103,6 +133,10 @@ def on_request(self, peer: Peer, operation: RequestStatementOperationMessage) ->
"""
Callback for when statement operations are requested.
"""
if peer not in self.get_cool_peers():
self.logger.debug("Dropping message from %s: peer is not cool!", str(peer))
return

operations_count = min(max(1, operation.count), REQUESTED_OPERATIONS_COUNT)
self.logger.debug("<- peer %s requested %d operations", peer.mid.hex(), operations_count)

Expand Down

0 comments on commit bd0175b

Please sign in to comment.