Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

READY: Channel description and thumbnails #6025

Merged
merged 19 commits into from
Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,18 @@ repos:
additional_dependencies: [
'flake8-import-order==0.18',
]

- repo: local
hooks:
- id: pylint
name: pylint
entry: pylint
language: system
files: |
(?x)(
^src/tribler-gui/|
^src/tribler-core/|
^src/tribler-common
)
types: [file, python]

10 changes: 5 additions & 5 deletions .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ init-hook='import sys; import os; sys.path.append(os.path.join("src", "pyipv8"))

# Add files or directories to the blacklist. They should be base names, not
# paths.
ignore=.git,libnacl,data
ignore=.git,libnacl,data,.pylint.rc,pyproject.toml

# Pickle collected data for later comparisons.
persistent=yes
Expand All @@ -32,7 +32,7 @@ unsafe-load-any-extension=no
# A comma-separated list of package or module names from where C extensions may
# be loaded. Extensions are loading into the active Python interpreter and may
# run arbitrary code
extension-pkg-whitelist=libtorrent
extension-pkg-whitelist=libtorrent,PyQt5

# Allow optimization of some AST trees. This will activate a peephole AST
# optimizer, which will apply various small optimizations. For instance, it can
Expand Down Expand Up @@ -65,7 +65,7 @@ confidence=
# --disable=W"
#disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating
disable=C0321,W0142,invalid-name,missing-docstring,no-member,no-name-in-module,
no-self-use,too-few-public-methods,C0330,W1203
no-self-use,too-few-public-methods,C0330,W1203,too-many-ancestors,too-many-arguments,too-many-public-methods,too-many-statements,too-many-instance-attributes,too-many-locals,too-many-branches,too-many-return-statements

#missing-type-doc

Expand Down Expand Up @@ -129,12 +129,12 @@ ignore-mixin-members=yes
# (useful for modules/projects where namespaces are manipulated during runtime
# and thus existing member attributes cannot be deduced by static analysis. It
# supports qualified module names, as well as Unix pattern matching.
ignored-modules=six.moves
ignored-modules=

# List of classes names for which member attributes should not be checked
# (useful for classes with attributes dynamically set). This supports can work
# with qualified names.
ignored-classes=SQLObject,twisted.internet.reactor,ephem,libtorrent
ignored-classes=ephem,libtorrent

# List of members which are set dynamically and missed by pylint inference
# system, and so shouldn't trigger E1101 when accessed. Python regular
Expand Down
2 changes: 1 addition & 1 deletion src/tribler-core/tribler_core/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from tribler_core.session import Session
from tribler_core.tests.tools.common import TESTS_DATA_DIR, TESTS_DIR
from tribler_core.tests.tools.tracker.udp_tracker import UDPTracker
from tribler_core.upgrade.db72_to_pony import DispersyToPonyMigration
from tribler_core.upgrade.legacy_to_pony import DispersyToPonyMigration
from tribler_core.utilities.unicode import hexlify


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
# self.eva_register_error_callback(self.on_error)
#
# def my_function(self, peer):
# self.eva_send_message(peer, b'info1', b'data1')
# self.eva_send_message(peer, b'info2', b'data2')
# self.eva_send_message(peer, b'info3', b'data3')
# self.eva_send_binary(peer, b'info1', b'data1')
# self.eva_send_binary(peer, b'info2', b'data2')
# self.eva_send_binary(peer, b'info3', b'data3')
#
# def on_receive(self, peer, binary_info, binary_data, nonce):
# logger.info(f'Data has been received: {binary_info}')
Expand All @@ -33,13 +33,15 @@
import time
from collections import defaultdict, deque
from enum import Enum, auto
from random import randint
from types import SimpleNamespace

from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.lazy_payload import VariablePayload, vp_compile

logger = logging.getLogger('EVA')

MAX_U64 = 0xFFFFFFFF

# fmt: off

