Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

READY: Channel description and thumbnails #6025

Merged
merged 19 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/tribler-core/tribler_core/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from tribler_core.session import Session
from tribler_core.tests.tools.common import TESTS_DATA_DIR, TESTS_DIR
from tribler_core.tests.tools.tracker.udp_tracker import UDPTracker
from tribler_core.upgrade.db72_to_pony import DispersyToPonyMigration
from tribler_core.upgrade.legacy_to_pony import DispersyToPonyMigration
from tribler_core.utilities.unicode import hexlify


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# self.eva_register_error_callback(self.on_error)
#
# def my_function(self, peer):
# self.eva_send_message(peer, b'info1', b'data1')
# self.eva_send_message(peer, b'info2', b'data2')
# self.eva_send_message(peer, b'info3', b'data3')
# self.eva_send_binary(peer, b'info1', b'data1')
# self.eva_send_binary(peer, b'info2', b'data2')
# self.eva_send_binary(peer, b'info3', b'data3')
#
# def on_receive(self, peer, binary_info, binary_data, nonce):
# logger.info(f'Data has been received: {binary_info}')
Expand All @@ -33,13 +33,15 @@
import time
from collections import defaultdict, deque
from enum import Enum, auto
from random import randint
from types import SimpleNamespace

from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.lazy_payload import VariablePayload, vp_compile

logger = logging.getLogger('EVA')

MAX_U64 = 0xFFFFFFFF

# fmt: off

