Skip to content

Commit

Permalink
Add thumbnails and descriptions to channels
Browse files Browse the repository at this point in the history
  • Loading branch information
ichorid committed Mar 15, 2021
1 parent 2bc647a commit a052941
Show file tree
Hide file tree
Showing 34 changed files with 1,599 additions and 376 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# self.eva_register_error_callback(self.on_error)
#
# def my_function(self, peer):
# self.eva_send_message(peer, b'info1', b'data1')
# self.eva_send_message(peer, b'info2', b'data2')
# self.eva_send_message(peer, b'info3', b'data3')
# self.eva_send_binary(peer, b'info1', b'data1')
# self.eva_send_binary(peer, b'info2', b'data2')
# self.eva_send_binary(peer, b'info3', b'data3')
#
# def on_receive(self, peer, binary_info, binary_data, nonce):
# logger.info(f'Data has been received: {binary_info}')
Expand All @@ -33,13 +33,15 @@
import time
from collections import defaultdict, deque
from enum import Enum, auto
from random import randint
from types import SimpleNamespace

from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.lazy_payload import VariablePayload, vp_compile

logger = logging.getLogger('EVA')

MAX_U64 = 0xFFFFFFFF  # NOTE(review): name says u64, but the value is the 32-bit max (2**32 - 1) — confirm the intended nonce width

# fmt: off

Expand Down Expand Up @@ -168,8 +170,7 @@ def eva_send_binary(self, peer, info_binary, data_binary, nonce=None):
data_binary: binary data that will be sent to the target.
It is limited by several GB, but the protocol is slow by design, so
try to send less rather than more.
nonce: a uniq number for identifying the session. If not specified,
then `self.nonce + 1` will be used
nonce: a unique number for identifying the session. If not specified, generated randomly
"""
self.eva_protocol.send_binary(peer, info_binary, data_binary, nonce)

Expand Down Expand Up @@ -303,8 +304,6 @@ def __init__( # pylint: disable=too-many-arguments
self.retransmit_enabled = True
self.terminate_by_timeout_enabled = True

self.nonce = 0

# register tasks
community.register_task('scheduled send', self.send_scheduled, interval=scheduled_send_interval_in_sec)

Expand All @@ -324,9 +323,7 @@ def send_binary(self, peer, info_binary, data_binary, nonce=None):
return

if nonce is None:
self.nonce += 1

nonce = nonce or self.nonce
nonce = randint(0, MAX_U64)

if peer in self.outgoing:
scheduled_transfer = SimpleNamespace(info_binary=info_binary, data_binary=data_binary, nonce=nonce)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import struct
from binascii import unhexlify
from dataclasses import dataclass

Expand All @@ -9,11 +10,25 @@

from pony.orm.dbapiprovider import OperationalError

from tribler_core.modules.metadata_store.community.eva_protocol import EVAProtocolMixin
from tribler_core.modules.metadata_store.orm_bindings.channel_metadata import entries_to_chunk
from tribler_core.modules.metadata_store.store import GOT_NEWER_VERSION, UNKNOWN_CHANNEL, UNKNOWN_COLLECTION
from tribler_core.modules.metadata_store.serialization import (
CHANNEL_DESCRIPTION,
CHANNEL_THUMBNAIL,
CHANNEL_TORRENT,
COLLECTION_NODE,
REGULAR_TORRENT,
)
from tribler_core.modules.metadata_store.store import (
GOT_NEWER_VERSION,
UNKNOWN_CHANNEL,
UNKNOWN_COLLECTION,
UPDATED_OUR_VERSION,
)
from tribler_core.utilities.unicode import hexlify

BINARY_FIELDS = ("infohash", "channel_pk")
NO_RESPONSE = unhexlify("7ca1e9e922895a477a52cc9d6031020355eb172735bf83c058cb03ddcc9c6408")


def sanitize_query(query_dict, cap=100):
Expand Down Expand Up @@ -90,7 +105,7 @@ def channel_query_back_enabled(self):
return self.max_channel_query_back > 0


class RemoteQueryCommunity(Community):
class RemoteQueryCommunity(Community, EVAProtocolMixin):
"""
Community for general purpose SELECT-like queries into remote Channels database
"""
Expand All @@ -110,6 +125,22 @@ def __init__(self, my_peer, endpoint, network, metadata_store, settings=None, **
self.add_message_handler(RemoteSelectPayload, self.on_remote_select)
self.add_message_handler(SelectResponsePayload, self.on_remote_select_response)

self.eva_init()
self.eva_register_receive_callback(self.on_receive)
self.eva_register_send_complete_callback(self.on_send_complete)
self.eva_register_error_callback(self.on_error)

def on_receive(self, peer, binary_info, binary_data, nonce):
    """EVA receive callback: unwrap a finished bulk transfer and dispatch it.

    ``binary_data`` is a complete serialized ipv8 packet (the sending side
    produces it with ``ezr_pack``), so it can be fed straight into the
    community's regular ``on_packet`` handler as if it arrived over UDP.
    """
    # Use the community logger instead of a leftover debug print().
    self.logger.debug(f'Data has been received: {binary_info}')
    packet = (peer.address, binary_data)
    self.on_packet(packet)

def on_send_complete(self, peer, binary_info, binary_data, nonce):
    """EVA callback fired when an outgoing bulk transfer has finished."""
    # Use the community logger instead of a leftover debug print().
    self.logger.debug(f'Transfer has been completed: {binary_info}')

def on_error(self, peer, exception):
print(f'Error has been occurred: {exception}')

def send_remote_select(self, peer, processing_callback=None, **kwargs):

request = SelectRequest(self.request_cache, hexlify(peer.mid), kwargs, processing_callback)
Expand All @@ -126,13 +157,19 @@ async def process_rpc_query(self, json_bytes: bytes):
:raises pony.orm.dbapiprovider.OperationalError: if an illegal query was performed.
"""
request_sanitized = sanitize_query(json.loads(json_bytes), self.settings.max_response_size)
return await self.mds.MetadataNode.get_entries_threaded(**request_sanitized)
return await self.mds.get_entries_threaded(**request_sanitized)

