Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add reprocess feature for KnowledgeRulesProcessor #7379

Merged
merged 2 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from tribler.core.components.key.key_component import KeyComponent
from tribler.core.components.knowledge.community.knowledge_community import KnowledgeCommunity
from tribler.core.components.knowledge.db.knowledge_db import KnowledgeDatabase
from tribler.core.components.knowledge.rules.tag_rules_processor import KnowledgeRulesProcessor
from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor
from tribler.core.components.metadata_store.utils import generate_test_channels
from tribler.core.utilities.simpledefs import STATEDIR_DB_DIR

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
DEFAULT_BATCH_SIZE = 1000

LAST_PROCESSED_TORRENT_ID = 'last_processed_torrent_id'
RULES_PROCESSOR_VERSION = 'rules_processor_version'


class KnowledgeRulesProcessor(TaskManager):
# this value must be incremented whenever a new set of rules has been applied
version: int = 2
version: int = 4

def __init__(self, notifier: Notifier, db: KnowledgeDatabase, mds: MetadataStore,
batch_size: int = DEFAULT_BATCH_SIZE, interval: float = DEFAULT_INTERVAL):
Expand All @@ -45,14 +46,21 @@ def __init__(self, notifier: Notifier, db: KnowledgeDatabase, mds: MetadataStore
def start(self):
    """Start knowledge generation.

    If the stored rules-processor version is older than the current one,
    previously generated knowledge is considered stale: the processing
    cursor is reset to 0 and the new version is persisted. Then, unless the
    whole torrent table has already been processed, a periodic
    ``process_batch`` task is registered.
    """
    self.logger.info('Start')

    rules_processor_version = self.get_rules_processor_version()
    if rules_processor_version < self.version:
        # The database was processed by a previous version of the rules
        # processor; restart knowledge generation from the first torrent.
        self.logger.info('New version of rules processor is available. Starting knowledge generation from scratch.')
        self.set_last_processed_torrent_id(0)
        self.set_rules_processor_version(self.version)

    max_row_id = self.mds.get_max_rowid()
    is_finished = self.get_last_processed_torrent_id() >= max_row_id

    if not is_finished:
        self.logger.info(f'Register process_batch task with interval: {self.interval} sec')
        # Register the periodic task exactly once — the diff residue that
        # registered it twice under the same name would be an error in the
        # task manager (duplicate task name).
        self.register_task(name=self.process_batch.__name__, interval=self.interval, task=self.process_batch)
    else:
        self.logger.info(f'Database processing is finished. Last processed torrent id: {max_row_id}')

async def shutdown(self):
    """Cancel and await all tasks registered on this TaskManager before shutdown."""
    await self.shutdown_task_manager()
Expand All @@ -77,7 +85,7 @@ def query(_start, _end):
torrent.tag_processor_version = self.version
processed += 1

self.mds.set_value(LAST_PROCESSED_TORRENT_ID, str(end))
self.set_last_processed_torrent_id(end)
self.logger.info(f'Processed: {processed} titles. Added {added} tags.')

is_finished = end >= max_row_id
Expand Down Expand Up @@ -108,3 +116,12 @@ def save_statements(self, subject_type: ResourceType, subject: str, predicate: R

def get_last_processed_torrent_id(self) -> int:
    """Return the row id of the last processed torrent (0 when nothing was processed yet)."""
    stored_value = self.mds.get_value(LAST_PROCESSED_TORRENT_ID, default='0')
    return int(stored_value)

def set_last_processed_torrent_id(self, value: int):
    """Persist ``value`` as the id of the last processed torrent."""
    serialized = str(value)
    self.mds.set_value(LAST_PROCESSED_TORRENT_ID, serialized)

def get_rules_processor_version(self) -> int:
    """Return the stored rules-processor version (0 when no version was ever stored)."""
    stored_version = self.mds.get_value(RULES_PROCESSOR_VERSION, default='0')
    return int(stored_version)

def set_rules_processor_version(self, version: int):
    """Persist ``version`` as the current rules-processor version."""
    serialized = str(version)
    self.mds.set_value(RULES_PROCESSOR_VERSION, serialized)
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import os
from unittest.mock import MagicMock, Mock, patch

import pytest
from ipv8.keyvault.private.libnaclkey import LibNaCLSK
from pony.orm import db_session

from tribler.core import notifications
from tribler.core.components.knowledge.db.knowledge_db import ResourceType
from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor
from tribler.core.components.metadata_store.db.serialization import REGULAR_TORRENT
from tribler.core.components.metadata_store.db.store import MetadataStore
from tribler.core.utilities.path_util import Path
from tribler.core.utilities.utilities import MEMORY_DB

TEST_BATCH_SIZE = 100
TEST_INTERVAL = 0.1


# pylint: disable=redefined-outer-name, protected-access
@pytest.fixture
async def tag_rules_processor(tmp_path: Path):
    """Yield a KnowledgeRulesProcessor backed by an in-memory MetadataStore, shut it down afterwards."""
    metadata_store = MetadataStore(db_filename=MEMORY_DB, channels_dir=tmp_path, my_key=LibNaCLSK())
    rules_processor = KnowledgeRulesProcessor(
        notifier=MagicMock(),
        db=MagicMock(),
        mds=metadata_store,
        batch_size=TEST_BATCH_SIZE,
        interval=TEST_INTERVAL,
    )
    yield rules_processor
    await rules_processor.shutdown()


def test_constructor(tag_rules_processor: KnowledgeRulesProcessor):
    """The constructor stores its arguments and subscribes to new-torrent-metadata notifications."""
    assert tag_rules_processor.batch_size == TEST_BATCH_SIZE
    assert tag_rules_processor.interval == TEST_INTERVAL

    add_observer_mock: MagicMock = tag_rules_processor.notifier.add_observer
    add_observer_mock.assert_called_with(
        notifications.new_torrent_metadata_created,
        tag_rules_processor.process_torrent_title,
        synchronous=True,
    )


@patch.object(KnowledgeRulesProcessor, 'save_statements')
def test_process_torrent_file(mocked_save_statements: MagicMock, tag_rules_processor: KnowledgeRulesProcessor):
    """process_torrent_title rejects None arguments and only saves statements for titles with tags."""
    # None infohash or None title is rejected outright
    assert not tag_rules_processor.process_torrent_title(infohash=None, title='title')
    assert not tag_rules_processor.process_torrent_title(infohash=b'infohash', title=None)

    # a title without tag-like content produces no statements
    assert not tag_rules_processor.process_torrent_title(infohash=b'infohash', title='title')
    mocked_save_statements.assert_not_called()

    # a bracketed token in the title is extracted and saved as a single tag
    assert tag_rules_processor.process_torrent_title(infohash=b'infohash', title='title [tag]') == 1
    mocked_save_statements.assert_called_with(subject_type=ResourceType.TORRENT, subject='696e666f68617368',
                                              objects={'tag'}, predicate=ResourceType.TAG)


def test_save_tags(tag_rules_processor: KnowledgeRulesProcessor):
    """save_statements forwards each object to KnowledgeDatabase.add_auto_generated with the right kwargs."""
    expected_calls = [
        {'obj': 'tag2', 'predicate': ResourceType.TAG, 'subject': 'infohash', 'subject_type': ResourceType.TORRENT},
        {'obj': 'tag1', 'predicate': ResourceType.TAG, 'subject': 'infohash', 'subject_type': ResourceType.TORRENT}
    ]
    tag_rules_processor.save_statements(subject_type=ResourceType.TORRENT, subject='infohash',
                                        predicate=ResourceType.TAG,
                                        objects={'tag1', 'tag2'})
    actual_calls = [c.kwargs for c in tag_rules_processor.db.add_auto_generated.mock_calls]

    # Compare the two lists of dicts irrespective of order. The length check is
    # essential: without it an empty actual_calls (add_auto_generated never
    # called) would trivially satisfy the one-sided subset assertion.
    assert len(actual_calls) == len(expected_calls)
    assert all(call in expected_calls for call in actual_calls)


@db_session
@patch.object(KnowledgeRulesProcessor, 'process_torrent_title', new=MagicMock(return_value=1))
@patch.object(KnowledgeRulesProcessor, 'cancel_pending_task')
def test_process_batch(mocked_cancel_pending_task: Mock, tag_rules_processor: KnowledgeRulesProcessor):
    """process_batch walks the torrent table in batches and cancels itself on the last batch."""
    # populate the db with 50 torrents
    for _ in range(50):
        tag_rules_processor.mds.TorrentMetadata(infohash=os.urandom(20), metadata_type=REGULAR_TORRENT)

    tag_rules_processor.set_last_processed_torrent_id(10)  # the first batch starts from id 11
    tag_rules_processor.batch_size = 30  # and covers 30 entities

    # first iteration: rows 11..40 processed, the periodic task keeps running
    assert tag_rules_processor.process_batch() == 30
    assert tag_rules_processor.get_last_processed_torrent_id() == 40
    mocked_cancel_pending_task.assert_not_called()

    # second iteration: the remaining 10 rows are processed and the task is cancelled
    assert tag_rules_processor.process_batch() == 10
    assert tag_rules_processor.get_last_processed_torrent_id() == 50
    assert mocked_cancel_pending_task.called


@db_session
@patch.object(KnowledgeRulesProcessor, 'register_task', new=MagicMock())
def test_start_no_previous_version(tag_rules_processor: KnowledgeRulesProcessor):
    """Without a stored version, start() resets the cursor and persists the current version."""
    assert tag_rules_processor.get_rules_processor_version() == 0
    assert tag_rules_processor.get_rules_processor_version() != tag_rules_processor.version

    tag_rules_processor.start()

    assert tag_rules_processor.get_rules_processor_version() == tag_rules_processor.version  # version stored
    assert tag_rules_processor.get_last_processed_torrent_id() == 0  # cursor reset to the beginning


@db_session
@patch.object(KnowledgeRulesProcessor, 'register_task', new=MagicMock())
def test_start_previous_version(tag_rules_processor: KnowledgeRulesProcessor):
    """With an outdated stored version, start() bumps the version and restarts from scratch."""
    tag_rules_processor.set_rules_processor_version(tag_rules_processor.version - 1)
    tag_rules_processor.set_last_processed_torrent_id(100)

    tag_rules_processor.start()

    assert tag_rules_processor.get_rules_processor_version() == tag_rules_processor.version  # version bumped
    assert tag_rules_processor.get_last_processed_torrent_id() == 0  # progress discarded


@db_session
@patch.object(KnowledgeRulesProcessor, 'register_task', new=MagicMock())
def test_start_current_version(tag_rules_processor: KnowledgeRulesProcessor):
    """With an up-to-date stored version, start() resumes from the saved position."""
    tag_rules_processor.set_rules_processor_version(tag_rules_processor.version)
    tag_rules_processor.set_last_processed_torrent_id(100)

    tag_rules_processor.start()

    assert tag_rules_processor.get_rules_processor_version() == tag_rules_processor.version  # unchanged
    assert tag_rules_processor.get_last_processed_torrent_id() == 100  # resume point preserved


@db_session
@patch.object(KnowledgeRulesProcessor, 'register_task')
def test_start_batch_processing(mocked_register_task: Mock, tag_rules_processor: KnowledgeRulesProcessor):
    """When unprocessed torrents exist in the database, start() schedules batch processing."""
    tag_rules_processor.mds.TorrentMetadata(infohash=os.urandom(20), metadata_type=REGULAR_TORRENT)

    tag_rules_processor.start()

    mocked_register_task.assert_called()

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from tribler.core import notifications
from tribler.core.components.component import Component
from tribler.core.components.key.key_component import KeyComponent
from tribler.core.components.knowledge.rules.tag_rules_processor import KnowledgeRulesProcessor
from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor
from tribler.core.components.metadata_store.db.store import MetadataStore
from tribler.core.utilities.simpledefs import STATEDIR_DB_DIR

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pony.orm import db_session

from tribler.core.components.knowledge.db.knowledge_db import KnowledgeDatabase, ResourceType
from tribler.core.components.knowledge.rules.tag_rules_processor import KnowledgeRulesProcessor
from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor
from tribler.core.components.metadata_store.category_filter.family_filter import default_xxx_filter
from tribler.core.components.metadata_store.db.serialization import CHANNEL_TORRENT, COLLECTION_NODE, REGULAR_TORRENT
from tribler.core.components.metadata_store.db.store import MetadataStore
Expand Down