Expand Down Expand Up @@ -168,8 +170,7 @@ def eva_send_binary(self, peer, info_binary, data_binary, nonce=None):
data_binary: binary data that will be sent to the target.
It is limited by several GB, but the protocol is slow by design, so
try to send less rather than more.
nonce: a uniq number for identifying the session. If not specified,
then `self.nonce + 1` will be used
nonce: a unique number for identifying the session. If not specified, generated randomly
"""
self.eva_protocol.send_binary(peer, info_binary, data_binary, nonce)

Expand Down Expand Up @@ -303,8 +304,6 @@ def __init__( # pylint: disable=too-many-arguments
self.retransmit_enabled = True
self.terminate_by_timeout_enabled = True

self.nonce = 0

# register tasks
community.register_task('scheduled send', self.send_scheduled, interval=scheduled_send_interval_in_sec)

Expand All @@ -324,9 +323,7 @@ def send_binary(self, peer, info_binary, data_binary, nonce=None):
return

if nonce is None:
self.nonce += 1

nonce = nonce or self.nonce
nonce = randint(0, MAX_U64)

if peer in self.outgoing:
scheduled_transfer = SimpleNamespace(info_binary=info_binary, data_binary=data_binary, nonce=nonce)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
RemoteQueryCommunitySettings,
)
from tribler_core.modules.metadata_store.serialization import CHANNEL_TORRENT
from tribler_core.modules.metadata_store.store import UNKNOWN_CHANNEL, UNKNOWN_COLLECTION, UNKNOWN_TORRENT
from tribler_core.modules.metadata_store.store import ObjState

minimal_blob_size = 200
maximum_payload_size = 1024
Expand Down Expand Up @@ -80,11 +80,19 @@ def send_remote_select_subscribed_channels(self, peer):
def on_packet_callback(_, processing_results):
# We use responses for requests about subscribed channels to bump our local channels ratings
with db_session:
for c in [md for md, _ in processing_results if md.metadata_type == CHANNEL_TORRENT]:
for c in [r.md_obj for r in processing_results if r.md_obj.metadata_type == CHANNEL_TORRENT]:
ichorid marked this conversation as resolved.
Show resolved Hide resolved
self.mds.vote_bump(c.public_key, c.id_, peer.public_key.key_to_bin()[10:])

# Notify GUI about the new channels
new_channels = [md for md, result in processing_results if result == UNKNOWN_CHANNEL and md.origin_id == 0]
new_channels = [
r.md_obj
for r in processing_results
if (
r.obj_state == ObjState.UNKNOWN_OBJECT
and r.md_obj.metadata_type == CHANNEL_TORRENT
and r.md_obj.origin_id == 0
)
]
if self.notifier and new_channels:
self.notifier.notify(
NTFY.CHANNEL_DISCOVERED,
Expand All @@ -105,9 +113,9 @@ def send_search_request(self, **kwargs):

def notify_gui(_, processing_results):
search_results = [
md.to_simple_dict()
for md, result in processing_results
if result in (UNKNOWN_TORRENT, UNKNOWN_CHANNEL, UNKNOWN_COLLECTION)
r.md_obj.to_simple_dict()
for r in processing_results
if r.obj_state in (ObjState.UNKNOWN_OBJECT, ObjState.UPDATED_OUR_VERSION)
drew2a marked this conversation as resolved.
Show resolved Hide resolved
]
self.notifier.notify(NTFY.REMOTE_QUERY_RESULTS, {"uuid": str(request_uuid), "results": search_results})

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import struct
from binascii import unhexlify
from dataclasses import dataclass

Expand All @@ -9,11 +10,14 @@

from pony.orm.dbapiprovider import OperationalError

from tribler_core.modules.metadata_store.community.eva_protocol import EVAProtocolMixin
from tribler_core.modules.metadata_store.orm_bindings.channel_metadata import entries_to_chunk
from tribler_core.modules.metadata_store.store import GOT_NEWER_VERSION, UNKNOWN_CHANNEL, UNKNOWN_COLLECTION
from tribler_core.modules.metadata_store.serialization import CHANNEL_TORRENT, COLLECTION_NODE, REGULAR_TORRENT
from tribler_core.modules.metadata_store.store import ObjState
from tribler_core.utilities.unicode import hexlify

BINARY_FIELDS = ("infohash", "channel_pk")
NO_RESPONSE = unhexlify("7ca1e9e922895a477a52cc9d6031020355eb172735bf83c058cb03ddcc9c6408")


def sanitize_query(query_dict, cap=100):
Expand Down Expand Up @@ -90,7 +94,7 @@ def channel_query_back_enabled(self):
return self.max_channel_query_back > 0


class RemoteQueryCommunity(Community):
class RemoteQueryCommunity(Community, EVAProtocolMixin):
"""
Community for general purpose SELECT-like queries into remote Channels database
"""
Expand All @@ -110,6 +114,22 @@ def __init__(self, my_peer, endpoint, network, metadata_store, settings=None, **
self.add_message_handler(RemoteSelectPayload, self.on_remote_select)
self.add_message_handler(SelectResponsePayload, self.on_remote_select_response)

self.eva_init()
self.eva_register_receive_callback(self.on_receive)
self.eva_register_send_complete_callback(self.on_send_complete)
self.eva_register_error_callback(self.on_error)

def on_receive(self, peer, binary_info, binary_data, nonce):
self.logger.info(f"EVA data received: peer {hexlify(peer.mid)}, info {binary_info}")
packet = (peer.address, binary_data)
self.on_packet(packet)

def on_send_complete(self, peer, binary_info, binary_data, nonce):
self.logger.info(f"EVA outgoing transfer complete: peer {hexlify(peer.mid)}, info {binary_info}")

def on_error(self, peer, exception):
self.logger.warning(f"EVA transfer error: peer {hexlify(peer.mid)}, exception: {exception}")

def send_remote_select(self, peer, processing_callback=None, **kwargs):

request = SelectRequest(self.request_cache, hexlify(peer.mid), kwargs, processing_callback)
Expand All @@ -126,13 +146,19 @@ async def process_rpc_query(self, json_bytes: bytes):
:raises pony.orm.dbapiprovider.OperationalError: if an illegal query was performed.
"""
request_sanitized = sanitize_query(json.loads(json_bytes), self.settings.max_response_size)
return await self.mds.MetadataNode.get_entries_threaded(**request_sanitized)
return await self.mds.get_entries_threaded(**request_sanitized)