def send_db_results(self, peer, request_payload_id, db_results):
    """Stream serialized query results to ``peer`` in payload-sized chunks.

    Each chunk that fits into a single ipv8 payload is sent the normal way;
    an oversized chunk is pushed through the EVA bulk-transfer protocol,
    tagged with the big-endian request id so the receiver can match it.
    """
    start = 0
    while start < len(db_results):
        blob, start = entries_to_chunk(db_results, self.settings.maximum_payload_size, start_index=start)
        response = SelectResponsePayload(request_payload_id, blob)
        if len(blob) <= self.settings.maximum_payload_size:
            response = SelectResponsePayload(request_payload_id, blob)
            self.ez_send(peer, response)
        else:
            self.eva_send_binary(
                peer, struct.pack('>i', request_payload_id), self.ezr_pack(response.msg_id, response)
            )

@lazy_wrapper(RemoteSelectPayload)
async def on_remote_select(self, peer, request_payload):
Expand All @@ -157,7 +194,7 @@ async def on_remote_select_response(self, peer, response_payload):
"""
self.logger.info(f"Response from {hexlify(peer.mid)}")

# ACHTUNG! the returned request cache can be either a SelectRequest or PushbackWindow
# ACHTUNG! the returned request cache can be either a SelectRequest or a PushbackWindow
request = self.request_cache.get(hexlify(peer.mid), response_payload.id)
if request is None:
return
Expand All @@ -179,16 +216,50 @@ async def on_remote_select_response(self, peer, response_payload):
# Query back the sender for preview contents for the new channels
# TODO: maybe transform this into a processing_callback?
if self.settings.channel_query_back_enabled:
new_channels = [md for md, result in processing_results if result in (UNKNOWN_CHANNEL, UNKNOWN_COLLECTION)]
for channel in new_channels:
for channel in [md for md, result in processing_results if result in (UNKNOWN_CHANNEL, UNKNOWN_COLLECTION)]:
request_dict = {
"metadata_type": [COLLECTION_NODE, REGULAR_TORRENT],
"channel_pk": hexlify(channel.public_key),
"origin_id": channel.id_,
"first": 0,
"last": self.settings.max_channel_query_back,
}
self.send_remote_select(peer=peer, **request_dict)

# Query back for thumbnail/description
for channel in [md for md, result in processing_results if result == UNKNOWN_CHANNEL]:
request_dict = {
"metadata_type": [CHANNEL_THUMBNAIL, CHANNEL_DESCRIPTION],
"channel_pk": hexlify(channel.public_key),
"origin_id": channel.id_,
"first": 0,
"last": 2,
}
self.send_remote_select(peer=peer, **request_dict)

# Query back for updated thumbnail/description
for channel in [
md
for md, result in processing_results
if (result == UPDATED_OUR_VERSION and md.metadata_type == CHANNEL_TORRENT)
]:
for md_type in [CHANNEL_DESCRIPTION, CHANNEL_THUMBNAIL]:
request_dict = {
"metadata_type": [md_type],
"channel_pk": hexlify(channel.public_key),
"origin_id": channel.id_,
"first": 0,
"last": 1,
}

our_obj = await self.mds.get_entries_threaded(
channel_pk=channel.public_key, origin_id=channel.id_, metadata_type=md_type, first=0, last=1
)
if our_obj and our_obj[0]:
request_dict["attribute_ranges"] = (("timestamp", our_obj[0].timestamp + 1, None),)

self.send_remote_select(peer=peer, **request_dict)

if isinstance(request, SelectRequest) and request.processing_callback:
request.processing_callback(request, processing_results)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ def fake_on_acknowledgement0(peer, payload):
assert isinstance(self.overlay(0).most_recent_received_exception, TransferException)
assert isinstance(self.overlay(1).most_recent_received_exception, SizeLimitException)

@pytest.mark.timeout(10)
async def test_wrong_message_order_and_wrong_nonce(self):
self.overlay(0).eva_protocol.terminate_by_timeout_enabled = False
self.overlay(1).eva_protocol.terminate_by_timeout_enabled = False
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from binascii import unhexlify
from datetime import datetime
from json import dumps
from os import urandom
from unittest.mock import Mock

from ipv8.keyvault.crypto import default_eccrypto
Expand All @@ -9,9 +10,11 @@
from pony.orm import db_session
from pony.orm.dbapiprovider import OperationalError

import pytest

from tribler_core.modules.metadata_store.community.remote_query_community import RemoteQueryCommunity, sanitize_query
from tribler_core.modules.metadata_store.orm_bindings.channel_node import NEW
from tribler_core.modules.metadata_store.serialization import CHANNEL_TORRENT, REGULAR_TORRENT
from tribler_core.modules.metadata_store.serialization import CHANNEL_THUMBNAIL, CHANNEL_TORRENT, REGULAR_TORRENT
from tribler_core.modules.metadata_store.store import MetadataStore
from tribler_core.utilities.path_util import Path
from tribler_core.utilities.random_utils import random_infohash, random_string
Expand Down Expand Up @@ -83,8 +86,8 @@ async def test_remote_select(self):

# All the matching torrent entries should have been sent to Node 1
with db_session:
torrents0 = self.nodes[0].overlay.mds.MetadataNode.get_entries(**kwargs_dict)
torrents1 = self.nodes[1].overlay.mds.MetadataNode.get_entries(**kwargs_dict)
torrents0 = self.nodes[0].overlay.mds.get_entries(**kwargs_dict)
torrents1 = self.nodes[1].overlay.mds.get_entries(**kwargs_dict)
self.assertEqual(len(torrents0), len(torrents1))
self.assertEqual(len(torrents0), 20)

Expand Down Expand Up @@ -191,7 +194,8 @@ async def test_remote_select_packets_limit(self):
with db_session:
received_channels = self.nodes[1].overlay.mds.ChannelMetadata.select()
# We should receive fewer than 6 packets, so not all the channels will fit.
self.assertTrue(40 < received_channels.count() < 60)
print(received_channels.count())
assert 40 < received_channels.count() < 60

# The list of outstanding requests should be empty
self.assertFalse(self.nodes[1].overlay.request_cache._identifiers)
Expand Down Expand Up @@ -299,3 +303,97 @@ async def test_process_rpc_query_no_column(self):
"""
with self.assertRaises(OperationalError):
await self.overlay(0).process_rpc_query(b'{"txt_filter":{"key":"bla"}}')

