Skip to content

Commit

Permalink
Add thumbnails and descriptions to channels
Browse files Browse the repository at this point in the history
This PR enables the user to add thumbnails and Markdown-based descriptions to their channels.
  • Loading branch information
ichorid committed Mar 19, 2021
1 parent fc2a641 commit 5215937
Show file tree
Hide file tree
Showing 53 changed files with 2,053 additions and 540 deletions.
2 changes: 1 addition & 1 deletion src/tribler-core/tribler_core/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from tribler_core.session import Session
from tribler_core.tests.tools.common import TESTS_DATA_DIR, TESTS_DIR
from tribler_core.tests.tools.tracker.udp_tracker import UDPTracker
from tribler_core.upgrade.db72_to_pony import DispersyToPonyMigration
from tribler_core.upgrade.legacy_to_pony import DispersyToPonyMigration
from tribler_core.utilities.unicode import hexlify


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# self.eva_register_error_callback(self.on_error)
#
# def my_function(self, peer):
# self.eva_send_message(peer, b'info1', b'data1')
# self.eva_send_message(peer, b'info2', b'data2')
# self.eva_send_message(peer, b'info3', b'data3')
# self.eva_send_binary(peer, b'info1', b'data1')
# self.eva_send_binary(peer, b'info2', b'data2')
# self.eva_send_binary(peer, b'info3', b'data3')
#
# def on_receive(self, peer, binary_info, binary_data, nonce):
# logger.info(f'Data has been received: {binary_info}')
Expand All @@ -33,13 +33,15 @@
import time
from collections import defaultdict, deque
from enum import Enum, auto
from random import randint
from types import SimpleNamespace

from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.lazy_payload import VariablePayload, vp_compile

logger = logging.getLogger('EVA')

MAX_U64 = 0xFFFFFFFF

# fmt: off

