Skip to content

Commit

Permalink
Merge pull request #7070 from drew2a/feature/add_content_items
Browse files Browse the repository at this point in the history
Upgrade the `TagComponent` to the `KnowledgeComponent`
  • Loading branch information
drew2a authored Oct 19, 2022
2 parents c44dd7c + 901f4ee commit 499d559
Show file tree
Hide file tree
Showing 90 changed files with 2,751 additions and 1,715 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from tribler.core.components.ipv8.ipv8_component import INFINITE, Ipv8Component
from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent
from tribler.core.components.reporter.reporter_component import ReporterComponent
from tribler.core.components.tag.tag_component import TagComponent
from tribler.core.components.knowledge.knowledge_component import KnowledgeComponent


class GigaChannelComponent(Component):
Expand All @@ -25,7 +25,7 @@ async def run(self):

self._ipv8_component = await self.require_component(Ipv8Component)
metadata_store_component = await self.require_component(MetadataStoreComponent)
tag_component = await self.get_component(TagComponent)
knowledge_component = await self.get_component(KnowledgeComponent)

giga_channel_cls = GigaChannelTestnetCommunity if config.general.testnet else GigaChannelCommunity
community = giga_channel_cls(
Expand All @@ -37,7 +37,7 @@ async def run(self):
rqc_settings=config.remote_query_community,
metadata_store=metadata_store_component.mds,
max_peers=50,
tags_db=tag_component.tags_db if tag_component else None
knowledge_db=knowledge_component.knowledge_db if knowledge_component else None
)
self.community = community
self._ipv8_component.initialise_community_by_default(community, default_random_walk_max_peers=30)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import pytest

from tribler.core.components.session import Session
from tribler.core.components.gigachannel.gigachannel_component import GigaChannelComponent
from tribler.core.components.ipv8.ipv8_component import Ipv8Component
from tribler.core.components.key.key_component import KeyComponent
from tribler.core.components.knowledge.knowledge_component import KnowledgeComponent
from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent
from tribler.core.components.tag.tag_component import TagComponent
from tribler.core.components.session import Session


# pylint: disable=protected-access

Expand All @@ -14,7 +13,8 @@ async def test_giga_channel_component(tribler_config):
tribler_config.ipv8.enabled = True
tribler_config.libtorrent.enabled = True
tribler_config.chant.enabled = True
components = [TagComponent(), MetadataStoreComponent(), KeyComponent(), Ipv8Component(), GigaChannelComponent()]
components = [KnowledgeComponent(), MetadataStoreComponent(), KeyComponent(), Ipv8Component(),
GigaChannelComponent()]
async with Session(tribler_config, components) as session:
comp = session.get_instance(GigaChannelComponent)
assert comp.started_event.is_set() and not comp.failed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import pytest

from tribler.core.components.session import Session
from tribler.core.components.gigachannel_manager.gigachannel_manager_component import GigachannelManagerComponent
from tribler.core.components.ipv8.ipv8_component import Ipv8Component
from tribler.core.components.key.key_component import KeyComponent
from tribler.core.components.knowledge.knowledge_component import KnowledgeComponent
from tribler.core.components.libtorrent.libtorrent_component import LibtorrentComponent
from tribler.core.components.metadata_store.metadata_store_component import MetadataStoreComponent
from tribler.core.components.session import Session
from tribler.core.components.socks_servers.socks_servers_component import SocksServersComponent
from tribler.core.components.tag.tag_component import TagComponent


# pylint: disable=protected-access