Expand Down Expand Up @@ -168,8 +170,7 @@ def eva_send_binary(self, peer, info_binary, data_binary, nonce=None):
data_binary: binary data that will be sent to the target.
It is limited by several GB, but the protocol is slow by design, so
try to send less rather than more.
nonce: a uniq number for identifying the session. If not specified,
then `self.nonce + 1` will be used
nonce: a unique number for identifying the session. If not specified, generated randomly
"""
self.eva_protocol.send_binary(peer, info_binary, data_binary, nonce)

Expand Down Expand Up @@ -303,8 +304,6 @@ def __init__( # pylint: disable=too-many-arguments
self.retransmit_enabled = True
self.terminate_by_timeout_enabled = True

self.nonce = 0

# register tasks
community.register_task('scheduled send', self.send_scheduled, interval=scheduled_send_interval_in_sec)

Expand All @@ -324,9 +323,7 @@ def send_binary(self, peer, info_binary, data_binary, nonce=None):
return

if nonce is None:
self.nonce += 1

nonce = nonce or self.nonce
nonce = randint(0, MAX_U64)

if peer in self.outgoing:
scheduled_transfer = SimpleNamespace(info_binary=info_binary, data_binary=data_binary, nonce=nonce)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
RemoteQueryCommunitySettings,
)
from tribler_core.modules.metadata_store.serialization import CHANNEL_TORRENT
from tribler_core.modules.metadata_store.store import UNKNOWN_CHANNEL, UNKNOWN_COLLECTION, UNKNOWN_TORRENT
from tribler_core.modules.metadata_store.store import ObjState

minimal_blob_size = 200
maximum_payload_size = 1024
Expand Down Expand Up @@ -80,16 +80,21 @@ def send_remote_select_subscribed_channels(self, peer):
def on_packet_callback(_, processing_results):
# We use responses for requests about subscribed channels to bump our local channels ratings
with db_session:
for c in [md for md, _ in processing_results if md.metadata_type == CHANNEL_TORRENT]:
for c in (r.md_obj for r in processing_results if r.md_obj.metadata_type == CHANNEL_TORRENT):
self.mds.vote_bump(c.public_key, c.id_, peer.public_key.key_to_bin()[10:])

# Notify GUI about the new channels
new_channels = [md for md, result in processing_results if result == UNKNOWN_CHANNEL and md.origin_id == 0]
if self.notifier and new_channels:
self.notifier.notify(
NTFY.CHANNEL_DISCOVERED,
{"results": [md.to_simple_dict() for md in new_channels], "uuid": str(CHANNELS_VIEW_UUID)},
results = [
r.md_obj.to_simple_dict()
for r in processing_results
if (
r.obj_state == ObjState.UNKNOWN_OBJECT
and r.md_obj.metadata_type == CHANNEL_TORRENT
and r.md_obj.origin_id == 0
)
]
if self.notifier and results:
self.notifier.notify(NTFY.CHANNEL_DISCOVERED, {"results": results, "uuid": str(CHANNELS_VIEW_UUID)})

request_dict = {
"metadata_type": [CHANNEL_TORRENT],
Expand All @@ -104,12 +109,13 @@ def send_search_request(self, **kwargs):
request_uuid = uuid.uuid4()

def notify_gui(_, processing_results):
search_results = [
md.to_simple_dict()
for md, result in processing_results
if result in (UNKNOWN_TORRENT, UNKNOWN_CHANNEL, UNKNOWN_COLLECTION)
results = [
r.md_obj.to_simple_dict()
for r in processing_results
if r.obj_state in (ObjState.UNKNOWN_OBJECT, ObjState.UPDATED_OUR_VERSION)
drew2a marked this conversation as resolved.
Show resolved Hide resolved
]
self.notifier.notify(NTFY.REMOTE_QUERY_RESULTS, {"uuid": str(request_uuid), "results": search_results})
if self.notifier and results:
self.notifier.notify(NTFY.REMOTE_QUERY_RESULTS, {"results": results, "uuid": str(request_uuid)})

for p in self.get_random_peers(self.settings.max_query_peers):
self.send_remote_select(p, **kwargs, processing_callback=notify_gui)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import struct
from binascii import unhexlify
from dataclasses import dataclass

Expand All @@ -9,11 +10,14 @@

from pony.orm.dbapiprovider import OperationalError

from tribler_core.modules.metadata_store.community.eva_protocol import EVAProtocolMixin
from tribler_core.modules.metadata_store.orm_bindings.channel_metadata import entries_to_chunk
from tribler_core.modules.metadata_store.store import GOT_NEWER_VERSION, UNKNOWN_CHANNEL, UNKNOWN_COLLECTION
from tribler_core.modules.metadata_store.serialization import CHANNEL_TORRENT, COLLECTION_NODE, REGULAR_TORRENT
from tribler_core.modules.metadata_store.store import ObjState
from tribler_core.utilities.unicode import hexlify

BINARY_FIELDS = ("infohash", "channel_pk")
NO_RESPONSE = unhexlify("7ca1e9e922895a477a52cc9d6031020355eb172735bf83c058cb03ddcc9c6408")


def sanitize_query(query_dict, cap=100):
Expand Down Expand Up @@ -90,7 +94,7 @@ def channel_query_back_enabled(self):
return self.max_channel_query_back > 0


class RemoteQueryCommunity(Community):
class RemoteQueryCommunity(Community, EVAProtocolMixin):
"""
Community for general purpose SELECT-like queries into remote Channels database
"""
Expand All @@ -110,6 +114,22 @@ def __init__(self, my_peer, endpoint, network, metadata_store, settings=None, **
self.add_message_handler(RemoteSelectPayload, self.on_remote_select)
self.add_message_handler(SelectResponsePayload, self.on_remote_select_response)

self.eva_init()
self.eva_register_receive_callback(self.on_receive)
self.eva_register_send_complete_callback(self.on_send_complete)
self.eva_register_error_callback(self.on_error)

def on_receive(self, peer, binary_info, binary_data, nonce):
self.logger.info(f"EVA data received: peer {hexlify(peer.mid)}, info {binary_info}")
packet = (peer.address, binary_data)
self.on_packet(packet)

def on_send_complete(self, peer, binary_info, binary_data, nonce):
self.logger.info(f"EVA outgoing transfer complete: peer {hexlify(peer.mid)}, info {binary_info}")

def on_error(self, peer, exception):
self.logger.warning(f"EVA transfer error: peer {hexlify(peer.mid)}, exception: {exception}")

def send_remote_select(self, peer, processing_callback=None, **kwargs):

request = SelectRequest(self.request_cache, hexlify(peer.mid), kwargs, processing_callback)
Expand All @@ -126,13 +146,19 @@ async def process_rpc_query(self, json_bytes: bytes):
:raises pony.orm.dbapiprovider.OperationalError: if an illegal query was performed.
"""
request_sanitized = sanitize_query(json.loads(json_bytes), self.settings.max_response_size)
return await self.mds.MetadataNode.get_entries_threaded(**request_sanitized)
return await self.mds.get_entries_threaded(**request_sanitized)

