Skip to content

Commit

Permalink
Added missed KnowledgeRulesProcessor migration
Browse files Browse the repository at this point in the history
  • Loading branch information
drew2a committed Apr 20, 2023
1 parent 7d3562f commit b7da736
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from tribler.core.components.key.key_component import KeyComponent
from tribler.core.components.knowledge.community.knowledge_community import KnowledgeCommunity
from tribler.core.components.knowledge.db.knowledge_db import KnowledgeDatabase
from tribler.core.components.knowledge.rules.tag_rules_processor import KnowledgeRulesProcessor
from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor
from tribler.core.components.metadata_store.utils import generate_test_channels
from tribler.core.utilities.simpledefs import STATEDIR_DB_DIR

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

class KnowledgeRulesProcessor(TaskManager):
# this value must be incremented whenever a new set of rules is applied
version: int = 2
version: int = 3

def __init__(self, notifier: Notifier, db: KnowledgeDatabase, mds: MetadataStore,
batch_size: int = DEFAULT_BATCH_SIZE, interval: float = DEFAULT_INTERVAL):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from tribler.core import notifications
from tribler.core.components.knowledge.db.knowledge_db import ResourceType
from tribler.core.components.knowledge.rules.tag_rules_processor import KnowledgeRulesProcessor, \
from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor, \
LAST_PROCESSED_TORRENT_ID

TEST_BATCH_SIZE = 100
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from tribler.core import notifications
from tribler.core.components.component import Component
from tribler.core.components.key.key_component import KeyComponent
from tribler.core.components.knowledge.rules.tag_rules_processor import KnowledgeRulesProcessor
from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor
from tribler.core.components.metadata_store.db.store import MetadataStore
from tribler.core.utilities.simpledefs import STATEDIR_DB_DIR

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from pony.orm import db_session

from tribler.core.components.knowledge.db.knowledge_db import KnowledgeDatabase, ResourceType
from tribler.core.components.knowledge.rules.tag_rules_processor import KnowledgeRulesProcessor
from tribler.core.components.knowledge.rules.knowledge_rules_processor import KnowledgeRulesProcessor
from tribler.core.components.metadata_store.category_filter.family_filter import default_xxx_filter
from tribler.core.components.metadata_store.db.serialization import CHANNEL_TORRENT, COLLECTION_NODE, REGULAR_TORRENT
from tribler.core.components.metadata_store.db.store import MetadataStore
Expand Down
42 changes: 35 additions & 7 deletions src/tribler/core/upgrade/tags_to_knowledge/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,25 @@

from tribler.core.components.knowledge.community.knowledge_payload import StatementOperation
from tribler.core.components.knowledge.db.knowledge_db import KnowledgeDatabase, ResourceType
from tribler.core.components.knowledge.rules.knowledge_rules_processor import LAST_PROCESSED_TORRENT_ID
from tribler.core.components.metadata_store.db.store import MetadataStore
from tribler.core.upgrade.tags_to_knowledge.tags_db import TagDatabase
from tribler.core.utilities.path_util import Path
from tribler.core.utilities.simpledefs import STATEDIR_DB_DIR
from tribler.core.utilities.unicode import hexlify


class MigrationTagsToKnowledge:
def __init__(self, state_dir: Path, key: LibNaCLSK):
def __init__(self, state_dir: Path, primary_key: LibNaCLSK, secondary_key: LibNaCLSK, channels_dir: Path):
self.logger = logging.getLogger(self.__class__.__name__)
self.state_dir = state_dir
self.key = key
self.primary_key = primary_key
self.secondary_key = secondary_key
self.channels_dir = channels_dir

self.tag_db_path = self.state_dir / STATEDIR_DB_DIR / 'tags.db'
self.knowledge_db_path = self.state_dir / STATEDIR_DB_DIR / 'knowledge.db'
self.mds_db_path = self.state_dir / STATEDIR_DB_DIR / 'metadata.db'

self.serializer = Serializer()
self.crypto = default_eccrypto
Expand All @@ -36,15 +41,40 @@ def run(self) -> bool:
self.logger.info("Tags DB doesn't exist. Stop procedure.")
return False

self.reset_last_processed_id()
self.convert_tags_to_knowledge()

return True

def reset_last_processed_id(self):
    """Set the stored `LAST_PROCESSED_TORRENT_ID` value in the metadata DB back to '0'.

    This is needed so that KnowledgeRulesProcessor reprocesses all torrents
    in the metadata DB.

    The step is best-effort: if the metadata DB file is absent, a warning is
    logged and nothing else happens; any exception raised while opening or
    updating the DB is logged and not propagated.
    """
    if not self.mds_db_path.exists():
        self.logger.warning('Metadata DB does not exist. Skip resetting the last processed torrent ID.')
        return

    # Kept as None until the store is successfully constructed, so the
    # cleanup below knows whether there is anything to shut down.
    store = None
    try:
        store = MetadataStore(
            self.mds_db_path,
            self.channels_dir,
            self.primary_key,
            disable_sync=True,
            check_tables=False,
        )
        with db_session:
            store.set_value(LAST_PROCESSED_TORRENT_ID, '0')
    except Exception as error:
        # Log with traceback and carry on with the rest of the migration.
        self.logger.exception(error)
    finally:
        if store:
            store.shutdown()

def convert_tags_to_knowledge(self):
tag_db = None
knowledge_db = None