async def test_gigachannel_manager_component(tribler_config):
components = [Ipv8Component(), TagComponent(), SocksServersComponent(), KeyComponent(), MetadataStoreComponent(),
components = [Ipv8Component(), KnowledgeComponent(), SocksServersComponent(), KeyComponent(),
MetadataStoreComponent(),
LibtorrentComponent(), GigachannelManagerComponent()]
async with Session(tribler_config, components) as session:
comp = session.get_instance(GigachannelManagerComponent)
Expand Down
File renamed without changes.
136 changes: 136 additions & 0 deletions src/tribler/core/components/knowledge/community/knowledge_community.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import random
from binascii import unhexlify

from cryptography.exceptions import InvalidSignature
from ipv8.keyvault.private.libnaclkey import LibNaCLSK
from ipv8.lazy_community import lazy_wrapper
from ipv8.types import Key
from pony.orm import db_session

from tribler.core.components.ipv8.tribler_community import TriblerCommunity
from tribler.core.components.knowledge.community.knowledge_payload import (
RawStatementOperationMessage,
RequestStatementOperationMessage,
StatementOperation,
StatementOperationMessage,
StatementOperationSignature,
)
from tribler.core.components.knowledge.community.knowledge_validator import validate_operation, validate_resource, \
validate_resource_type
from tribler.core.components.knowledge.community.operations_requests import OperationsRequests, PeerValidationError
from tribler.core.components.knowledge.db.knowledge_db import KnowledgeDatabase

REQUESTED_OPERATIONS_COUNT = 10

REQUEST_INTERVAL = 5 # 5 sec
CLEAR_ALL_REQUESTS_INTERVAL = 10 * 60 # 10 minutes
TIME_DELTA_READY_TO_GOSSIP = {'minutes': 1}


class KnowledgeCommunity(TriblerCommunity):
""" Community for disseminating tags across the network.
Only tags that are older than 1 minute will be gossiped.
"""

community_id = unhexlify('d7f7bdc8bcd3d9ad23f06f25aa8aab6754eb23a0')

def __init__(self, *args, db: KnowledgeDatabase, key: LibNaCLSK, request_interval=REQUEST_INTERVAL,
**kwargs):
super().__init__(*args, **kwargs)
self.db = db
self.key = key
self.requests = OperationsRequests()

self.add_message_handler(RawStatementOperationMessage, self.on_message)
self.add_message_handler(RequestStatementOperationMessage, self.on_request)

self.register_task("request_operations", self.request_operations, interval=request_interval)
self.register_task("clear_requests", self.requests.clear_requests, interval=CLEAR_ALL_REQUESTS_INTERVAL)
self.logger.info('Knowledge community initialized')

def request_operations(self):
if not self.get_peers():
return

peer = random.choice(self.get_peers())
self.requests.register_peer(peer, REQUESTED_OPERATIONS_COUNT)
self.logger.info(f'-> request {REQUESTED_OPERATIONS_COUNT} operations from peer {peer.mid.hex()}')
self.ez_send(peer, RequestStatementOperationMessage(count=REQUESTED_OPERATIONS_COUNT))

@lazy_wrapper(RawStatementOperationMessage)
def on_message(self, peer, raw: RawStatementOperationMessage):
operation, _ = self.serializer.unpack_serializable(StatementOperation, raw.operation)
signature, _ = self.serializer.unpack_serializable(StatementOperationSignature, raw.signature)
self.logger.debug(f'<- message received: {operation}')
try:
remote_key = self.crypto.key_from_public_bin(operation.creator_public_key)

self.requests.validate_peer(peer)
self.verify_signature(packed_message=raw.operation, key=remote_key, signature=signature.signature,
operation=operation)
self.validate_operation(operation)

with db_session():
is_added = self.db.add_operation(operation, signature.signature)
if is_added:
s = f'+ operation added ({operation.object!r} "{operation.predicate}" {operation.subject!r})'
self.logger.info(s)

except PeerValidationError as e: # peer has exhausted his response count
self.logger.warning(e)
except ValueError as e: # validation error
self.logger.warning(e)
except InvalidSignature as e: # signature verification error
self.logger.warning(e)

@lazy_wrapper(RequestStatementOperationMessage)
def on_request(self, peer, operation):
operations_count = min(max(1, operation.count), REQUESTED_OPERATIONS_COUNT)
self.logger.info(f'<- peer {peer.mid.hex()} requested {operations_count} operations')

with db_session:
random_operations = self.db.get_operations_for_gossip(
count=operations_count,
time_delta=TIME_DELTA_READY_TO_GOSSIP
)

self.logger.debug(f'Response {len(random_operations)} operations')
sent_operations = []
for op in random_operations:
try:
operation = StatementOperation(
subject_type=op.statement.subject.type,
subject=op.statement.subject.name,
predicate=op.statement.predicate,
object=op.statement.object.name,
operation=op.operation,
clock=op.clock,
creator_public_key=op.peer.public_key,
)
self.validate_operation(operation)
signature = StatementOperationSignature(signature=op.signature)
self.ez_send(peer, StatementOperationMessage(operation=operation, signature=signature))
sent_operations.append(operation)
except ValueError as e: # validation error
self.logger.warning(e)
if sent_operations:
sent_tags_info = ", ".join(f"({t})" for t in sent_operations)
self.logger.info(f'-> sent {len(sent_operations)} operations to peer: {peer.mid.hex()}')
self.logger.debug(f'-> sent operations ({sent_tags_info}) to peer: {peer.mid.hex()}')

@staticmethod
def validate_operation(operation: StatementOperation):
validate_resource(operation.subject)
validate_resource(operation.object)
validate_operation(operation.operation)
validate_resource_type(operation.subject_type)
validate_resource_type(operation.predicate)

def verify_signature(self, packed_message: bytes, key: Key, signature: bytes, operation: StatementOperation):
if not self.crypto.is_valid_signature(key, packed_message, signature):
raise InvalidSignature(f'Invalid signature for {operation}')

def sign(self, operation: StatementOperation) -> bytes:
packed = self.serializer.pack_serializable(operation)
return self.crypto.create_signature(self.key, packed)
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from dataclasses import dataclass

from ipv8.messaging.payload_dataclass import overwrite_dataclass, type_from_format

dataclass = overwrite_dataclass(dataclass)


@dataclass
class StatementOperation:
"""Do not change the format of the StatementOperation, because this will result in an invalid signature.
"""
subject_type: int # ResourceType enum
subject: str
predicate: int # ResourceType enum
object: str
operation: int # Operation enum
clock: int # This is the lamport-like clock that unique for each quadruple {public_key, subject, predicate, object}
creator_public_key: type_from_format('74s')

def __str__(self):
return f'({self.subject} {self.predicate} {self.object}), o:{self.operation}, c:{self.clock}))'


RAW_DATA = type_from_format('varlenH')
STATEMENT_OPERATION_MESSAGE_ID = 2


@dataclass
class StatementOperationSignature:
signature: type_from_format('64s')


@dataclass(msg_id=STATEMENT_OPERATION_MESSAGE_ID)
class RawStatementOperationMessage:
""" RAW payload class is used for reducing ipv8 unpacking operations
For more information take a look at: https://github.com/Tribler/tribler/pull/6396#discussion_r728334323
"""
operation: RAW_DATA
signature: RAW_DATA


@dataclass(msg_id=STATEMENT_OPERATION_MESSAGE_ID)
class StatementOperationMessage:
operation: StatementOperation
signature: StatementOperationSignature


@dataclass(msg_id=1)
class RequestStatementOperationMessage:
count: int
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from tribler.core.components.knowledge.db.knowledge_db import Operation, ResourceType
from tribler.core.components.knowledge.knowledge_constants import MAX_RESOURCE_LENGTH, MIN_RESOURCE_LENGTH


def validate_resource(resource: str):
"""Validate the resource. Raises ValueError, in the case the resource is not valid."""
if len(resource) < MIN_RESOURCE_LENGTH or len(resource) > MAX_RESOURCE_LENGTH:
raise ValueError(f'Tag length should be in range [{MIN_RESOURCE_LENGTH}..{MAX_RESOURCE_LENGTH}]')


def is_valid_resource(resource: str) -> bool:
"""Validate the resource. Returns False, in the case the resource is not valid."""
try:
validate_resource(resource)
except ValueError:
return False
return True


def validate_operation(operation: int):
"""Validate the incoming operation. Raises ValueError, in the case the operation is not valid."""
Operation(operation)


def validate_resource_type(t: int):
"""Validate the resource type. Raises ValueError, in the case the type is not valid."""
ResourceType(t)
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class PeerValidationError(ValueError):
...


class TagRequests:
class OperationsRequests:
""" This class is design for controlling requests during pull-based gossip.
The main idea:
Expand Down
Loading

0 comments on commit 499d559

Please sign in to comment.