Expand Down Expand Up @@ -168,8 +170,7 @@ def eva_send_binary(self, peer, info_binary, data_binary, nonce=None):
data_binary: binary data that will be sent to the target.
It is limited by several GB, but the protocol is slow by design, so
try to send less rather than more.
nonce: a uniq number for identifying the session. If not specified,
then `self.nonce + 1` will be used
nonce: a unique number for identifying the session. If not specified, generated randomly
"""
self.eva_protocol.send_binary(peer, info_binary, data_binary, nonce)

Expand Down Expand Up @@ -303,8 +304,6 @@ def __init__( # pylint: disable=too-many-arguments
self.retransmit_enabled = True
self.terminate_by_timeout_enabled = True

self.nonce = 0

# register tasks
community.register_task('scheduled send', self.send_scheduled, interval=scheduled_send_interval_in_sec)

Expand All @@ -324,9 +323,7 @@ def send_binary(self, peer, info_binary, data_binary, nonce=None):
return

if nonce is None:
self.nonce += 1

nonce = nonce or self.nonce
nonce = randint(0, MAX_U64)

if peer in self.outgoing:
scheduled_transfer = SimpleNamespace(info_binary=info_binary, data_binary=data_binary, nonce=nonce)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
RemoteQueryCommunitySettings,
)
from tribler_core.modules.metadata_store.serialization import CHANNEL_TORRENT
from tribler_core.modules.metadata_store.store import UNKNOWN_CHANNEL, UNKNOWN_COLLECTION, UNKNOWN_TORRENT
from tribler_core.modules.metadata_store.store import ObjState

minimal_blob_size = 200
maximum_payload_size = 1024
Expand Down Expand Up @@ -80,11 +80,19 @@ def send_remote_select_subscribed_channels(self, peer):
def on_packet_callback(_, processing_results):
# We use responses for requests about subscribed channels to bump our local channels ratings
with db_session:
for c in [md for md, _ in processing_results if md.metadata_type == CHANNEL_TORRENT]:
for c in [r.md_obj for r in processing_results if r.md_obj.metadata_type == CHANNEL_TORRENT]:
self.mds.vote_bump(c.public_key, c.id_, peer.public_key.key_to_bin()[10:])

# Notify GUI about the new channels
new_channels = [md for md, result in processing_results if result == UNKNOWN_CHANNEL and md.origin_id == 0]
new_channels = [
r.md_obj
for r in processing_results
if (
r.obj_state == ObjState.UNKNOWN_OBJECT
and r.md_obj.metadata_type == CHANNEL_TORRENT
and r.md_obj.origin_id == 0
)
]
if self.notifier and new_channels:
self.notifier.notify(
NTFY.CHANNEL_DISCOVERED,
Expand All @@ -105,9 +113,9 @@ def send_search_request(self, **kwargs):

def notify_gui(_, processing_results):
search_results = [
md.to_simple_dict()
for md, result in processing_results
if result in (UNKNOWN_TORRENT, UNKNOWN_CHANNEL, UNKNOWN_COLLECTION)
r.md_obj.to_simple_dict()
for r in processing_results
if r.obj_state in (ObjState.UNKNOWN_OBJECT, ObjState.UPDATED_OUR_VERSION)
]
self.notifier.notify(NTFY.REMOTE_QUERY_RESULTS, {"uuid": str(request_uuid), "results": search_results})

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import struct
from binascii import unhexlify
from dataclasses import dataclass

Expand All @@ -9,11 +10,14 @@

from pony.orm.dbapiprovider import OperationalError

from tribler_core.modules.metadata_store.community.eva_protocol import EVAProtocolMixin
from tribler_core.modules.metadata_store.orm_bindings.channel_metadata import entries_to_chunk
from tribler_core.modules.metadata_store.store import GOT_NEWER_VERSION, UNKNOWN_CHANNEL, UNKNOWN_COLLECTION
from tribler_core.modules.metadata_store.serialization import CHANNEL_TORRENT, COLLECTION_NODE, REGULAR_TORRENT
from tribler_core.modules.metadata_store.store import ObjState
from tribler_core.utilities.unicode import hexlify

BINARY_FIELDS = ("infohash", "channel_pk")
NO_RESPONSE = unhexlify("7ca1e9e922895a477a52cc9d6031020355eb172735bf83c058cb03ddcc9c6408")


def sanitize_query(query_dict, cap=100):
Expand Down Expand Up @@ -90,7 +94,7 @@ def channel_query_back_enabled(self):
return self.max_channel_query_back > 0


class RemoteQueryCommunity(Community):
class RemoteQueryCommunity(Community, EVAProtocolMixin):
"""
Community for general purpose SELECT-like queries into remote Channels database
"""
Expand All @@ -110,6 +114,22 @@ def __init__(self, my_peer, endpoint, network, metadata_store, settings=None, **
self.add_message_handler(RemoteSelectPayload, self.on_remote_select)
self.add_message_handler(SelectResponsePayload, self.on_remote_select_response)

self.eva_init()
self.eva_register_receive_callback(self.on_receive)
self.eva_register_send_complete_callback(self.on_send_complete)
self.eva_register_error_callback(self.on_error)

def on_receive(self, peer, binary_info, binary_data, nonce):
self.logger.info(f"EVA data received: peer {hexlify(peer.mid)}, info {binary_info}")
packet = (peer.address, binary_data)
self.on_packet(packet)

def on_send_complete(self, peer, binary_info, binary_data, nonce):
self.logger.info(f"EVA outgoing transfer complete: peer {hexlify(peer.mid)}, info {binary_info}")

def on_error(self, peer, exception):
self.logger.warning(f"EVA transfer error: peer {hexlify(peer.mid)}, exception: {exception}")

def send_remote_select(self, peer, processing_callback=None, **kwargs):

request = SelectRequest(self.request_cache, hexlify(peer.mid), kwargs, processing_callback)
Expand All @@ -126,13 +146,19 @@ async def process_rpc_query(self, json_bytes: bytes):
:raises pony.orm.dbapiprovider.OperationalError: if an illegal query was performed.
"""
request_sanitized = sanitize_query(json.loads(json_bytes), self.settings.max_response_size)
return await self.mds.MetadataNode.get_entries_threaded(**request_sanitized)
return await self.mds.get_entries_threaded(**request_sanitized)