try:
tag_db = TagDatabase(str(self.tag_db_path))
knowledge_db = KnowledgeDatabase(str(self.knowledge_db_path))
self.logger.info("Migrating the tags.db into the knowledge.db")

public_key = self.key.pub().key_to_bin()
public_key = self.secondary_key.pub().key_to_bin()

operations = list(self._read(tag_db, public_key))
self._write(knowledge_db, operations)
Expand All @@ -55,11 +85,9 @@ def run(self) -> bool:
tag_db.shutdown()
if knowledge_db:
knowledge_db.shutdown()

self.logger.info("Removing Tags DB")
self.tag_db_path.unlink(missing_ok=True)
self.logger.info("Tags DB has been removed")
return True

@db_session
def _read(self, tag_db: TagDatabase, public_key: bytes) -> Iterable[StatementOperation]:
Expand Down Expand Up @@ -99,4 +127,4 @@ def _write(self, knowledge_db: KnowledgeDatabase, operations: List[StatementOper

def _sign(self, operation) -> bytes:
packed = self.serializer.pack_serializable(operation)
return self.crypto.create_signature(self.key, packed)
return self.crypto.create_signature(self.secondary_key, packed)
36 changes: 32 additions & 4 deletions src/tribler/core/upgrade/tags_to_knowledge/tests/test_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

from tribler.core.components.knowledge.community.knowledge_payload import StatementOperation
from tribler.core.components.knowledge.db.knowledge_db import KnowledgeDatabase
from tribler.core.components.knowledge.rules.knowledge_rules_processor import LAST_PROCESSED_TORRENT_ID
from tribler.core.components.metadata_store.db.store import MetadataStore
from tribler.core.upgrade.tags_to_knowledge.migration import MigrationTagsToKnowledge
from tribler.core.upgrade.tags_to_knowledge.tags_db import TagDatabase
from tribler.core.utilities.simpledefs import STATEDIR_DB_DIR
Expand All @@ -18,7 +20,12 @@
def migration(tmp_path: Path):
db_dir = tmp_path / STATEDIR_DB_DIR
db_dir.mkdir()
migration = MigrationTagsToKnowledge(tmp_path, LibNaCLSK())
migration = MigrationTagsToKnowledge(
state_dir=tmp_path,
primary_key=LibNaCLSK(),
secondary_key=LibNaCLSK(),
channels_dir=tmp_path
)
return migration


Expand Down Expand Up @@ -90,12 +97,12 @@ def clock() -> int:

def verify_signature(o: StatementOperation, signature: bytes):
packed = migration.serializer.pack_serializable(o)
if not migration.crypto.is_valid_signature(migration.key, packed, signature):
if not migration.crypto.is_valid_signature(migration.secondary_key, packed, signature):
raise InvalidSignature(f'Invalid signature for {o}')

###############################
tag_db = TagDatabase(str(migration.tag_db_path))
fill_db(migration.key)
fill_db(migration.secondary_key)
tag_db.shutdown()

assert migration.run()
Expand All @@ -105,7 +112,7 @@ def verify_signature(o: StatementOperation, signature: bytes):
with db_session:
# assert only local peer is here
peer = knowledge_db.instance.Peer.get()
assert peer.public_key == migration.key.pub().key_to_bin()
assert peer.public_key == migration.secondary_key.pub().key_to_bin()

# assert only local operations were added
operations = set(knowledge_db.instance.StatementOp.select())
Expand All @@ -132,3 +139,24 @@ def verify_signature(o: StatementOperation, signature: bytes):
creator_public_key=operation.peer.public_key
)
verify_signature(operation, signature)


def test_last_processed_torrent_id(migration: MigrationTagsToKnowledge):
    """Check that running the migration resets `LAST_PROCESSED_TORRENT_ID` to '0'."""

    def open_store() -> MetadataStore:
        # Open the metadata DB at the path the migration itself will use.
        return MetadataStore(migration.mds_db_path, channels_dir=migration.channels_dir,
                             my_key=migration.primary_key)

    store = open_store()
    # NOTE(review): presumably instantiating TagDatabase creates tags.db on disk so that
    # `migration.run()` does not stop early — confirm against TagDatabase.
    legacy_tags = TagDatabase(str(migration.tag_db_path))
    with db_session:
        # seed the key with an arbitrary non-zero marker
        store.set_value(LAST_PROCESSED_TORRENT_ID, 'value')
        seeded = store.get_value(LAST_PROCESSED_TORRENT_ID)
        assert seeded == 'value'
    store.shutdown()
    legacy_tags.shutdown()

    assert migration.run()

    # reopen the metadata DB and verify the marker was reset by the migration
    store = open_store()
    with db_session:
        current = store.get_value(LAST_PROCESSED_TORRENT_ID)
        assert current == '0'
    store.shutdown()
2 changes: 1 addition & 1 deletion src/tribler/core/upgrade/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def remove_old_logs(self) -> Tuple[List[Path], List[Path]]:
return removed_files, left_files

def upgrade_tags_to_knowledge(self):
migration = MigrationTagsToKnowledge(self.state_dir, self.secondary_key)
migration = MigrationTagsToKnowledge(self.state_dir, self.primary_key, self.secondary_key, self.channels_dir)
migration.run()

def upgrade_pony_db_14to15(self):
Expand Down

0 comments on commit b7da736

Please sign in to comment.