From ba45b9d048ff511ebe9bd21b84f11b81d428e079 Mon Sep 17 00:00:00 2001 From: drew2a Date: Fri, 14 Jan 2022 14:29:52 +0100 Subject: [PATCH] Refactor autogeneration of tags --- .../db/orm_bindings/torrent_metadata.py | 4 ++- .../components/metadata_store/db/store.py | 11 +++--- .../metadata_store_component.py | 4 ++- .../components/tag/community/tag_payload.py | 1 - .../tribler_core/components/tag/db/tag_db.py | 25 ++++++++----- .../components/tag/db/tests/test_tag_db.py | 30 +++++++++------- .../tag/rules/tag_rules_processor.py | 35 +++++++++++++------ .../components/tag/tag_component.py | 5 +-- 8 files changed, 73 insertions(+), 42 deletions(-) diff --git a/src/tribler-core/tribler_core/components/metadata_store/db/orm_bindings/torrent_metadata.py b/src/tribler-core/tribler_core/components/metadata_store/db/orm_bindings/torrent_metadata.py index 7b77f2d1a7b..c639b6eba49 100644 --- a/src/tribler-core/tribler_core/components/metadata_store/db/orm_bindings/torrent_metadata.py +++ b/src/tribler-core/tribler_core/components/metadata_store/db/orm_bindings/torrent_metadata.py @@ -46,7 +46,7 @@ def tdef_to_metadata_dict(tdef): } -def define_binding(db, notifier: Notifier): +def define_binding(db, notifier: Notifier, tag_version: int): class TorrentMetadata(db.MetadataNode): """ This ORM binding class is intended to store Torrent objects, i.e. infohashes along with some related metadata. @@ -75,6 +75,7 @@ class TorrentMetadata(db.MetadataNode): 'torrent_date', 'tracker_info', ) + tag_version = orm.Optional(int) def __init__(self, *args, **kwargs): if "health" not in kwargs and "infohash" in kwargs: @@ -90,6 +91,7 @@ def __init__(self, *args, **kwargs): self.add_tracker(kwargs["tracker_info"]) if notifier: notifier.notify(NEW_TORRENT_METADATA_CREATED, infohash=kwargs.get("infohash"), title=self.title) + self.tag_version = tag_version def add_tracker(self, tracker_url): sanitized_url = get_uniformed_tracker_url(tracker_url) diff --git a/src/tribler-core/tribler_core/components/metadata_store/db/store.py b/src/tribler-core/tribler_core/components/metadata_store/db/store.py index 330fe86e80a..07be450c8f0 100644 --- a/src/tribler-core/tribler_core/components/metadata_store/db/store.py +++ b/src/tribler-core/tribler_core/components/metadata_store/db/store.py @@ -4,15 +4,13 @@ from asyncio import get_event_loop from datetime import datetime, timedelta from time import sleep, time -from typing import Union +from typing import Optional, Union from lz4.frame import LZ4FrameDecompressor - from pony import orm from pony.orm import db_session, desc, left_join, raw_sql, select from tribler_common.simpledefs import NTFY - from tribler_core.components.metadata_store.db.orm_bindings import ( binary_node, channel_description, @@ -143,6 +141,7 @@ def __init__( notifier=None, check_tables=True, db_version: int = CURRENT_DB_VERSION, + tag_version: Optional[int] = None ): self.notifier = notifier # Reference to app-level notification service self.db_path = db_filename @@ -189,7 +188,11 @@ def sqlite_disable_sync(_, connection): self.MetadataNode = metadata_node.define_binding(self._db) self.CollectionNode = collection_node.define_binding(self._db) - self.TorrentMetadata = torrent_metadata.define_binding(self._db, notifier) + self.TorrentMetadata = torrent_metadata.define_binding( + self._db, + notifier=notifier, + tag_version=tag_version + ) self.ChannelMetadata = channel_metadata.define_binding(self._db) self.JsonNode = json_node.define_binding(self._db, db_version) diff --git a/src/tribler-core/tribler_core/components/metadata_store/metadata_store_component.py b/src/tribler-core/tribler_core/components/metadata_store/metadata_store_component.py index 493cba017df..1a899b799c2 100644 --- a/src/tribler-core/tribler_core/components/metadata_store/metadata_store_component.py +++ b/src/tribler-core/tribler_core/components/metadata_store/metadata_store_component.py @@ -4,6 +4,7 @@ from tribler_core.components.key.key_component import KeyComponent from tribler_core.components.metadata_store.db.store import MetadataStore from tribler_core.components.metadata_store.utils import generate_test_channels +from tribler_core.components.tag.rules.tag_rules_processor import TagRulesProcessor from tribler_core.components.tag.tag_component import TagComponent @@ -42,9 +43,10 @@ async def run(self): key_component.primary_key, notifier=self.session.notifier, disable_sync=config.gui_test_mode, + tag_version=TagRulesProcessor.version ) self.mds = metadata_store - self.session.notifier.add_observer(NTFY.TORRENT_METADATA_ADDED, + self.session.notifier.add_observer(NTFY.TORRENT_METADATA_ADDED.value, metadata_store.TorrentMetadata.add_ffa_from_dict) if config.gui_test_mode: tag_component = await self.require_component(TagComponent) diff --git a/src/tribler-core/tribler_core/components/tag/community/tag_payload.py b/src/tribler-core/tribler_core/components/tag/community/tag_payload.py index e8165989c0f..c9ed6613b26 100644 --- a/src/tribler-core/tribler_core/components/tag/community/tag_payload.py +++ b/src/tribler-core/tribler_core/components/tag/community/tag_payload.py @@ -3,7 +3,6 @@ from ipv8.messaging.payload_dataclass import overwrite_dataclass, type_from_format - dataclass = overwrite_dataclass(dataclass) diff --git a/src/tribler-core/tribler_core/components/tag/db/tag_db.py b/src/tribler-core/tribler_core/components/tag/db/tag_db.py index fda21033272..5a09f810620 100644 --- a/src/tribler-core/tribler_core/components/tag/db/tag_db.py +++ b/src/tribler-core/tribler_core/components/tag/db/tag_db.py @@ -9,6 +9,13 @@ from tribler_core.components.tag.community.tag_payload import TagOperation, TagOperationEnum from tribler_core.utilities.unicode import hexlify + +CLOCK_START_VALUE = 0 + +# we picked `-1` as a value because it is allows manually created tags get a higher priority +CLOCK_FOR_AUTOGENERATED_TAGS = CLOCK_START_VALUE - 1 +PUBLIC_KEY_FOR_AUTO_GENERATED_TAGS = b'auto_generated' + SHOW_THRESHOLD = 2 HIDE_THRESHOLD = -2 @@ -106,7 +113,7 @@ def _get_or_create(cls, create_kwargs=None, **kwargs): # pylint: disable=bad-st return obj def add_tag_operation(self, operation: TagOperation, signature: bytes, is_local_peer: bool = False, - is_auto_generated:bool = False) -> bool: + is_auto_generated: bool = False, counter_increment: int = 1) -> bool: """ Add the operation that will be applied to the tag. Args: operation: the class describes the adding operation @@ -126,29 +133,29 @@ def add_tag_operation(self, operation: TagOperation, signature: bytes, is_local_ if not op: # then insert self.instance.TorrentTagOp(torrent_tag=torrent_tag, peer=peer, operation=operation.operation, clock=operation.clock, signature=signature, auto_generated=is_auto_generated) - torrent_tag.update_counter(operation.operation, is_local_peer=is_local_peer) + torrent_tag.update_counter(operation.operation, increment=counter_increment, is_local_peer=is_local_peer) return True # if it is a message from the past, then return if operation.clock <= op.clock: return False - # if it is an auto generated operation, then return - if is_auto_generated: - return False - # To prevent endless incrementing of the operation, we apply the following logic: # 1. Decrement previous operation - torrent_tag.update_counter(op.operation, increment=-1, is_local_peer=is_local_peer) + torrent_tag.update_counter(op.operation, increment=-counter_increment, is_local_peer=is_local_peer) # 2. Increment new operation - torrent_tag.update_counter(operation.operation, is_local_peer=is_local_peer) + torrent_tag.update_counter(operation.operation, increment=counter_increment, is_local_peer=is_local_peer) # 3. Update the operation entity op.set(operation=operation.operation, clock=operation.clock, signature=signature, updated_at=datetime.datetime.utcnow(), auto_generated=is_auto_generated) return True + def add_auto_generated_tag_operation(self, operation: TagOperation): + self.add_tag_operation(operation, signature=b'', is_local_peer=False, is_auto_generated=True, + counter_increment=SHOW_THRESHOLD) + @staticmethod def _show_condition(torrent_tag): """This function determines show condition for the torrent_tag""" @@ -213,7 +220,7 @@ def get_clock(self, operation: TagOperation) -> int: return 0 op = self.instance.TorrentTagOp.get(torrent_tag=torrent_tag, peer=peer) - return op.clock if op else 0 + return op.clock if op else CLOCK_START_VALUE def get_tags_operations_for_gossip(self, time_delta, count: int = 10) -> List: """ Get random operations from the DB that older than time_delta. diff --git a/src/tribler-core/tribler_core/components/tag/db/tests/test_tag_db.py b/src/tribler-core/tribler_core/components/tag/db/tests/test_tag_db.py index ddb86523e57..98087d03e5e 100644 --- a/src/tribler-core/tribler_core/components/tag/db/tests/test_tag_db.py +++ b/src/tribler-core/tribler_core/components/tag/db/tests/test_tag_db.py @@ -7,7 +7,8 @@ from ipv8.test.base import TestBase from tribler_core.components.tag.community.tag_payload import TagOperation, TagOperationEnum -from tribler_core.components.tag.db.tag_db import SHOW_THRESHOLD, TagDatabase +from tribler_core.components.tag.db.tag_db import CLOCK_FOR_AUTOGENERATED_TAGS, PUBLIC_KEY_FOR_AUTO_GENERATED_TAGS, \ + SHOW_THRESHOLD, TagDatabase # pylint: disable=protected-access @@ -56,11 +57,11 @@ def create_operation(infohash=b'infohash', tag='tag', peer=b'', operation=TagOpe @staticmethod def add_operation(tag_db: TagDatabase, infohash=b'infohash', tag='tag', peer=b'', operation=TagOperationEnum.ADD, - is_local_peer=False, clock=None, is_auto_generated=False): + is_local_peer=False, clock=None, is_auto_generated=False, counter_increment: int = 1): operation = TestTagDB.create_operation(infohash, tag, peer, operation, clock) operation.clock = clock or tag_db.get_clock(operation) + 1 result = tag_db.add_tag_operation(operation, signature=b'', is_local_peer=is_local_peer, - is_auto_generated=is_auto_generated) + is_auto_generated=is_auto_generated, counter_increment=counter_increment) commit() return result @@ -183,17 +184,20 @@ async def test_remote_add_multiple_tag_operations(self): @db_session async def test_add_auto_generated_operation(self): - # test that the first added auto generated operation is stores in DB - assert self.add_operation(self.db, is_auto_generated=True) - assert self.db.instance.TorrentTagOp.get().auto_generated - - # test that the next not auto generated operation is updated in DB - assert self.add_operation(self.db, is_auto_generated=False) - assert not self.db.instance.TorrentTagOp.get().auto_generated + self.db.add_auto_generated_tag_operation( + operation=TagOperation( + infohash=b'infohash', + operation=TagOperationEnum.ADD, + clock=CLOCK_FOR_AUTOGENERATED_TAGS, + creator_public_key=PUBLIC_KEY_FOR_AUTO_GENERATED_TAGS, + tag='tag' + ) + ) - # test that the next auto generated operation is not updated in DB - assert not self.add_operation(self.db, is_auto_generated=True) - assert not self.db.instance.TorrentTagOp.get().auto_generated + assert self.db.instance.TorrentTagOp.get().auto_generated + assert self.db.instance.TorrentTagOp.get().clock == CLOCK_FOR_AUTOGENERATED_TAGS + assert self.db.instance.TorrentTag.get().added_count == SHOW_THRESHOLD + assert self.db.instance.Peer.get().public_key == PUBLIC_KEY_FOR_AUTO_GENERATED_TAGS @db_session async def test_multiple_tags(self): diff --git a/src/tribler-core/tribler_core/components/tag/rules/tag_rules_processor.py b/src/tribler-core/tribler_core/components/tag/rules/tag_rules_processor.py index 80b4ad37653..f56c4af6d78 100644 --- a/src/tribler-core/tribler_core/components/tag/rules/tag_rules_processor.py +++ b/src/tribler-core/tribler_core/components/tag/rules/tag_rules_processor.py @@ -3,25 +3,36 @@ from pony.orm import db_session -from ipv8.keyvault.private.libnaclkey import LibNaCLSK -from tribler_core.components.metadata_store.db.orm_bindings.torrent_metadata import NEW_TORRENT_METADATA_CREATED +import tribler_core.components.metadata_store.db.orm_bindings.torrent_metadata as torrent_metadata +import tribler_core.components.metadata_store.db.store as MDS +from ipv8.taskmanager import TaskManager from tribler_core.components.tag.community.tag_payload import TagOperation, TagOperationEnum -from tribler_core.components.tag.db.tag_db import TagDatabase +from tribler_core.components.tag.db.tag_db import CLOCK_FOR_AUTOGENERATED_TAGS, PUBLIC_KEY_FOR_AUTO_GENERATED_TAGS, \ + TagDatabase from tribler_core.components.tag.rules.tag_rules import extract_only_valid_tags from tribler_core.notifier import Notifier -class TagRulesProcessor: - def __init__(self, notifier: Notifier, db: TagDatabase, key: LibNaCLSK): +class TagRulesProcessor(TaskManager): + version = 1 # this value must be incremented in the case of new rules set has been applied + + def __init__(self, notifier: Notifier, db: TagDatabase, mds: MDS.MetadataStore, batch_size: int = 1000, + interval: float = 10): + super().__init__() self.logger = logging.getLogger(self.__class__.__name__) self.notifier = notifier self.db = db - self.public_key = key.pub().key_to_bin() + self.mds = mds + self.batch_size = batch_size + + self.notifier.add_observer(torrent_metadata.NEW_TORRENT_METADATA_CREATED, self.process_torrent_title) + self.register_task(name='', interval=interval, task=self.process_batch) - self.notifier.add_observer(NEW_TORRENT_METADATA_CREATED, self.on_new_torrent_metadata_created) + def process_batch(self): + ... - def on_new_torrent_metadata_created(self, infohash: Optional[bytes] = None, title: Optional[str] = None): + def process_torrent_title(self, infohash: Optional[bytes] = None, title: Optional[str] = None): if not infohash or not title: return tags = set(extract_only_valid_tags(title)) @@ -35,8 +46,10 @@ def save_tags(self, infohash: bytes, tags: Set[str]): operation = TagOperation( infohash=infohash, operation=TagOperationEnum.ADD, - clock=0, - creator_public_key=self.public_key, + clock=CLOCK_FOR_AUTOGENERATED_TAGS, + creator_public_key=PUBLIC_KEY_FOR_AUTO_GENERATED_TAGS, tag=tag ) - self.db.add_tag_operation(operation=operation, signature=b'', is_local_peer=True, is_auto_generated=True) + # we want auto generated operation to act like a normal operation + # therefore we use 2 as a `counter_increment` to immediately pass SHOW_THRESHOLD + self.db.add_auto_generated_tag_operation(operation=operation) diff --git a/src/tribler-core/tribler_core/components/tag/tag_component.py b/src/tribler-core/tribler_core/components/tag/tag_component.py index 6bf6216e211..8af66200e92 100644 --- a/src/tribler-core/tribler_core/components/tag/tag_component.py +++ b/src/tribler-core/tribler_core/components/tag/tag_component.py @@ -1,5 +1,5 @@ +import tribler_core.components.metadata_store.metadata_store_component as metadata_store_component from tribler_common.simpledefs import STATEDIR_DB_DIR - from tribler_core.components.base import Component from tribler_core.components.ipv8.ipv8_component import Ipv8Component from tribler_core.components.key.key_component import KeyComponent @@ -21,6 +21,7 @@ async def run(self): self._ipv8_component = await self.require_component(Ipv8Component) key_component = await self.require_component(KeyComponent) + mds_component = await self.require_component(metadata_store_component.MetadataStoreComponent) db_path = self.session.config.state_dir / STATEDIR_DB_DIR / "tags.db" if self.session.config.gui_test_mode: @@ -37,7 +38,7 @@ async def run(self): self.rules_processor = TagRulesProcessor( notifier=self.session.notifier, db=self.tags_db, - key=key_component.secondary_key + mds=mds_component.mds, ) self._ipv8_component.initialise_community_by_default(self.community)