def send_db_results(self, peer, request_payload_id, db_results):
index = 0
while index < len(db_results):
data, index = entries_to_chunk(db_results, self.settings.maximum_payload_size, start_index=index)
self.ez_send(peer, SelectResponsePayload(request_payload_id, data))
payload = SelectResponsePayload(request_payload_id, data)
if len(data) > self.settings.maximum_payload_size:
self.eva_send_binary(
peer, struct.pack('>i', request_payload_id), self.ezr_pack(payload.msg_id, payload)
)
else:
self.ez_send(peer, payload)

@lazy_wrapper(RemoteSelectPayload)
async def on_remote_select(self, peer, request_payload):
Expand All @@ -157,7 +183,7 @@ async def on_remote_select_response(self, peer, response_payload):
"""
self.logger.info(f"Response from {hexlify(peer.mid)}")

# ACHTUNG! the returned request cache can be either a SelectRequest or PushbackWindow
# ACHTUNG! the returned request cache can be any one of SelectRequest, PushbackWindow
request = self.request_cache.get(hexlify(peer.mid), response_payload.id)
if request is None:
return
Expand All @@ -171,23 +197,30 @@ async def on_remote_select_response(self, peer, response_payload):
processing_results = await self.mds.process_compressed_mdblob_threaded(response_payload.raw_blob)
self.logger.info(f"Response result: {processing_results}")

# If we now about updated versions of the received stuff, push the updates back
# If we know about updated versions of the received stuff, push the updates back
if isinstance(request, SelectRequest) and self.settings.push_updates_back_enabled:
newer_entities = [md for md, result in processing_results if result == GOT_NEWER_VERSION]
newer_entities = [r.md_obj for r in processing_results if r.obj_state == ObjState.GOT_NEWER_VERSION]
self.send_db_results(peer, response_payload.id, newer_entities)

# Query back the sender for preview contents for the new channels
# TODO: maybe transform this into a processing_callback?
if self.settings.channel_query_back_enabled:
new_channels = [md for md, result in processing_results if result in (UNKNOWN_CHANNEL, UNKNOWN_COLLECTION)]
for channel in new_channels:
request_dict = {
"channel_pk": hexlify(channel.public_key),
"origin_id": channel.id_,
"first": 0,
"last": self.settings.max_channel_query_back,
}
self.send_remote_select(peer=peer, **request_dict)
for result in processing_results:
if result.obj_state == ObjState.UNKNOWN_OBJECT and result.md_obj.metadata_type in (
CHANNEL_TORRENT,
COLLECTION_NODE,
):
request_dict = {
"metadata_type": [COLLECTION_NODE, REGULAR_TORRENT],
"channel_pk": hexlify(result.md_obj.public_key),
"origin_id": result.md_obj.id_,
"first": 0,
"last": self.settings.max_channel_query_back,
}
self.send_remote_select(peer=peer, **request_dict)
for dep_query_dict in result.missing_deps:
# Query back for missing dependencies, e.g. thumbnail/description
self.send_remote_select(peer=peer, **dep_query_dict)

if isinstance(request, SelectRequest) and request.processing_callback:
request.processing_callback(request, processing_results)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ def fake_on_acknowledgement0(peer, payload):
assert isinstance(self.overlay(0).most_recent_received_exception, TransferException)
assert isinstance(self.overlay(1).most_recent_received_exception, SizeLimitException)

@pytest.mark.timeout(10)
async def test_wrong_message_order_and_wrong_nonce(self):
self.overlay(0).eva_protocol.terminate_by_timeout_enabled = False
self.overlay(1).eva_protocol.terminate_by_timeout_enabled = False
Expand Down
Loading

0 comments on commit 5215937

Please sign in to comment.