Skip to content

Commit

Permalink
Refactor autogeneration of tags
Browse files Browse the repository at this point in the history
  • Loading branch information
drew2a committed Jan 14, 2022
1 parent 701cb39 commit ba45b9d
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def tdef_to_metadata_dict(tdef):
}


def define_binding(db, notifier: Notifier):
def define_binding(db, notifier: Notifier, tag_version: int):
class TorrentMetadata(db.MetadataNode):
"""
This ORM binding class is intended to store Torrent objects, i.e. infohashes along with some related metadata.
Expand Down Expand Up @@ -75,6 +75,7 @@ class TorrentMetadata(db.MetadataNode):
'torrent_date',
'tracker_info',
)
tag_version = orm.Optional(int)

def __init__(self, *args, **kwargs):
if "health" not in kwargs and "infohash" in kwargs:
Expand All @@ -90,6 +91,7 @@ def __init__(self, *args, **kwargs):
self.add_tracker(kwargs["tracker_info"])
if notifier:
notifier.notify(NEW_TORRENT_METADATA_CREATED, infohash=kwargs.get("infohash"), title=self.title)
self.tag_version = tag_version

def add_tracker(self, tracker_url):
sanitized_url = get_uniformed_tracker_url(tracker_url)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@
from asyncio import get_event_loop
from datetime import datetime, timedelta
from time import sleep, time
from typing import Union
from typing import Optional, Union

from lz4.frame import LZ4FrameDecompressor

from pony import orm
from pony.orm import db_session, desc, left_join, raw_sql, select

from tribler_common.simpledefs import NTFY

from tribler_core.components.metadata_store.db.orm_bindings import (
binary_node,
channel_description,
Expand Down Expand Up @@ -143,6 +141,7 @@ def __init__(
notifier=None,
check_tables=True,
db_version: int = CURRENT_DB_VERSION,
tag_version: Optional[int] = None
):
self.notifier = notifier # Reference to app-level notification service
self.db_path = db_filename
Expand Down Expand Up @@ -189,7 +188,11 @@ def sqlite_disable_sync(_, connection):

self.MetadataNode = metadata_node.define_binding(self._db)
self.CollectionNode = collection_node.define_binding(self._db)
self.TorrentMetadata = torrent_metadata.define_binding(self._db, notifier)
self.TorrentMetadata = torrent_metadata.define_binding(
self._db,
notifier=notifier,
tag_version=tag_version
)
self.ChannelMetadata = channel_metadata.define_binding(self._db)

self.JsonNode = json_node.define_binding(self._db, db_version)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from tribler_core.components.key.key_component import KeyComponent
from tribler_core.components.metadata_store.db.store import MetadataStore
from tribler_core.components.metadata_store.utils import generate_test_channels
from tribler_core.components.tag.rules.tag_rules_processor import TagRulesProcessor
from tribler_core.components.tag.tag_component import TagComponent


Expand Down Expand Up @@ -42,9 +43,10 @@ async def run(self):
key_component.primary_key,
notifier=self.session.notifier,
disable_sync=config.gui_test_mode,
tag_version=TagRulesProcessor.version
)
self.mds = metadata_store
self.session.notifier.add_observer(NTFY.TORRENT_METADATA_ADDED,
self.session.notifier.add_observer(NTFY.TORRENT_METADATA_ADDED.value,
metadata_store.TorrentMetadata.add_ffa_from_dict)
if config.gui_test_mode:
tag_component = await self.require_component(TagComponent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

from ipv8.messaging.payload_dataclass import overwrite_dataclass, type_from_format


dataclass = overwrite_dataclass(dataclass)


Expand Down
25 changes: 16 additions & 9 deletions src/tribler-core/tribler_core/components/tag/db/tag_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
from tribler_core.components.tag.community.tag_payload import TagOperation, TagOperationEnum
from tribler_core.utilities.unicode import hexlify


CLOCK_START_VALUE = 0

# we picked `-1` as a value because it is allows manually created tags get a higher priority
CLOCK_FOR_AUTOGENERATED_TAGS = CLOCK_START_VALUE - 1
PUBLIC_KEY_FOR_AUTO_GENERATED_TAGS = b'auto_generated'

SHOW_THRESHOLD = 2
HIDE_THRESHOLD = -2

Expand Down Expand Up @@ -106,7 +113,7 @@ def _get_or_create(cls, create_kwargs=None, **kwargs): # pylint: disable=bad-st
return obj

def add_tag_operation(self, operation: TagOperation, signature: bytes, is_local_peer: bool = False,
is_auto_generated:bool = False) -> bool:
is_auto_generated: bool = False, counter_increment: int = 1) -> bool:
""" Add the operation that will be applied to the tag.
Args:
operation: the class describes the adding operation
Expand All @@ -126,29 +133,29 @@ def add_tag_operation(self, operation: TagOperation, signature: bytes, is_local_
if not op: # then insert
self.instance.TorrentTagOp(torrent_tag=torrent_tag, peer=peer, operation=operation.operation,
clock=operation.clock, signature=signature, auto_generated=is_auto_generated)
torrent_tag.update_counter(operation.operation, is_local_peer=is_local_peer)
torrent_tag.update_counter(operation.operation, increment=counter_increment, is_local_peer=is_local_peer)
return True

# if it is a message from the past, then return
if operation.clock <= op.clock:
return False

# if it is an auto generated operation, then return
if is_auto_generated:
return False

# To prevent endless incrementing of the operation, we apply the following logic:

# 1. Decrement previous operation
torrent_tag.update_counter(op.operation, increment=-1, is_local_peer=is_local_peer)
torrent_tag.update_counter(op.operation, increment=-counter_increment, is_local_peer=is_local_peer)
# 2. Increment new operation
torrent_tag.update_counter(operation.operation, is_local_peer=is_local_peer)
torrent_tag.update_counter(operation.operation, increment=counter_increment, is_local_peer=is_local_peer)

# 3. Update the operation entity
op.set(operation=operation.operation, clock=operation.clock, signature=signature,
updated_at=datetime.datetime.utcnow(), auto_generated=is_auto_generated)
return True

def add_auto_generated_tag_operation(self, operation: TagOperation):
self.add_tag_operation(operation, signature=b'', is_local_peer=False, is_auto_generated=True,
counter_increment=SHOW_THRESHOLD)

@staticmethod
def _show_condition(torrent_tag):
"""This function determines show condition for the torrent_tag"""
Expand Down Expand Up @@ -213,7 +220,7 @@ def get_clock(self, operation: TagOperation) -> int:
return 0

op = self.instance.TorrentTagOp.get(torrent_tag=torrent_tag, peer=peer)
return op.clock if op else 0
return op.clock if op else CLOCK_START_VALUE

def get_tags_operations_for_gossip(self, time_delta, count: int = 10) -> List:
""" Get random operations from the DB that older than time_delta.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

from ipv8.test.base import TestBase
from tribler_core.components.tag.community.tag_payload import TagOperation, TagOperationEnum
from tribler_core.components.tag.db.tag_db import SHOW_THRESHOLD, TagDatabase
from tribler_core.components.tag.db.tag_db import CLOCK_FOR_AUTOGENERATED_TAGS, PUBLIC_KEY_FOR_AUTO_GENERATED_TAGS, \
SHOW_THRESHOLD, TagDatabase


# pylint: disable=protected-access
Expand Down Expand Up @@ -56,11 +57,11 @@ def create_operation(infohash=b'infohash', tag='tag', peer=b'', operation=TagOpe

@staticmethod
def add_operation(tag_db: TagDatabase, infohash=b'infohash', tag='tag', peer=b'', operation=TagOperationEnum.ADD,
is_local_peer=False, clock=None, is_auto_generated=False):
is_local_peer=False, clock=None, is_auto_generated=False, counter_increment: int = 1):
operation = TestTagDB.create_operation(infohash, tag, peer, operation, clock)
operation.clock = clock or tag_db.get_clock(operation) + 1
result = tag_db.add_tag_operation(operation, signature=b'', is_local_peer=is_local_peer,
is_auto_generated=is_auto_generated)
is_auto_generated=is_auto_generated, counter_increment=counter_increment)
commit()
return result

Expand Down Expand Up @@ -183,17 +184,20 @@ async def test_remote_add_multiple_tag_operations(self):

@db_session
async def test_add_auto_generated_operation(self):
# test that the first added auto generated operation is stores in DB
assert self.add_operation(self.db, is_auto_generated=True)
assert self.db.instance.TorrentTagOp.get().auto_generated

# test that the next not auto generated operation is updated in DB
assert self.add_operation(self.db, is_auto_generated=False)
assert not self.db.instance.TorrentTagOp.get().auto_generated
self.db.add_auto_generated_tag_operation(
operation=TagOperation(
infohash=b'infohash',
operation=TagOperationEnum.ADD,
clock=CLOCK_FOR_AUTOGENERATED_TAGS,
creator_public_key=PUBLIC_KEY_FOR_AUTO_GENERATED_TAGS,
tag='tag'
)
)

# test that the next auto generated operation is not updated in DB
assert not self.add_operation(self.db, is_auto_generated=True)
assert not self.db.instance.TorrentTagOp.get().auto_generated
assert self.db.instance.TorrentTagOp.get().auto_generated
assert self.db.instance.TorrentTagOp.get().clock == CLOCK_FOR_AUTOGENERATED_TAGS
assert self.db.instance.TorrentTag.get().added_count == SHOW_THRESHOLD
assert self.db.instance.Peer.get().public_key == PUBLIC_KEY_FOR_AUTO_GENERATED_TAGS

@db_session
async def test_multiple_tags(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,36 @@

from pony.orm import db_session

from ipv8.keyvault.private.libnaclkey import LibNaCLSK
from tribler_core.components.metadata_store.db.orm_bindings.torrent_metadata import NEW_TORRENT_METADATA_CREATED
import tribler_core.components.metadata_store.db.orm_bindings.torrent_metadata as torrent_metadata
import tribler_core.components.metadata_store.db.store as MDS
from ipv8.taskmanager import TaskManager
from tribler_core.components.tag.community.tag_payload import TagOperation, TagOperationEnum
from tribler_core.components.tag.db.tag_db import TagDatabase
from tribler_core.components.tag.db.tag_db import CLOCK_FOR_AUTOGENERATED_TAGS, PUBLIC_KEY_FOR_AUTO_GENERATED_TAGS, \
TagDatabase
from tribler_core.components.tag.rules.tag_rules import extract_only_valid_tags
from tribler_core.notifier import Notifier


class TagRulesProcessor:
def __init__(self, notifier: Notifier, db: TagDatabase, key: LibNaCLSK):
class TagRulesProcessor(TaskManager):
version = 1 # this value must be incremented in the case of new rules set has been applied

def __init__(self, notifier: Notifier, db: TagDatabase, mds: MDS.MetadataStore, batch_size: int = 1000,
interval: float = 10):
super().__init__()
self.logger = logging.getLogger(self.__class__.__name__)

self.notifier = notifier
self.db = db
self.public_key = key.pub().key_to_bin()
self.mds = mds
self.batch_size = batch_size

self.notifier.add_observer(torrent_metadata.NEW_TORRENT_METADATA_CREATED, self.process_torrent_title)
self.register_task(name='', interval=interval, task=self.process_batch)

self.notifier.add_observer(NEW_TORRENT_METADATA_CREATED, self.on_new_torrent_metadata_created)
def process_batch(self):
...

def on_new_torrent_metadata_created(self, infohash: Optional[bytes] = None, title: Optional[str] = None):
def process_torrent_title(self, infohash: Optional[bytes] = None, title: Optional[str] = None):
if not infohash or not title:
return
tags = set(extract_only_valid_tags(title))
Expand All @@ -35,8 +46,10 @@ def save_tags(self, infohash: bytes, tags: Set[str]):
operation = TagOperation(
infohash=infohash,
operation=TagOperationEnum.ADD,
clock=0,
creator_public_key=self.public_key,
clock=CLOCK_FOR_AUTOGENERATED_TAGS,
creator_public_key=PUBLIC_KEY_FOR_AUTO_GENERATED_TAGS,
tag=tag
)
self.db.add_tag_operation(operation=operation, signature=b'', is_local_peer=True, is_auto_generated=True)
# we want auto generated operation to act like a normal operation
# therefore we use 2 as a `counter_increment` to immediately pass SHOW_THRESHOLD
self.db.add_auto_generated_tag_operation(operation=operation)
5 changes: 3 additions & 2 deletions src/tribler-core/tribler_core/components/tag/tag_component.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import tribler_core.components.metadata_store.metadata_store_component as metadata_store_component
from tribler_common.simpledefs import STATEDIR_DB_DIR

from tribler_core.components.base import Component
from tribler_core.components.ipv8.ipv8_component import Ipv8Component
from tribler_core.components.key.key_component import KeyComponent
Expand All @@ -21,6 +21,7 @@ async def run(self):

self._ipv8_component = await self.require_component(Ipv8Component)
key_component = await self.require_component(KeyComponent)
mds_component = await self.require_component(metadata_store_component.MetadataStoreComponent)

db_path = self.session.config.state_dir / STATEDIR_DB_DIR / "tags.db"
if self.session.config.gui_test_mode:
Expand All @@ -37,7 +38,7 @@ async def run(self):
self.rules_processor = TagRulesProcessor(
notifier=self.session.notifier,
db=self.tags_db,
key=key_component.secondary_key
mds=mds_component.mds,
)

self._ipv8_component.initialise_community_by_default(self.community)
Expand Down

0 comments on commit ba45b9d

Please sign in to comment.