Skip to content

Commit

Permalink
Add Tag Rules Processor
Browse files Browse the repository at this point in the history
  • Loading branch information
drew2a committed Feb 1, 2022
1 parent ed80c7e commit d42afef
Show file tree
Hide file tree
Showing 9 changed files with 457 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
from tribler_core.components.metadata_store.category_filter.family_filter import default_xxx_filter
from tribler_core.components.metadata_store.db.orm_bindings.channel_node import COMMITTED
from tribler_core.components.metadata_store.db.serialization import EPOCH, REGULAR_TORRENT, TorrentMetadataPayload
from tribler_core.notifier import Notifier
from tribler_core.utilities.tracker_utils import get_uniformed_tracker_url
from tribler_core.utilities.unicode import ensure_unicode, hexlify

NULL_KEY_SUBST = b"\00"
NEW_TORRENT_METADATA_CREATED: str = 'TorrentMetadata:new_torrent_metadata_created'


# This function is used to devise id_ from infohash in deterministic way. Used in FFA channels.
Expand Down Expand Up @@ -44,7 +46,7 @@ def tdef_to_metadata_dict(tdef):
}


def define_binding(db):
def define_binding(db, notifier: Notifier, tag_version: int):
class TorrentMetadata(db.MetadataNode):
"""
This ORM binding class is intended to store Torrent objects, i.e. infohashes along with some related metadata.
Expand All @@ -61,12 +63,13 @@ class TorrentMetadata(db.MetadataNode):
# Local
xxx = orm.Optional(float, default=0)
health = orm.Optional('TorrentState', reverse='metadata')
tag_version = orm.Required(int, default=0)

# Special class-level properties
_payload_class = TorrentMetadataPayload
payload_arguments = _payload_class.__init__.__code__.co_varnames[
: _payload_class.__init__.__code__.co_argcount
][1:]
: _payload_class.__init__.__code__.co_argcount
][1:]
nonpersonal_attributes = db.MetadataNode.nonpersonal_attributes + (
'infohash',
'size',
Expand All @@ -86,6 +89,11 @@ def __init__(self, *args, **kwargs):

if 'tracker_info' in kwargs:
self.add_tracker(kwargs["tracker_info"])
if notifier:
notifier.notify(NEW_TORRENT_METADATA_CREATED,
infohash=kwargs.get("infohash"),
title=self.title)
self.tag_version = tag_version

def add_tracker(self, tracker_url):
sanitized_url = get_uniformed_tracker_url(tracker_url)
Expand Down
93 changes: 53 additions & 40 deletions src/tribler-core/tribler_core/components/metadata_store/db/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from asyncio import get_event_loop
from datetime import datetime, timedelta
from time import sleep, time
from typing import Union
from typing import Optional, Union

from lz4.frame import LZ4FrameDecompressor

Expand Down Expand Up @@ -49,19 +49,19 @@
from tribler_core.components.metadata_store.remote_query_community.payload_checker import process_payload
from tribler_core.exceptions import InvalidSignatureException
from tribler_core.utilities.path_util import Path
from tribler_core.utilities.pony_utils import get_or_create
from tribler_core.utilities.unicode import hexlify
from tribler_core.utilities.utilities import MEMORY_DB

BETA_DB_VERSIONS = [0, 1, 2, 3, 4, 5]
CURRENT_DB_VERSION = 13
CURRENT_DB_VERSION = 14

MIN_BATCH_SIZE = 10
MAX_BATCH_SIZE = 1000

POPULAR_TORRENTS_FRESHNESS_PERIOD = 60 * 60 * 24 # Last day
POPULAR_TORRENTS_COUNT = 100


# This table should never be used from ORM directly.
# It is created as a VIRTUAL table by raw SQL and
# maintained by SQL triggers.
Expand Down Expand Up @@ -136,14 +136,15 @@

class MetadataStore:
def __init__(
self,
db_filename: Union[Path, type(MEMORY_DB)],
channels_dir,
my_key,
disable_sync=False,
notifier=None,
check_tables=True,
db_version: int = CURRENT_DB_VERSION,
self,
db_filename: Union[Path, type(MEMORY_DB)],
channels_dir,
my_key,
disable_sync=False,
notifier=None,
check_tables=True,
db_version: int = CURRENT_DB_VERSION,
tag_version: int = 0
):
self.notifier = notifier # Reference to app-level notification service
self.db_path = db_filename
Expand Down Expand Up @@ -190,7 +191,11 @@ def sqlite_disable_sync(_, connection):

self.MetadataNode = metadata_node.define_binding(self._db)
self.CollectionNode = collection_node.define_binding(self._db)
self.TorrentMetadata = torrent_metadata.define_binding(self._db)
self.TorrentMetadata = torrent_metadata.define_binding(
self._db,
notifier=notifier,
tag_version=tag_version
)
self.ChannelMetadata = channel_metadata.define_binding(self._db)

self.JsonNode = json_node.define_binding(self._db, db_version)
Expand Down Expand Up @@ -242,6 +247,14 @@ def wrapper():

return await get_event_loop().run_in_executor(None, wrapper)

def set_value(self, key: str, value: str):
key_value = get_or_create(self.MiscData, name=key)
key_value.value = value

def get_value(self, key: str, default: Optional[str] = None) -> Optional[str]:
data = self.MiscData.get(name=key)
return data.value if data else default

def drop_indexes(self):
cursor = self._db.get_connection().cursor()
cursor.execute("select name from sqlite_master where type='index' and name like 'idx_%'")
Expand Down Expand Up @@ -391,9 +404,9 @@ def process_channel_dir(self, dirname, public_key, id_, **kwargs):
if not channel:
return
if (
blob_sequence_number <= channel.start_timestamp
or blob_sequence_number <= channel.local_version
or blob_sequence_number > channel.timestamp
blob_sequence_number <= channel.start_timestamp
or blob_sequence_number <= channel.local_version
or blob_sequence_number > channel.timestamp
):
continue
try:
Expand Down Expand Up @@ -595,28 +608,28 @@ def search_keyword(self, query, lim=100):

@db_session
def get_entries_query(
self,
metadata_type=None,
channel_pk=None,
exclude_deleted=False,
hide_xxx=False,
exclude_legacy=False,
origin_id=None,
sort_by=None,
sort_desc=True,
max_rowid=None,
txt_filter=None,
subscribed=None,
category=None,
attribute_ranges=None,
infohash=None,
infohash_set=None,
id_=None,
complete_channel=None,
self_checked_torrent=None,
cls=None,
health_checked_after=None,
popular=None,
self,
metadata_type=None,
channel_pk=None,
exclude_deleted=False,
hide_xxx=False,
exclude_legacy=False,
origin_id=None,
sort_by=None,
sort_desc=True,
max_rowid=None,
txt_filter=None,
subscribed=None,
category=None,
attribute_ranges=None,
infohash=None,
infohash_set=None,
id_=None,
complete_channel=None,
self_checked_torrent=None,
cls=None,
health_checked_after=None,
popular=None,
):
"""
This method implements REST-friendly way to get entries from the database.
Expand Down Expand Up @@ -662,8 +675,8 @@ def get_entries_query(
if attribute_ranges is not None:
for attr, left, right in attribute_ranges:
if (
self.ChannelNode._adict_.get(attr) # pylint: disable=W0212
or self.ChannelNode._subclass_adict_.get(attr) # pylint: disable=W0212
self.ChannelNode._adict_.get(attr) # pylint: disable=W0212
or self.ChannelNode._subclass_adict_.get(attr) # pylint: disable=W0212
) is None: # Check against code injection
raise AttributeError("Tried to query for non-existent attribute")
if left is not None:
Expand Down Expand Up @@ -737,7 +750,7 @@ def get_entries(self, first=1, last=None, **kwargs):
:return: A list of class members
"""
pony_query = self.get_entries_query(**kwargs)
result = pony_query[(first or 1) - 1 : last]
result = pony_query[(first or 1) - 1: last]
for entry in result:
# ACHTUNG! This is necessary in order to load entry.health inside db_session,
# to be able to perform successfully `entry.to_simple_dict()` later
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
from tribler_core.components.base import Component
from tribler_core.components.key.key_component import KeyComponent
from tribler_core.components.metadata_store.db.store import MetadataStore
from tribler_core.components.metadata_store.utils import generate_test_channels
from tribler_core.components.tag.tag_component import TagComponent
from tribler_core.components.tag.rules.tag_rules_processor import TagRulesProcessor


class MetadataStoreComponent(Component):
Expand Down Expand Up @@ -42,13 +41,11 @@ async def run(self):
key_component.primary_key,
notifier=self.session.notifier,
disable_sync=config.gui_test_mode,
tag_version=TagRulesProcessor.version
)
self.mds = metadata_store
self.session.notifier.add_observer(NTFY.TORRENT_METADATA_ADDED,
self.session.notifier.add_observer(NTFY.TORRENT_METADATA_ADDED.value,
metadata_store.TorrentMetadata.add_ffa_from_dict)
if config.gui_test_mode:
tag_component = await self.require_component(TagComponent)
generate_test_channels(metadata_store, tag_component.tags_db)

async def shutdown(self):
await super().shutdown()
Expand Down
63 changes: 63 additions & 0 deletions src/tribler-core/tribler_core/components/tag/rules/tag_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import re
from typing import AnyStr, Iterable, Optional, Pattern, Sequence

from tribler_core.components.tag.community.tag_validator import is_valid_tag

# Each regex expression should contain just a single capturing group:
square_brackets_re = re.compile(r'\[([^\[\]]+)]')
parentheses_re = re.compile(r'\(([^()]+)\)')
extension_re = re.compile(r'\.(\w{3,4})$')
delimiter_re = re.compile(r'([^\s.,/|]+)')

tags_in_square_brackets = [
square_brackets_re, # extract content from square brackets
delimiter_re # divide content by "," or "." or " " or "/"
]

tags_in_parentheses = [
parentheses_re, # extract content from brackets
delimiter_re # divide content by "," or "." or " " or "/"
]

tags_in_extension = [
extension_re # extract an extension
]

RulesList = Sequence[Sequence[Pattern[AnyStr]]]
default_rules: RulesList = [
tags_in_square_brackets,
tags_in_parentheses,
tags_in_extension
]


def extract_tags(text: str, rules: Optional[RulesList] = None) -> Iterable[str]:
""" Extract tags by using the giving rules.
Rules are represented by an array of an array of regexes.
Each rule contains one or more regex expressions.
During the `text` processing, each rule will be applied to the `text` value.
All extracted tags will be returned.
During application of the particular rule, `text` will be split into
tokens by application of the first regex expression. Then, second regex
expression will be applied to each tokens that were extracted on the
previous step.
This process will be repeated until regex expression ends.
"""
rules = rules or default_rules
for rule in rules:
text_set = {text}
for regex in rule:
next_text_set = set()
for token in text_set:
for match in regex.finditer(token):
next_text_set |= set(match.groups())
text_set = next_text_set
yield from text_set


def extract_only_valid_tags(text: str, rules: Optional[RulesList] = None) -> Iterable[str]:
extracted_tags_gen = (t.lower() for t in extract_tags(text, rules))
yield from (t for t in extracted_tags_gen if is_valid_tag(t))
Loading

0 comments on commit d42afef

Please sign in to comment.