From 64dd187f29f4956b86775f58dbe5dc3f2cd038d5 Mon Sep 17 00:00:00 2001
From: drew2a
Date: Thu, 20 Apr 2023 14:54:02 +0200
Subject: [PATCH 1/2] Add reprocess feature for KnowledgeRulesProcessor

---
 .../knowledge/knowledge_component.py          |  2 +-
 ...cessor.py => knowledge_rules_processor.py} | 24 +++++++++++++++----
 ...r.py => test_knowledge_rules_processor.py} |  2 +-
 .../metadata_store_component.py               |  2 +-
 .../restapi/metadata_endpoint_base.py         |  2 +-
 5 files changed, 23 insertions(+), 9 deletions(-)
 rename src/tribler/core/components/knowledge/rules/{tag_rules_processor.py => knowledge_rules_processor.py} (83%)
 rename src/tribler/core/components/knowledge/rules/tests/{test_tag_rules_processor.py => test_knowledge_rules_processor.py} (98%)

diff --git a/src/tribler/core/components/knowledge/knowledge_component.py b/src/tribler/core/components/knowledge/knowledge_component.py
index 5c343c8a01d..d5c8d9745ec 100644
--- a/src/tribler/core/components/knowledge/knowledge_component.py
+++ b/src/tribler/core/components/knowledge/knowledge_component.py
@@ -4,7 +4,7 @@
 from tribler.core.components.key.key_component import KeyComponent
 from tribler.core.components.knowledge.community.knowledge_community import KnowledgeCommunity
 from tribler.core.components.knowledge.db.knowledge_db import KnowledgeDatabase
-from tribler.core.components.knowledge.rules.tag_rules_processor import KnowledgeRulesProcessor
+from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor
 from tribler.core.components.metadata_store.utils import generate_test_channels
 from tribler.core.utilities.simpledefs import STATEDIR_DB_DIR
 
diff --git a/src/tribler/core/components/knowledge/rules/tag_rules_processor.py b/src/tribler/core/components/knowledge/rules/knowledge_rules_processor.py
similarity index 83%
rename from src/tribler/core/components/knowledge/rules/tag_rules_processor.py
rename to src/tribler/core/components/knowledge/rules/knowledge_rules_processor.py
index 5ba8642d5c7..299292391e0 100644
--- a/src/tribler/core/components/knowledge/rules/tag_rules_processor.py
+++ b/src/tribler/core/components/knowledge/rules/knowledge_rules_processor.py
@@ -18,11 +18,12 @@
 DEFAULT_BATCH_SIZE = 1000
 
 LAST_PROCESSED_TORRENT_ID = 'last_processed_torrent_id'
+RULES_PROCESSOR_VERSION = 'rules_processor_version'
 
 
 class KnowledgeRulesProcessor(TaskManager):
     # this value must be incremented in the case of new rules set has been applied
-    version: int = 2
+    version: int = 3
 
     def __init__(self, notifier: Notifier, db: KnowledgeDatabase, mds: MetadataStore,
                  batch_size: int = DEFAULT_BATCH_SIZE, interval: float = DEFAULT_INTERVAL):
@@ -45,14 +46,18 @@ def __init__(self, notifier: Notifier, db: KnowledgeDatabase, mds: MetadataStore
 
     def start(self):
         self.logger.info('Start')
+        rules_processor_version = self.get_rules_processor_version()
+        if rules_processor_version < self.version:
+            # the database was processed by the previous version of the rules processor
+            self.set_last_processed_torrent_id(0)  # reset the last processed torrent id
+            self.set_rules_processor_version(self.version)
+
         max_row_id = self.mds.get_max_rowid()
         is_finished = self.get_last_processed_torrent_id() >= max_row_id
 
         if not is_finished:
             self.logger.info(f'Register process_batch task with interval: {self.interval} sec')
-            self.register_task(name=self.process_batch.__name__,
-                               interval=self.interval,
-                               task=self.process_batch)
+            self.register_task(name=self.process_batch.__name__, interval=self.interval, task=self.process_batch)
 
     async def shutdown(self):
         await self.shutdown_task_manager()
@@ -77,7 +82,7 @@ def query(_start, _end):
             torrent.tag_processor_version = self.version
             processed += 1
 
-        self.mds.set_value(LAST_PROCESSED_TORRENT_ID, str(end))
+        self.set_last_processed_torrent_id(end)
         self.logger.info(f'Processed: {processed} titles. Added {added} tags.')
 
         is_finished = end >= max_row_id
@@ -108,3 +113,12 @@ def save_statements(self, subject_type: ResourceType, subject: str, predicate: R
 
     def get_last_processed_torrent_id(self) -> int:
         return int(self.mds.get_value(LAST_PROCESSED_TORRENT_ID, default='0'))
+
+    def set_last_processed_torrent_id(self, value: int):
+        self.mds.set_value(LAST_PROCESSED_TORRENT_ID, str(value))
+
+    def get_rules_processor_version(self) -> int:
+        return int(self.mds.get_value(RULES_PROCESSOR_VERSION, default='0'))
+
+    def set_rules_processor_version(self, version: int):
+        self.mds.set_value(RULES_PROCESSOR_VERSION, str(version))
diff --git a/src/tribler/core/components/knowledge/rules/tests/test_tag_rules_processor.py b/src/tribler/core/components/knowledge/rules/tests/test_knowledge_rules_processor.py
similarity index 98%
rename from src/tribler/core/components/knowledge/rules/tests/test_tag_rules_processor.py
rename to src/tribler/core/components/knowledge/rules/tests/test_knowledge_rules_processor.py
index ff6c7bd2a72..67f12aff249 100644
--- a/src/tribler/core/components/knowledge/rules/tests/test_tag_rules_processor.py
+++ b/src/tribler/core/components/knowledge/rules/tests/test_knowledge_rules_processor.py
@@ -5,7 +5,7 @@
 
 from tribler.core import notifications
 from tribler.core.components.knowledge.db.knowledge_db import ResourceType
-from tribler.core.components.knowledge.rules.tag_rules_processor import KnowledgeRulesProcessor, \
+from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor, \
     LAST_PROCESSED_TORRENT_ID
 
 TEST_BATCH_SIZE = 100
diff --git a/src/tribler/core/components/metadata_store/metadata_store_component.py b/src/tribler/core/components/metadata_store/metadata_store_component.py
index 477d682480b..4c16e328c6e 100644
--- a/src/tribler/core/components/metadata_store/metadata_store_component.py
+++ b/src/tribler/core/components/metadata_store/metadata_store_component.py
@@ -1,7 +1,7 @@
 from tribler.core import notifications
 from tribler.core.components.component import Component
 from tribler.core.components.key.key_component import KeyComponent
-from tribler.core.components.knowledge.rules.tag_rules_processor import KnowledgeRulesProcessor
+from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor
 from tribler.core.components.metadata_store.db.store import MetadataStore
 from tribler.core.utilities.simpledefs import STATEDIR_DB_DIR
 
diff --git a/src/tribler/core/components/metadata_store/restapi/metadata_endpoint_base.py b/src/tribler/core/components/metadata_store/restapi/metadata_endpoint_base.py
index 73b3a389544..77f0b6e18e3 100644
--- a/src/tribler/core/components/metadata_store/restapi/metadata_endpoint_base.py
+++ b/src/tribler/core/components/metadata_store/restapi/metadata_endpoint_base.py
@@ -4,7 +4,7 @@
 from pony.orm import db_session
 
 from tribler.core.components.knowledge.db.knowledge_db import KnowledgeDatabase, ResourceType
-from tribler.core.components.knowledge.rules.tag_rules_processor import KnowledgeRulesProcessor
+from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor
 from tribler.core.components.metadata_store.category_filter.family_filter import default_xxx_filter
 from tribler.core.components.metadata_store.db.serialization import CHANNEL_TORRENT, COLLECTION_NODE, REGULAR_TORRENT
 from tribler.core.components.metadata_store.db.store import MetadataStore

From 7e1d8a5f41e320b33082766d6b24f45286085d82 Mon Sep 17 00:00:00 2001
From: drew2a
Date: Thu, 20 Apr 2023 16:28:39 +0200
Subject: [PATCH 2/2] Add tests

---
 .../rules/knowledge_rules_processor.py        |   7 +-
 .../tests/test_knowledge_rules_processor.py   | 110 ++++++++++++------
 2 files changed, 82 insertions(+), 35 deletions(-)

diff --git a/src/tribler/core/components/knowledge/rules/knowledge_rules_processor.py b/src/tribler/core/components/knowledge/rules/knowledge_rules_processor.py
index 299292391e0..dbec41dd1fa 100644
--- a/src/tribler/core/components/knowledge/rules/knowledge_rules_processor.py
+++ b/src/tribler/core/components/knowledge/rules/knowledge_rules_processor.py
@@ -23,7 +23,7 @@
 
 class KnowledgeRulesProcessor(TaskManager):
     # this value must be incremented in the case of new rules set has been applied
-    version: int = 3
+    version: int = 4
 
     def __init__(self, notifier: Notifier, db: KnowledgeDatabase, mds: MetadataStore,
                  batch_size: int = DEFAULT_BATCH_SIZE, interval: float = DEFAULT_INTERVAL):
@@ -49,7 +49,8 @@ def start(self):
         rules_processor_version = self.get_rules_processor_version()
         if rules_processor_version < self.version:
             # the database was processed by the previous version of the rules processor
-            self.set_last_processed_torrent_id(0)  # reset the last processed torrent id
+            self.logger.info('New version of rules processor is available. Starting knowledge generation from scratch.')
+            self.set_last_processed_torrent_id(0)
             self.set_rules_processor_version(self.version)
 
         max_row_id = self.mds.get_max_rowid()
@@ -58,6 +59,8 @@ def start(self):
         if not is_finished:
             self.logger.info(f'Register process_batch task with interval: {self.interval} sec')
             self.register_task(name=self.process_batch.__name__, interval=self.interval, task=self.process_batch)
+        else:
+            self.logger.info(f'Database processing is finished. Last processed torrent id: {max_row_id}')
 
     async def shutdown(self):
         await self.shutdown_task_manager()
diff --git a/src/tribler/core/components/knowledge/rules/tests/test_knowledge_rules_processor.py b/src/tribler/core/components/knowledge/rules/tests/test_knowledge_rules_processor.py
index 67f12aff249..c3dbf00a078 100644
--- a/src/tribler/core/components/knowledge/rules/tests/test_knowledge_rules_processor.py
+++ b/src/tribler/core/components/knowledge/rules/tests/test_knowledge_rules_processor.py
@@ -1,12 +1,17 @@
-from types import SimpleNamespace
-from unittest.mock import MagicMock, patch
+import os
+from unittest.mock import MagicMock, Mock, patch
 
 import pytest
+from ipv8.keyvault.private.libnaclkey import LibNaCLSK
+from pony.orm import db_session
 
 from tribler.core import notifications
 from tribler.core.components.knowledge.db.knowledge_db import ResourceType
-from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor, \
-    LAST_PROCESSED_TORRENT_ID
+from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor
+from tribler.core.components.metadata_store.db.serialization import REGULAR_TORRENT
+from tribler.core.components.metadata_store.db.store import MetadataStore
+from tribler.core.utilities.path_util import Path
+from tribler.core.utilities.utilities import MEMORY_DB
 
 TEST_BATCH_SIZE = 100
 TEST_INTERVAL = 0.1
@@ -14,8 +19,9 @@
 
 # pylint: disable=redefined-outer-name, protected-access
 @pytest.fixture
-async def tag_rules_processor():
-    processor = KnowledgeRulesProcessor(notifier=MagicMock(), db=MagicMock(), mds=MagicMock(),
+async def tag_rules_processor(tmp_path: Path):
+    mds = MetadataStore(db_filename=MEMORY_DB, channels_dir=tmp_path, my_key=LibNaCLSK())
+    processor = KnowledgeRulesProcessor(notifier=MagicMock(), db=MagicMock(), mds=mds,
                                         batch_size=TEST_BATCH_SIZE, interval=TEST_INTERVAL)
 
     yield processor
@@ -63,43 +69,81 @@ def test_save_tags(tag_rules_processor: KnowledgeRulesProcessor):
     assert [c for c in actual_calls if c not in expected_calls] == []
 
 
+@db_session
 @patch.object(KnowledgeRulesProcessor, 'process_torrent_title', new=MagicMock(return_value=1))
-def test_process_batch_within_the_boundary(tag_rules_processor: KnowledgeRulesProcessor):
-    # test inner logic of `process_batch` in case this batch located within the boundary
-    returned_batch_size = TEST_BATCH_SIZE // 2  # let's return a half of requested items
+@patch.object(KnowledgeRulesProcessor, 'cancel_pending_task')
+def test_process_batch(mocked_cancel_pending_task: Mock, tag_rules_processor: KnowledgeRulesProcessor):
+    # test the correctness of the inner logic of process_batch.
 
-    def select(_):
-        return [SimpleNamespace(infohash=i, title=i) for i in range(returned_batch_size)]
+    # fill the db with 50 torrents
+    for _ in range(50):
+        tag_rules_processor.mds.TorrentMetadata(infohash=os.urandom(20), metadata_type=REGULAR_TORRENT)
 
-    tag_rules_processor.mds.TorrentMetadata.select = select
-    tag_rules_processor.mds.get_value = lambda *_, **__: 0  # let's start from 0 for LAST_PROCESSED_TORRENT_ID
+    tag_rules_processor.set_last_processed_torrent_id(10)  # batch should start from 11
+    tag_rules_processor.batch_size = 30  # and process 30 entities
 
-    # let's specify `max_rowid` in such a way that it is far more than end of the current batch
-    tag_rules_processor.mds.get_max_rowid = lambda: TEST_BATCH_SIZE * 10
+    # first iteration
+    assert tag_rules_processor.process_batch() == 30
+    assert tag_rules_processor.get_last_processed_torrent_id() == 40
+    assert not mocked_cancel_pending_task.called  # it should not be the last batch in the db
 
-    # assert that actually returned count of processed items is equal to `returned_batch_size`
-    assert tag_rules_processor.process_batch() == returned_batch_size
+    # second iteration
+    assert tag_rules_processor.process_batch() == 10
+    assert tag_rules_processor.get_last_processed_torrent_id() == 50
+    assert mocked_cancel_pending_task.called  # it should be the last batch in the db
 
-    # assert that actually stored last_processed_torrent_id is equal to `TEST_BATCH_SIZE`
-    tag_rules_processor.mds.set_value.assert_called_with(LAST_PROCESSED_TORRENT_ID, str(TEST_BATCH_SIZE))
 
+@db_session
+@patch.object(KnowledgeRulesProcessor, 'register_task', new=MagicMock())
+def test_start_no_previous_version(tag_rules_processor: KnowledgeRulesProcessor):
+    # test that if there is no previous version of the rules processor, it will be created
+    assert tag_rules_processor.get_rules_processor_version() == 0
+    assert tag_rules_processor.get_rules_processor_version() != tag_rules_processor.version
 
-@patch.object(KnowledgeRulesProcessor, 'process_torrent_title', new=MagicMock(return_value=1))
-def test_process_batch_beyond_the_boundary(tag_rules_processor: KnowledgeRulesProcessor):
-    # test inner logic of `process_batch` in case this batch located on a border
-    returned_batch_size = TEST_BATCH_SIZE // 2  # let's return a half of requested items
+    tag_rules_processor.start()
+
+    # version should be set to the current version
+    assert tag_rules_processor.get_rules_processor_version() == tag_rules_processor.version
+    # last processed torrent id should be set to 0
+    assert tag_rules_processor.get_last_processed_torrent_id() == 0
+
+
+@db_session
+@patch.object(KnowledgeRulesProcessor, 'register_task', new=MagicMock())
+def test_start_previous_version(tag_rules_processor: KnowledgeRulesProcessor):
+    # test that if there is a previous version of the rules processor, it will be updated to the current
+    tag_rules_processor.set_rules_processor_version(tag_rules_processor.version - 1)
+    tag_rules_processor.set_last_processed_torrent_id(100)
+
+    tag_rules_processor.start()
+
+    # version should be set to the current version
+    assert tag_rules_processor.get_rules_processor_version() == tag_rules_processor.version
+    # last processed torrent id should be set to 0
+    assert tag_rules_processor.get_last_processed_torrent_id() == 0
+
+
+@db_session
+@patch.object(KnowledgeRulesProcessor, 'register_task', new=MagicMock())
+def test_start_current_version(tag_rules_processor: KnowledgeRulesProcessor):
+    # test that if there is a current version of the rules processor, it will process the database from
+    # the last processed torrent id
+    tag_rules_processor.set_rules_processor_version(tag_rules_processor.version)
+    tag_rules_processor.set_last_processed_torrent_id(100)
 
-    # let's specify `max_rowid` in such a way that it is less than end of the current batch
-    max_rowid = returned_batch_size // 2
+    tag_rules_processor.start()
 
-    def select(_):
-        return [SimpleNamespace(infohash=i, title=i) for i in range(returned_batch_size)]
+    # version should be the same
+    assert tag_rules_processor.get_rules_processor_version() == tag_rules_processor.version
+    # last processed torrent id should be the same
+    assert tag_rules_processor.get_last_processed_torrent_id() == 100
 
-    tag_rules_processor.mds.get_value = lambda *_, **__: 0  # let's start from 0 for LAST_PROCESSED_TORRENT_ID
-    tag_rules_processor.mds.TorrentMetadata.select = select
-    tag_rules_processor.mds.get_max_rowid = lambda: max_rowid
 
+@db_session
+@patch.object(KnowledgeRulesProcessor, 'register_task')
+def test_start_batch_processing(mocked_register_task: Mock, tag_rules_processor: KnowledgeRulesProcessor):
+    # test that if there are torrents in the database, the batch processing will be started
+    tag_rules_processor.mds.TorrentMetadata(infohash=os.urandom(20), metadata_type=REGULAR_TORRENT)
+    tag_rules_processor.start()
 
-    # assert that actually returned count of processed items is equal to `max_rowid`
-    assert tag_rules_processor.process_batch() == returned_batch_size
-    tag_rules_processor.mds.set_value.assert_called_with(LAST_PROCESSED_TORRENT_ID, str(max_rowid))
+    assert mocked_register_task.called
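
Note on the mechanism introduced by these two patches (this note is commentary, not part of the patch content): the reprocess trigger reduces to two values kept in the metadata store's key-value table, rules_processor_version and last_processed_torrent_id. Below is a minimal, self-contained Python sketch of that idea, assuming an in-memory stand-in for MetadataStore.get_value/set_value; the names KeyValueStore and RulesProcessorSketch are hypothetical and exist only for illustration, and the sketch omits the batching, rule matching, and task scheduling that the real KnowledgeRulesProcessor performs.

LAST_PROCESSED_TORRENT_ID = 'last_processed_torrent_id'
RULES_PROCESSOR_VERSION = 'rules_processor_version'


class KeyValueStore:
    """Hypothetical in-memory stand-in for the MetadataStore key-value API."""

    def __init__(self):
        self._data = {}

    def get_value(self, key: str, default: str = '') -> str:
        return self._data.get(key, default)

    def set_value(self, key: str, value: str) -> None:
        self._data[key] = value


class RulesProcessorSketch:
    version = 4  # bumped whenever the rule set changes, as in the patch

    def __init__(self, store: KeyValueStore):
        self.store = store

    def start(self) -> None:
        # An older stored version means the knowledge was generated by an older
        # rule set: reset the processing cursor so every torrent is revisited.
        if int(self.store.get_value(RULES_PROCESSOR_VERSION, default='0')) < self.version:
            self.store.set_value(LAST_PROCESSED_TORRENT_ID, '0')
            self.store.set_value(RULES_PROCESSOR_VERSION, str(self.version))


store = KeyValueStore()
store.set_value(RULES_PROCESSOR_VERSION, '3')      # produced by the previous rule set
store.set_value(LAST_PROCESSED_TORRENT_ID, '100')  # processing had already advanced

RulesProcessorSketch(store).start()
assert store.get_value(LAST_PROCESSED_TORRENT_ID) == '0'  # cursor reset, reprocessing starts over
assert store.get_value(RULES_PROCESSOR_VERSION) == '4'    # version recorded for the next run

Storing the version alongside the cursor is what lets a rule-set upgrade invalidate previously generated tags without a database migration: the next start() simply walks the torrent table from row 0 again.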