diff --git a/src/tribler/core/components/database/db/serialization.py b/src/tribler/core/components/database/db/serialization.py index 47f209864be..733a218bd29 100644 --- a/src/tribler/core/components/database/db/serialization.py +++ b/src/tribler/core/components/database/db/serialization.py @@ -60,7 +60,7 @@ def read_payload_with_offset(data, offset=0): metadata_type = struct.unpack_from('>H', data, offset=offset)[0] if metadata_type != REGULAR_TORRENT: - return + raise UnknownBlobTypeException payload, offset = default_serializer.unpack_serializable(TorrentMetadataPayload, data, offset=offset) payload.signature = data[offset: offset + 64] diff --git a/src/tribler/core/components/database/db/store.py b/src/tribler/core/components/database/db/store.py index 36e4d18fbbf..858a71f8f83 100644 --- a/src/tribler/core/components/database/db/store.py +++ b/src/tribler/core/components/database/db/store.py @@ -1,7 +1,7 @@ import enum -from dataclasses import dataclass, field import logging import re +from dataclasses import dataclass, field from datetime import datetime, timedelta from time import sleep, time from typing import Optional, Union @@ -15,15 +15,8 @@ from tribler.core.components.database.db.orm_bindings import misc, torrent_metadata, torrent_state as torrent_state_, \ tracker_state from tribler.core.components.database.db.orm_bindings.torrent_metadata import NULL_KEY_SUBST -from tribler.core.components.database.db.serialization import ( - CHANNEL_TORRENT, - COLLECTION_NODE, - HealthItemsPayload, - REGULAR_TORRENT, - read_payload_with_offset, - NULL_KEY -) - +from tribler.core.components.database.db.serialization import (CHANNEL_TORRENT, COLLECTION_NODE, HealthItemsPayload, + NULL_KEY, REGULAR_TORRENT, read_payload_with_offset) from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted, handle_db_if_corrupted from tribler.core.utilities.notifier import Notifier @@ -255,13 +248,13 @@ async def process_compressed_mdblob_threaded(self, compressed_data, **kwargs): try: return await run_threaded(self.db, self.process_compressed_mdblob, compressed_data, **kwargs) except DatabaseIsCorrupted: - raise # re-raise this exception and terminate the Core process + raise # re-raise this exception and terminate the Core process except Exception as e: # pylint: disable=broad-except # pragma: no cover self._logger.exception("DB transaction error when tried to process compressed mdblob: " f"{e.__class__.__name__}: {e}", exc_info=e) return [] - def process_compressed_mdblob(self, compressed_data, **kwargs): + def process_compressed_mdblob(self, compressed_data, skip_personal_metadata_payload=True): try: with LZ4FrameDecompressor() as decompressor: decompressed_data = decompressor.decompress(compressed_data) @@ -278,7 +271,8 @@ def process_compressed_mdblob(self, compressed_data, **kwargs): self._logger.warning(f"Unable to parse health information: {type(e).__name__}: {str(e)}") raise - return self.process_squashed_mdblob(decompressed_data, health_info=health_info, **kwargs) + return self.process_squashed_mdblob(decompressed_data, health_info=health_info, + skip_personal_metadata_payload=skip_personal_metadata_payload) def process_torrent_health(self, health: HealthInfo) -> bool: """ @@ -305,7 +299,8 @@ def process_torrent_health(self, health: HealthInfo) -> bool: return False - def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info=None, **kwargs): + def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info=None, + skip_personal_metadata_payload=True): """ Process raw concatenated payloads blob. This routine breaks the database access into smaller batches. It uses a congestion-control like algorithm to determine the optimal batch size, targeting the @@ -344,7 +339,7 @@ def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info # We separate the sessions to minimize database locking. with db_session(immediate=True): for payload in batch: - result.extend(self.process_payload(payload, **kwargs)) + result.extend(self.process_payload(payload, skip_personal_metadata_payload)) # Batch size adjustment batch_end_time = datetime.now() - batch_start_time @@ -374,7 +369,7 @@ def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info return result @db_session - def process_payload(self, payload, skip_personal_metadata_payload=True, **kwargs): + def process_payload(self, payload, skip_personal_metadata_payload=True): # Don't process our own torrents if skip_personal_metadata_payload and payload.public_key == self.my_public_key_bin: return [] diff --git a/src/tribler/core/components/database/db/tests/test_serialization.py b/src/tribler/core/components/database/db/tests/test_serialization.py index 68aa2c510b5..30b3da4e1b7 100644 --- a/src/tribler/core/components/database/db/tests/test_serialization.py +++ b/src/tribler/core/components/database/db/tests/test_serialization.py @@ -1,6 +1,10 @@ from datetime import datetime +from unittest.mock import Mock, patch -from tribler.core.components.database.db.serialization import TorrentMetadataPayload +import pytest + +from tribler.core.components.database.db.serialization import TorrentMetadataPayload, UnknownBlobTypeException, \ + read_payload_with_offset def test_fix_torrent_metadata_payload(): @@ -24,3 +28,10 @@ def test_torrent_metadata_payload_magnet(): expected = "magnet:?xt=urn:btih:000102030405060708090a0b0c0d0e0f10111213&dn=b'title'&tr=b'tracker_info'" assert expected == payload.get_magnet() + + +@patch('struct.unpack_from', Mock(return_value=(301,))) +def test_read_payload_with_offset_exception(): + # Test that an exception is raised when metadata_type != REGULAR_TORRENT + with pytest.raises(UnknownBlobTypeException): + read_payload_with_offset(b'')