def send_db_results(self, peer, request_payload_id, db_results):
index = 0
while index < len(db_results):
data, index = entries_to_chunk(db_results, self.settings.maximum_payload_size, start_index=index)
self.ez_send(peer, SelectResponsePayload(request_payload_id, data))
payload = SelectResponsePayload(request_payload_id, data)
if len(data) > self.settings.maximum_payload_size:
self.eva_send_binary(
peer, struct.pack('>i', request_payload_id), self.ezr_pack(payload.msg_id, payload)
)
drew2a marked this conversation as resolved.
Show resolved Hide resolved
else:
self.ez_send(peer, payload)

@lazy_wrapper(RemoteSelectPayload)
async def on_remote_select(self, peer, request_payload):
Expand All @@ -157,7 +183,7 @@ async def on_remote_select_response(self, peer, response_payload):
"""
self.logger.info(f"Response from {hexlify(peer.mid)}")

# ACHTUNG! the returned request cache can be either a SelectRequest or PushbackWindow
# ACHTUNG! the returned request cache can be any one of SelectRequest, PushbackWindow
request = self.request_cache.get(hexlify(peer.mid), response_payload.id)
if request is None:
return
Expand All @@ -171,23 +197,30 @@ async def on_remote_select_response(self, peer, response_payload):
processing_results = await self.mds.process_compressed_mdblob_threaded(response_payload.raw_blob)
self.logger.info(f"Response result: {processing_results}")

# If we now about updated versions of the received stuff, push the updates back
# If we know about updated versions of the received stuff, push the updates back
if isinstance(request, SelectRequest) and self.settings.push_updates_back_enabled:
newer_entities = [md for md, result in processing_results if result == GOT_NEWER_VERSION]
newer_entities = [r.md_obj for r in processing_results if r.obj_state == ObjState.GOT_NEWER_VERSION]
self.send_db_results(peer, response_payload.id, newer_entities)

# Query back the sender for preview contents for the new channels
# TODO: maybe transform this into a processing_callback?
if self.settings.channel_query_back_enabled:
new_channels = [md for md, result in processing_results if result in (UNKNOWN_CHANNEL, UNKNOWN_COLLECTION)]
for channel in new_channels:
request_dict = {
"channel_pk": hexlify(channel.public_key),
"origin_id": channel.id_,
"first": 0,
"last": self.settings.max_channel_query_back,
}
self.send_remote_select(peer=peer, **request_dict)
for result in processing_results:
if result.obj_state == ObjState.UNKNOWN_OBJECT and result.md_obj.metadata_type in (
CHANNEL_TORRENT,
COLLECTION_NODE,
):
request_dict = {
"metadata_type": [COLLECTION_NODE, REGULAR_TORRENT],
"channel_pk": hexlify(result.md_obj.public_key),
"origin_id": result.md_obj.id_,
"first": 0,
"last": self.settings.max_channel_query_back,
}
self.send_remote_select(peer=peer, **request_dict)
for dep_query_dict in result.missing_deps:
# Query back for missing dependencies, e.g. thumbnail/description
self.send_remote_select(peer=peer, **dep_query_dict)
drew2a marked this conversation as resolved.
Show resolved Hide resolved

if isinstance(request, SelectRequest) and request.processing_callback:
request.processing_callback(request, processing_results)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ def fake_on_acknowledgement0(peer, payload):
assert isinstance(self.overlay(0).most_recent_received_exception, TransferException)
assert isinstance(self.overlay(1).most_recent_received_exception, SizeLimitException)

@pytest.mark.timeout(10)
async def test_wrong_message_order_and_wrong_nonce(self):
self.overlay(0).eva_protocol.terminate_by_timeout_enabled = False
self.overlay(1).eva_protocol.terminate_by_timeout_enabled = False
Expand Down
Loading