Remove code related to num_entries #7874

Merged (1 commit) on Feb 1, 2024
@@ -13,27 +13,26 @@
from unittest.mock import AsyncMock, Mock, patch

import pytest

from ipv8.keyvault.crypto import default_eccrypto
from ipv8.messaging.payload import IntroductionRequestPayload
from pony.orm import OperationalError, db_session

from ipv8.messaging.serialization import default_serializer
from ipv8.test.base import TestBase
from ipv8.test.mocking.ipv8 import MockIPv8
from pony.orm import OperationalError, db_session

from tribler.core import notifications
from tribler.core.components.content_discovery.community.payload import SelectResponsePayload, VersionResponse, \
TorrentsHealthPayload, PopularTorrentsRequest
from tribler.core.components.content_discovery.community.content_discovery_community import ContentDiscoveryCommunity
from tribler.core.components.content_discovery.community.payload import PopularTorrentsRequest, SelectResponsePayload, \
TorrentsHealthPayload, VersionResponse
from tribler.core.components.content_discovery.community.settings import ContentDiscoverySettings
from tribler.core.components.database.db.layers.knowledge_data_access_layer import KnowledgeDataAccessLayer, \
ResourceType, SHOW_THRESHOLD
from tribler.core.components.database.db.layers.tests.test_knowledge_data_access_layer_base import \
Resource, TestKnowledgeAccessLayerBase
from tribler.core.components.database.db.tribler_database import TriblerDatabase
from tribler.core.components.database.db.orm_bindings.torrent_metadata import LZ4_EMPTY_ARCHIVE, NEW
from tribler.core.components.database.db.serialization import CHANNEL_THUMBNAIL, NULL_KEY, REGULAR_TORRENT
from tribler.core.components.database.db.serialization import NULL_KEY, REGULAR_TORRENT
from tribler.core.components.database.db.store import MetadataStore
from tribler.core.components.content_discovery.community.content_discovery_community import ContentDiscoveryCommunity
from tribler.core.components.database.db.tribler_database import TriblerDatabase
from tribler.core.components.torrent_checker.torrent_checker.torrent_checker import TorrentChecker
from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import HealthInfo
from tribler.core.tests.tools.base_test import MockObject
@@ -163,15 +162,15 @@ async def test_torrents_health_gossip_no_checker(self):
"""
self.overlay(0).composition.torrent_checker = None

with self.assertReceivedBy(1, [], message_filter = [TorrentsHealthPayload]):
with self.assertReceivedBy(1, [], message_filter=[TorrentsHealthPayload]):
self.overlay(0).gossip_random_torrents_health()
await self.deliver_messages()

async def test_torrents_health_gossip_no_live(self):
"""
Test whether torrent health information is spread when no live torrents are known
"""
with self.assertReceivedBy(1, [TorrentsHealthPayload], message_filter = [TorrentsHealthPayload]) as received:
with self.assertReceivedBy(1, [TorrentsHealthPayload], message_filter=[TorrentsHealthPayload]) as received:
self.overlay(0).gossip_random_torrents_health()
await self.deliver_messages()
message, = received
@@ -489,7 +488,6 @@ def callback(_, results):
assert obj.title == 'title1'
assert obj.health.seeders == 0


async def test_remote_select_packets_limit(self):
"""
Test dropping packets that go over the response limit for a remote select.
@@ -502,6 +500,7 @@ async def test_remote_select_packets_limit(self):

def add_result(request, processing_results):
add_result.result_count += 1

add_result.result_count = 0

expected = [SelectResponsePayload]
@@ -516,7 +515,7 @@ def add_result(request, processing_results):
while add_result.result_count < 10:
await asyncio.sleep(0.1)

assert [] == self.overlay(1).request_cache.get_tasks() # The list of outstanding requests should be empty
assert [] == self.overlay(1).request_cache.get_tasks() # The list of outstanding requests should be empty
assert add_result.result_count == 10 # The packet limit is 10

def test_sanitize_query(self):
@@ -629,7 +628,7 @@ async def test_remote_query_big_response(self):
with db_session:
add_random_torrent(self.metadata_store(1).TorrentMetadata, name=hexlify(value).decode())

kwargs_dict = {"metadata_type": [CHANNEL_THUMBNAIL]}
kwargs_dict = {"metadata_type": [REGULAR_TORRENT]}
callback = Mock()
self.overlay(0).send_remote_select(self.peer(1), **kwargs_dict, processing_callback=callback)

@@ -734,8 +733,8 @@ def slow_get_entries(self, *args, **kwargs):
time.sleep(0.1)
return original_get_entries(self, *args, **kwargs)

with patch.object(self.overlay(0), 'logger') as logger,\
patch.object(MetadataStore, 'get_entries', slow_get_entries):
with patch.object(self.overlay(0), 'logger') as logger, \
patch.object(MetadataStore, 'get_entries', slow_get_entries):
await self.deliver_messages(timeout=0.5)

torrents1 = list(self.metadata_store(1).get_entries(**kwargs1))
@@ -772,7 +771,6 @@ async def test_deprecated_popular_torrents_request_no_live(self):
assert message.random_torrents == []
assert message.torrents_checked == []


async def test_deprecated_popular_torrents_request_live(self):
"""
The new protocol no longer uses PopularTorrentsRequest but still supports it.
@@ -788,4 +786,4 @@ async def test_deprecated_popular_torrents_request_live(self):
assert message.random_torrents_length == 0
assert message.torrents_checked_length == 1
assert message.random_torrents == []
assert message.torrents_checked[0] == (b'00000000000000000000', 200, 0, message.torrents_checked[0][3])
assert message.torrents_checked[0] == (b'00000000000000000000', 200, 0, message.torrents_checked[0][3])
84 changes: 10 additions & 74 deletions src/tribler/core/components/database/db/serialization.py
@@ -6,7 +6,7 @@

from ipv8.keyvault.crypto import default_eccrypto
from ipv8.messaging.lazy_payload import VariablePayload, vp_compile
from ipv8.messaging.serialization import default_serializer, VarLenUtf8
from ipv8.messaging.serialization import VarLenUtf8, default_serializer

from tribler.core.utilities.unicode import hexlify

@@ -18,18 +18,9 @@
NULL_SIG = b'\x00' * 64
NULL_KEY = b'\x00' * 64

# Metadata types. Should have been an enum, but in Python its unwieldy.
TYPELESS = 100
CHANNEL_NODE = 200
METADATA_NODE = 210
COLLECTION_NODE = 220
JSON_NODE = 230
CHANNEL_DESCRIPTION = 231
BINARY_NODE = 240
CHANNEL_THUMBNAIL = 241
REGULAR_TORRENT = 300
CHANNEL_TORRENT = 400
DELETED = 500
SNIPPET = 600


@@ -67,14 +58,13 @@ class UnknownBlobTypeException(Exception):
def read_payload_with_offset(data, offset=0):
# First we have to determine the actual payload type
metadata_type = struct.unpack_from('>H', data, offset=offset)[0]
payload_class = METADATA_TYPE_TO_PAYLOAD_CLASS.get(metadata_type)
if payload_class is not None:
payload, offset = default_serializer.unpack_serializable(payload_class, data, offset=offset)
payload.signature = data[offset: offset + 64]
return payload, offset + 64

# Unknown metadata type, raise exception
raise UnknownBlobTypeException
if metadata_type != REGULAR_TORRENT:
raise UnknownBlobTypeException

payload, offset = default_serializer.unpack_serializable(TorrentMetadataPayload, data, offset=offset)
payload.signature = data[offset: offset + 64]
return payload, offset + 64


@vp_compile
@@ -110,9 +100,9 @@ def has_signature(self):

def check_signature(self):
return default_eccrypto.is_valid_signature(
default_eccrypto.key_from_public_bin(b"LibNaCLPK:" + self.public_key),
self.serialized(),
self.signature
default_eccrypto.key_from_public_bin(b"LibNaCLPK:" + self.public_key),
self.serialized(),
self.signature
)
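
For context on check_signature (whose argument indentation is fixed above): it verifies the serialized payload bytes against the embedded 64-byte public key. A hedged round-trip sketch, assuming py-ipv8's default_eccrypto curve25519 API (illustrative, not part of this diff):

```python
from ipv8.keyvault.crypto import default_eccrypto

key = default_eccrypto.generate_key("curve25519")
data = b"serialized payload bytes"
signature = default_eccrypto.create_signature(key, data)  # 64-byte signature, cf. NULL_SIG

# Payloads store the public key as 64 raw bytes, without the "LibNaCLPK:" prefix,
# so check_signature prepends the prefix before rebuilding the key object.
public_bin = key.pub().key_to_bin()  # b"LibNaCLPK:" + 64 key bytes
assert public_bin.startswith(b"LibNaCLPK:")

pub = default_eccrypto.key_from_public_bin(public_bin)
assert default_eccrypto.is_valid_signature(pub, data, signature)
```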


@@ -122,30 +112,6 @@ class ChannelNodePayload(SignedPayload):
format_list = SignedPayload.format_list + ['Q', 'Q', 'Q']


@vp_compile
class MetadataNodePayload(ChannelNodePayload):
names = ChannelNodePayload.names + ['title', 'tags']
format_list = ChannelNodePayload.format_list + ['varlenIutf8', 'varlenIutf8']


@vp_compile
class JsonNodePayload(ChannelNodePayload):
names = ChannelNodePayload.names + ['json_text']
format_list = ChannelNodePayload.format_list + ['varlenIutf8']


@vp_compile
class BinaryNodePayload(ChannelNodePayload):
names = ChannelNodePayload.names + ['binary_data', 'data_type']
format_list = ChannelNodePayload.format_list + ['varlenI', 'varlenIutf8']


@vp_compile
class CollectionNodePayload(MetadataNodePayload):
names = MetadataNodePayload.names + ['num_entries']
format_list = MetadataNodePayload.format_list + ['Q']


@vp_compile
class TorrentMetadataPayload(ChannelNodePayload):
"""
@@ -170,36 +136,6 @@ def get_magnet(self):
)


@vp_compile
class ChannelMetadataPayload(TorrentMetadataPayload):
"""
Payload for metadata that stores a channel.
"""

names = TorrentMetadataPayload.names + ['num_entries', 'start_timestamp']
format_list = TorrentMetadataPayload.format_list + ['Q'] + ['Q']


@vp_compile
class DeletedMetadataPayload(SignedPayload):
"""
Payload for metadata that stores deleted metadata.
"""

names = SignedPayload.names + ['delete_signature']
format_list = SignedPayload.format_list + ['64s']


METADATA_TYPE_TO_PAYLOAD_CLASS = {
REGULAR_TORRENT: TorrentMetadataPayload,
CHANNEL_TORRENT: ChannelMetadataPayload,
COLLECTION_NODE: CollectionNodePayload,
CHANNEL_THUMBNAIL: BinaryNodePayload,
CHANNEL_DESCRIPTION: JsonNodePayload,
DELETED: DeletedMetadataPayload,
}


@vp_compile
class HealthItemsPayload(VariablePayload):
"""
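The net effect of the serialization.py changes above: with the channel payload classes and the METADATA_TYPE_TO_PAYLOAD_CLASS mapping gone, read_payload_with_offset only accepts REGULAR_TORRENT blobs. A minimal sketch of the new type check, simplified from the diff (signature handling elided; this is illustrative, not the Tribler module itself):

```python
import struct

REGULAR_TORRENT = 300


class UnknownBlobTypeException(Exception):
    pass


def peek_metadata_type(data: bytes, offset: int = 0) -> int:
    # Every serialized payload starts with a 2-byte big-endian metadata type.
    return struct.unpack_from('>H', data, offset=offset)[0]


# REGULAR_TORRENT == 300 == 0x012C, so a valid blob starts with b'\x01,'.
blob = struct.pack('>H', REGULAR_TORRENT) + b'<rest of payload>'
assert peek_metadata_type(blob) == REGULAR_TORRENT

if peek_metadata_type(blob) != REGULAR_TORRENT:  # any other type now raises
    raise UnknownBlobTypeException
```

This matches the new test further below, which patches struct.unpack_from to return 301 and expects UnknownBlobTypeException.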
27 changes: 11 additions & 16 deletions src/tribler/core/components/database/db/store.py
@@ -1,7 +1,7 @@
import enum
from dataclasses import dataclass, field
import logging
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from time import sleep, time
from typing import Optional, Union
@@ -15,15 +15,8 @@
from tribler.core.components.database.db.orm_bindings import misc, torrent_metadata, torrent_state as torrent_state_, \
tracker_state
from tribler.core.components.database.db.orm_bindings.torrent_metadata import NULL_KEY_SUBST
from tribler.core.components.database.db.serialization import (
CHANNEL_TORRENT,
COLLECTION_NODE,
HealthItemsPayload,
REGULAR_TORRENT,
read_payload_with_offset,
NULL_KEY
)

from tribler.core.components.database.db.serialization import (CHANNEL_TORRENT, COLLECTION_NODE, HealthItemsPayload,
NULL_KEY, REGULAR_TORRENT, read_payload_with_offset)
from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted, handle_db_if_corrupted
from tribler.core.utilities.notifier import Notifier
@@ -255,13 +248,13 @@
try:
return await run_threaded(self.db, self.process_compressed_mdblob, compressed_data, **kwargs)
except DatabaseIsCorrupted:
raise # re-raise this exception and terminate the Core process
raise # re-raise this exception and terminate the Core process

Codecov (codecov/patch) check warning on src/tribler/core/components/database/db/store.py#L251: added line was not covered by tests.
except Exception as e: # pylint: disable=broad-except # pragma: no cover
self._logger.exception("DB transaction error when tried to process compressed mdblob: "
f"{e.__class__.__name__}: {e}", exc_info=e)
return []

def process_compressed_mdblob(self, compressed_data, **kwargs):
def process_compressed_mdblob(self, compressed_data, skip_personal_metadata_payload=True):
try:
with LZ4FrameDecompressor() as decompressor:
decompressed_data = decompressor.decompress(compressed_data)
@@ -278,7 +271,8 @@
self._logger.warning(f"Unable to parse health information: {type(e).__name__}: {str(e)}")
raise

return self.process_squashed_mdblob(decompressed_data, health_info=health_info, **kwargs)
return self.process_squashed_mdblob(decompressed_data, health_info=health_info,
skip_personal_metadata_payload=skip_personal_metadata_payload)

def process_torrent_health(self, health: HealthInfo) -> bool:
"""
@@ -305,7 +299,8 @@

return False

def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info=None, **kwargs):
def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info=None,
skip_personal_metadata_payload=True):
"""
Process raw concatenated payloads blob. This routine breaks the database access into smaller batches.
It uses a congestion-control like algorithm to determine the optimal batch size, targeting the
@@ -344,7 +339,7 @@
# We separate the sessions to minimize database locking.
with db_session(immediate=True):
for payload in batch:
result.extend(self.process_payload(payload, **kwargs))
result.extend(self.process_payload(payload, skip_personal_metadata_payload))

# Batch size adjustment
batch_end_time = datetime.now() - batch_start_time
@@ -374,7 +369,7 @@
return result

@db_session
def process_payload(self, payload, skip_personal_metadata_payload=True, **kwargs):
def process_payload(self, payload, skip_personal_metadata_payload=True):
# Don't process our own torrents
if skip_personal_metadata_payload and payload.public_key == self.my_public_key_bin:
return []
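The process_squashed_mdblob docstring above describes a congestion-control-like algorithm for picking the batch size. A hedged sketch of that idea (the time budget and the grow/shrink factors below are illustrative assumptions, not Tribler's actual constants):

```python
from datetime import timedelta

TARGET = timedelta(milliseconds=50)  # assumed per-batch time budget


def next_batch_size(current: int, elapsed: timedelta) -> int:
    """Grow the batch while it stays under budget; back off when it overruns."""
    if elapsed < TARGET:
        return current * 2       # the last batch was cheap: try a bigger one
    return max(1, current // 2)  # the last batch ran long: shrink it
```

Bounding each batch also bounds how long every immediate db_session holds the database lock, which is why the loop above opens a fresh session per batch ("We separate the sessions to minimize database locking").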
@@ -1,6 +1,10 @@
from datetime import datetime
from unittest.mock import Mock, patch

from tribler.core.components.database.db.serialization import TorrentMetadataPayload
import pytest

from tribler.core.components.database.db.serialization import TorrentMetadataPayload, UnknownBlobTypeException, \
read_payload_with_offset


def test_fix_torrent_metadata_payload():
@@ -24,3 +28,10 @@ def test_torrent_metadata_payload_magnet():
expected = "magnet:?xt=urn:btih:000102030405060708090a0b0c0d0e0f10111213&dn=b'title'&tr=b'tracker_info'"

assert expected == payload.get_magnet()


@patch('struct.unpack_from', Mock(return_value=(301,)))
def test_read_payload_with_offset_exception():
# Test that an exception is raised when metadata_type != REGULAR_TORRENT
with pytest.raises(UnknownBlobTypeException):
read_payload_with_offset(b'')
@@ -39,7 +39,6 @@
'date': "torrent_date",
'created': "torrent_date",
'status': 'status',
'torrents': 'num_entries',
'votes': 'votes',
'subscribed': 'subscribed',
'health': 'HEALTH',