async def test_remote_query_big_response(self):
    """A response too large for a single UDP payload must travel via the
    EVA bulk-transfer path and still reach the querying node intact."""

    # 20 KB of random bytes — intended to exceed maximum_payload_size so the
    # eva_send_binary branch of send_db_results is exercised.
    value = urandom(20000)
    with db_session:
        self.nodes[1].overlay.mds.ChannelThumbnail(binary_data=value)

    kwargs_dict = {"metadata_type": [CHANNEL_THUMBNAIL]}
    callback = Mock()
    self.nodes[0].overlay.send_remote_select(self.nodes[1].my_peer, **kwargs_dict, processing_callback=callback)

    await self.deliver_messages(timeout=0.5)
    # Test optional response processing callback
    callback.assert_called()

    # Node 0 (the querying side) should now hold the same thumbnail entries
    # as Node 1, proving the big transfer arrived and was processed.
    with db_session:
        torrents0 = self.nodes[0].overlay.mds.get_entries(**kwargs_dict)
        torrents1 = self.nodes[1].overlay.mds.get_entries(**kwargs_dict)
        self.assertEqual(len(torrents0), len(torrents1))

@pytest.mark.timeout(0)  # NOTE(review): 0 disables the pytest-timeout limit; sibling test uses timeout(10) — confirm intended
async def test_remote_select_query_back_thumbs_and_descriptions(self):
    """
    Test querying back preview thumbnail and description for previously unknown and updated channels.
    """

    with db_session:
        # Generate channels on Node 0
        chan = self.nodes[0].overlay.mds.ChannelMetadata.create_channel("channel", "")
        chan.commit_all_channels()
        self.nodes[0].overlay.mds.ChannelThumbnail(
            public_key=chan.public_key, origin_id=chan.id_, binary_data=urandom(2000), data_type="image/png"
        )
        # Remember the committed channel version (timestamp) for later comparison
        chan_v = chan.timestamp

        self.nodes[0].overlay.mds.ChannelDescription(
            public_key=chan.public_key, origin_id=chan.id_, json_text='{"description_text": "foobar"}'
        )

    # Node 1 queries Node 0 for channels it has never seen before
    peer = self.nodes[0].my_peer
    kwargs_dict = {"metadata_type": [CHANNEL_TORRENT]}
    self.nodes[1].overlay.send_remote_select(peer, **kwargs_dict)

    await self.deliver_messages(timeout=0.5)

    # Node 1 learned about the channel and queried back its thumbnail/description
    with db_session:
        assert self.nodes[1].overlay.mds.ChannelMetadata.get(lambda g: g.title == "channel")
        assert self.nodes[1].overlay.mds.ChannelThumbnail.get()
        assert self.nodes[1].overlay.mds.ChannelDescription.get()

    # Now test querying for updated version of description/thumbnail
    with db_session:
        thumb = self.nodes[0].overlay.mds.ChannelThumbnail.get()
        new_pic_bytes = urandom(2500)
        thumb.update_properties({"binary_data": new_pic_bytes})
        descr = self.nodes[0].overlay.mds.ChannelDescription.get()
        descr.update_properties({"json_text": '{"description_text": "yummy"}'})

        # Re-commit so the channel's timestamp (version) is bumped
        chan = self.nodes[0].overlay.mds.ChannelMetadata.get()
        chan.commit_all_channels()
        chan_v2 = chan.timestamp
        assert chan_v2 > chan_v

    self.nodes[1].overlay.send_remote_select(peer, **kwargs_dict)

    await self.deliver_messages(timeout=2)

    # The updated thumbnail/description must have replaced Node 1's copies
    with db_session:
        assert self.nodes[1].overlay.mds.ChannelMetadata.get(lambda g: g.title == "channel")
        assert self.nodes[1].overlay.mds.ChannelThumbnail.get().binary_data == new_pic_bytes
        assert self.nodes[1].overlay.mds.ChannelDescription.get().json_text == '{"description_text": "yummy"}'

    # Make sure that we're only going to query for updated objects and skip old ones
    with db_session:
        chan = self.nodes[0].overlay.mds.ChannelMetadata.get()
        # Adding a torrent bumps the channel version without touching thumb/description
        self.nodes[0].overlay.mds.TorrentMetadata(
            public_key=chan.public_key, origin_id=chan.id_, infohash=random_infohash(), status=NEW
        )

        chan.commit_all_channels()
        chan_v3 = chan.timestamp
        assert chan_v3 > chan_v2

    # Spy on the bulk-transfer entry point to prove it stays unused below
    self.nodes[0].overlay.eva_send_binary = Mock()

    self.nodes[1].overlay.send_remote_select(peer, **kwargs_dict)
    await self.deliver_messages(timeout=2)

    # Big transfer should not have been called, because we only queried for updated version of the thumbnail
    self.nodes[0].overlay.eva_send_binary.assert_not_called()

    with db_session:
        assert self.nodes[1].overlay.mds.ChannelMetadata.get(lambda g: g.title == "channel").timestamp == chan_v3
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from tribler_core.modules.libtorrent.download_config import DownloadConfig
from tribler_core.modules.libtorrent.torrentdef import TorrentDef
from tribler_core.modules.metadata_store.orm_bindings.channel_node import COMMITTED
from tribler_core.modules.metadata_store.serialization import CHANNEL_TORRENT
from tribler_core.utilities.unicode import hexlify