def send_db_results(self, peer, request_payload_id, db_results):
index = 0
while index < len(db_results):
data, index = entries_to_chunk(db_results, self.settings.maximum_payload_size, start_index=index)
self.ez_send(peer, SelectResponsePayload(request_payload_id, data))
payload = SelectResponsePayload(request_payload_id, data)
if len(data) > self.settings.maximum_payload_size:
self.eva_send_binary(
peer, struct.pack('>i', request_payload_id), self.ezr_pack(payload.msg_id, payload)
)
drew2a marked this conversation as resolved.
Show resolved Hide resolved
else:
self.ez_send(peer, payload)

@lazy_wrapper(RemoteSelectPayload)
async def on_remote_select(self, peer, request_payload):
Expand All @@ -157,7 +183,7 @@ async def on_remote_select_response(self, peer, response_payload):
"""
self.logger.info(f"Response from {hexlify(peer.mid)}")

# ACHTUNG! the returned request cache can be either a SelectRequest or PushbackWindow
# ACHTUNG! the returned request cache can be any one of SelectRequest, PushbackWindow
request = self.request_cache.get(hexlify(peer.mid), response_payload.id)
if request is None:
return
Expand All @@ -171,23 +197,36 @@ async def on_remote_select_response(self, peer, response_payload):
processing_results = await self.mds.process_compressed_mdblob_threaded(response_payload.raw_blob)
self.logger.info(f"Response result: {processing_results}")

# If we now about updated versions of the received stuff, push the updates back
# If we know about updated versions of the received stuff, push the updates back
if isinstance(request, SelectRequest) and self.settings.push_updates_back_enabled:
newer_entities = [md for md, result in processing_results if result == GOT_NEWER_VERSION]
newer_entities = [r.md_obj for r in processing_results if r.obj_state == ObjState.GOT_NEWER_VERSION]
self.send_db_results(peer, response_payload.id, newer_entities)

# Query back the sender for preview contents for the new channels
# TODO: maybe transform this into a processing_callback?
if self.settings.channel_query_back_enabled:
new_channels = [md for md, result in processing_results if result in (UNKNOWN_CHANNEL, UNKNOWN_COLLECTION)]
for channel in new_channels:
request_dict = {
"channel_pk": hexlify(channel.public_key),
"origin_id": channel.id_,
"first": 0,
"last": self.settings.max_channel_query_back,
}
self.send_remote_select(peer=peer, **request_dict)
for result in processing_results:
# Query back the sender for preview contents for the new channels
# The fact that the object is previously unknown is indicated by process_payload in the
# .obj_state property of returned ProcessingResults objects.
if result.obj_state == ObjState.UNKNOWN_OBJECT and result.md_obj.metadata_type in (
CHANNEL_TORRENT,
COLLECTION_NODE,
):
request_dict = {
"metadata_type": [COLLECTION_NODE, REGULAR_TORRENT],
"channel_pk": hexlify(result.md_obj.public_key),
"origin_id": result.md_obj.id_,
"first": 0,
"last": self.settings.max_channel_query_back,
}
self.send_remote_select(peer=peer, **request_dict)

# Query back for missing dependencies, e.g. thumbnail/description.
# The fact that some dependency is missing is checked by the lower layer during
# the query to process_payload and indicated through .missing_deps property of the
# ProcessingResults objects returned by process_payload.
for dep_query_dict in result.missing_deps:
self.send_remote_select(peer=peer, **dep_query_dict)

if isinstance(request, SelectRequest) and request.processing_callback:
request.processing_callback(request, processing_results)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ def fake_on_acknowledgement0(peer, payload):
assert isinstance(self.overlay(0).most_recent_received_exception, TransferException)
assert isinstance(self.overlay(1).most_recent_received_exception, SizeLimitException)

@pytest.mark.timeout(10)
async def test_wrong_message_order_and_wrong_nonce(self):
self.overlay(0).eva_protocol.terminate_by_timeout_enabled = False
self.overlay(1).eva_protocol.terminate_by_timeout_enabled = False
Expand Down
Loading