PROCESS_CHANNEL_DIR = 1
Expand Down Expand Up @@ -115,7 +116,7 @@ def remove_cruft_channels(self):
"""
with db_session:
# FIXME: if someone is subscribed to more than 1000 channels, they are in trouble...
channels = self.session.mds.ChannelMetadata.get_entries(last=1000, subscribed=True)
channels = self.session.mds.get_entries(last=1000, subscribed=True, metadata_type=CHANNEL_TORRENT)
subscribed_infohashes = [bytes(c.infohash) for c in list(channels)]
dirnames = [c.dirname for c in channels]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from pony.orm import Optional

from tribler_core.modules.metadata_store.serialization import BINARY_NODE, BinaryNodePayload


def define_binding(db):
    """Define and return the BinaryNode ORM binding on the given pony database."""

    class BinaryNode(db.ChannelNode):
        """
        Base ORM class for channel entries that carry an opaque binary payload
        (e.g. channel thumbnails and descriptions): the payload lives in
        `binary_data`, with its MIME-style type in `data_type`.
        """

        _discriminator_ = BINARY_NODE

        # Serializable
        binary_data = Optional(bytes, default=b"")
        data_type = Optional(str, default="")

        # Special class-level properties
        _payload_class = BinaryNodePayload
        # Positional parameter names of the payload constructor (minus 'self'),
        # used to build payloads from the matching ORM attributes.
        payload_arguments = _payload_class.__init__.__code__.co_varnames[
            : _payload_class.__init__.__code__.co_argcount
        ][1:]
        nonpersonal_attributes = db.ChannelNode.nonpersonal_attributes + ('binary_data', 'data_type')

    return BinaryNode
Loading

0 comments on commit a052941

Please sign in to comment.