Remote search v1

drew2a committed Jan 6, 2022
1 parent d8cf392 commit 444a030
Showing 10 changed files with 57 additions and 24 deletions.
1 change: 1 addition & 0 deletions src/tribler-common/tribler_common/tests/test_utils.py
@@ -32,6 +32,7 @@ def test_extract_tags():
assert extract_tags('####') == (set(), '####')

assert extract_tags('#tag') == ({'tag'}, '')
assert extract_tags('#Tag') == ({'tag'}, '')
assert extract_tags('a #tag in the middle') == ({'tag'}, 'a in the middle')
assert extract_tags('at the end of the query #tag') == ({'tag'}, 'at the end of the query ')
assert extract_tags('multiple tags: #tag1 #tag2#tag3') == ({'tag1', 'tag2', 'tag3'}, 'multiple tags: ')
3 changes: 2 additions & 1 deletion src/tribler-common/tribler_common/utilities.py
@@ -62,7 +62,8 @@ def extract_tags(text: str) -> Tuple[Set[str], str]:
positions = [0]

for m in tags_re.finditer(text):
tags.add(m.group(0)[1:])
tag = m.group(0)[1:]
tags.add(tag.lower())
positions.extend(itertools.chain.from_iterable(m.regs))
positions.append(len(text))

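The lowercasing added above is what makes the new '#Tag' assertion pass: tags are normalized once, at extraction time, so every later lookup is case-insensitive. A minimal, self-contained sketch of the idea; the regex and the reconstruction of the remaining query are simplified stand-ins for the real `tags_re` and position bookkeeping in `utilities.py`:

```python
import re
from typing import Set, Tuple

# Assumption: simplified stand-in for Tribler's tags_re pattern.
TAG_RE = re.compile(r'#[^\s#]+')


def extract_tags_sketch(text: str) -> Tuple[Set[str], str]:
    """Return (tags, remaining_text); tags are lowercased so '#Tag' and '#tag' collide."""
    tags = {match.group(0)[1:].lower() for match in TAG_RE.finditer(text)}
    remaining = TAG_RE.sub('', text)  # the real code rebuilds the query from match positions
    return tags, remaining


assert extract_tags_sketch('#Tag')[0] == {'tag'}
assert extract_tags_sketch('multiple tags: #tag1 #tag2#tag3')[0] == {'tag1', 'tag2', 'tag3'}
```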
@@ -202,6 +202,7 @@ async def _send_remote_select(peer):

def send_search_request(self, **kwargs):
# Send a remote query request to multiple random peers to search for some terms
self.logger.info('Send search request')
request_uuid = uuid.uuid4()

def notify_gui(request, processing_results):
@@ -221,8 +222,10 @@ def notify_gui(request, processing_results):
peers_to_query = self.get_known_subscribed_peers_for_node(
kwargs["channel_pk"], kwargs["origin_id"], self.settings.max_mapped_query_peers
)
self.logger.info(f'Use {len(peers_to_query)} known peers')
else:
peers_to_query = self.get_random_peers(self.rqc_settings.max_query_peers)
self.logger.info(f'Use {len(peers_to_query)} random peers')

for p in peers_to_query:
self.send_remote_select(p, **kwargs, processing_callback=notify_gui)
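The hunk above only adds logging, but it is worth spelling out the fan-out it instruments: a fresh request UUID is created, the targets are either the known subscribed peers of a channel or a random sample of peers, and each target receives a remote select whose results are forwarded to the GUI. A dependency-free sketch of that shape, with plain callables standing in for the community's own methods:

```python
import logging
import uuid
from typing import Callable, List, Tuple

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('remote-search-sketch')


def send_search_request_sketch(
    get_known_peers: Callable[[], List[str]],
    get_random_peers: Callable[[], List[str]],
    send_remote_select: Callable[..., None],
    **kwargs,
) -> Tuple[uuid.UUID, List[str]]:
    """Fan a search out to channel-specific peers if a channel is given, else to random peers."""
    request_uuid = uuid.uuid4()
    if 'channel_pk' in kwargs and 'origin_id' in kwargs:
        peers_to_query = get_known_peers()
        logger.info('Use %d known peers', len(peers_to_query))
    else:
        peers_to_query = get_random_peers()
        logger.info('Use %d random peers', len(peers_to_query))
    for peer in peers_to_query:
        send_remote_select(peer, **kwargs)
    return request_uuid, peers_to_query


# Usage with dummy callables:
rid, peers = send_search_request_sketch(
    lambda: ['peer-a'],
    lambda: ['peer-b', 'peer-c'],
    lambda peer, **kw: None,
    txt_filter='ubuntu',
)
```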
@@ -9,6 +9,7 @@
from tribler_core.components.ipv8.ipv8_component import INFINITE, Ipv8Component
from tribler_core.components.metadata_store.metadata_store_component import MetadataStoreComponent
from tribler_core.components.reporter.reporter_component import ReporterComponent
from tribler_core.components.tag.tag_component import TagComponent


class GigaChannelComponent(Component):
@@ -24,6 +25,7 @@ async def run(self):

self._ipv8_component = await self.require_component(Ipv8Component)
metadata_store_component = await self.require_component(MetadataStoreComponent)
tag_component = await self.get_component(TagComponent)

giga_channel_cls = GigaChannelTestnetCommunity if config.general.testnet else GigaChannelCommunity
community = giga_channel_cls(
@@ -35,6 +37,7 @@ async def run(self):
rqc_settings=config.remote_query_community,
metadata_store=metadata_store_component.mds,
max_peers=50,
tags_db=tag_component.tags_db if tag_component else None
)
self.community = community
self._ipv8_component.initialise_community_by_default(community, default_random_walk_max_peers=30)
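Note the asymmetry in how dependencies are resolved: `require_component` is used for hard requirements (IPv8, the metadata store), while the tag component is fetched with `get_component`, so the community simply runs with `tags_db=None` when tagging is unavailable. A toy sketch of that optional-dependency pattern; the registry below is hypothetical and not Tribler's component framework:

```python
from typing import Dict, Optional


class TagsDB:
    """Stand-in for the real tags database."""


# Hypothetical registry; Tribler resolves components through its own component machinery.
registry: Dict[str, object] = {
    # 'tag': TagsDB(),  # uncomment to simulate the TagComponent being enabled
}


def get_component(name: str) -> Optional[object]:
    # Optional lookup: return None instead of raising when the component is absent.
    return registry.get(name)


tag_component = get_component('tag')
tags_db = tag_component if tag_component else None  # mirrors `tags_db=... if tag_component else None`
print('tags_db enabled:', tags_db is not None)
```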
@@ -1,14 +1,15 @@
import json
import struct
from asyncio import Future
from typing import List, Optional, Set

from binascii import unhexlify
from pony.orm import db_session
from pony.orm.dbapiprovider import OperationalError

from ipv8.lazy_community import lazy_wrapper
from ipv8.messaging.lazy_payload import VariablePayload, vp_compile
from ipv8.requestcache import NumberCache, RandomNumberCache, RequestCache

from pony.orm.dbapiprovider import OperationalError

from tribler_core.components.ipv8.tribler_community import TriblerCommunity
from tribler_core.components.metadata_store.db.orm_bindings.channel_metadata import LZ4_EMPTY_ARCHIVE, entries_to_chunk
from tribler_core.components.metadata_store.db.serialization import CHANNEL_TORRENT, COLLECTION_NODE, REGULAR_TORRENT
@@ -17,6 +18,7 @@
from tribler_core.components.metadata_store.remote_query_community.payload_checker import ObjState
from tribler_core.components.metadata_store.remote_query_community.settings import RemoteQueryCommunitySettings
from tribler_core.components.metadata_store.utils import RequestTimeoutException
from tribler_core.components.tag.community.tag_validator import is_valid_tag, validate_tag
from tribler_core.utilities.unicode import hexlify

BINARY_FIELDS = ("infohash", "channel_pk")
@@ -129,12 +131,13 @@ class RemoteQueryCommunity(TriblerCommunity, EVAProtocolMixin):
def __init__(self, my_peer, endpoint, network,
rqc_settings: RemoteQueryCommunitySettings = None,
metadata_store=None,
tags_db=None,
**kwargs):
super().__init__(my_peer, endpoint, network=network, **kwargs)

self.rqc_settings = rqc_settings
self.mds: MetadataStore = metadata_store

self.tags_db = tags_db
# This object stores requests for "select" queries that we sent to other hosts.
# We keep track of peers we actually requested for data so people can't randomly push spam at us.
# Also, this keeps track of hosts we responded to. There is a possibility that
@@ -188,9 +191,18 @@ async def process_rpc_query(self, json_bytes: bytes):
:raises ValueError: if no JSON could be decoded.
:raises pony.orm.dbapiprovider.OperationalError: if an illegal query was performed.
"""
request_sanitized = sanitize_query(json.loads(json_bytes), self.rqc_settings.max_response_size)
parameters = json.loads(json_bytes)
parameters['infohash_set'] = await self.mds.run_threaded(self.search_for_tags, parameters.get('tags'))
request_sanitized = sanitize_query(parameters, self.rqc_settings.max_response_size)
return await self.mds.get_entries_threaded(**request_sanitized)

@db_session
def search_for_tags(self, tags: Optional[List[str]]) -> Optional[Set[bytes]]:
if not tags or not self.tags_db:
return None
valid_tags = {tag for tag in tags if is_valid_tag(tag)}
return self.tags_db.get_infohashes(valid_tags)

def send_db_results(self, peer, request_payload_id, db_results, force_eva_response=False):

# Special case of empty results list - sending empty lz4 archive
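This is the heart of the commit: when a remote peer's query carries tags, `search_for_tags` resolves them (inside a `db_session`, on the database thread) to a set of infohashes, and that set is injected as `infohash_set` before the query is sanitized and run against the metadata store. A dependency-free sketch of the flow, with a fake tags database standing in for `self.tags_db` and simplified tag validation:

```python
import json
from typing import Dict, List, Optional, Set


def is_valid_tag_sketch(tag: str) -> bool:
    # Assumption: loosely mirrors tag_validator (non-empty, no spaces); the real rules are stricter.
    return bool(tag) and ' ' not in tag


class FakeTagsDB:
    """Stand-in for the tags database: maps tag -> set of infohashes."""

    def __init__(self, index: Dict[str, Set[bytes]]):
        self.index = index

    def get_infohashes(self, tags: Set[str]) -> Set[bytes]:
        # Union of matches for simplicity; the real DB may apply different matching semantics.
        result: Set[bytes] = set()
        for tag in tags:
            result |= self.index.get(tag, set())
        return result


def search_for_tags(tags_db: Optional[FakeTagsDB], tags: Optional[List[str]]) -> Optional[Set[bytes]]:
    if not tags or not tags_db:
        return None  # no tag filter, so the query stays unrestricted
    valid_tags = {tag for tag in tags if is_valid_tag_sketch(tag)}
    return tags_db.get_infohashes(valid_tags)


def process_rpc_query_sketch(json_bytes: bytes, tags_db: Optional[FakeTagsDB]) -> dict:
    parameters = json.loads(json_bytes)
    parameters['infohash_set'] = search_for_tags(tags_db, parameters.get('tags'))
    # The real code then sanitizes the parameters and queries the MetadataStore on a thread.
    return parameters


db = FakeTagsDB({'ubuntu': {b'\x01' * 20}})
query = json.dumps({'txt_filter': 'ubuntu', 'tags': ['ubuntu']}).encode()
print(process_rpc_query_sketch(query, db))
```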
@@ -50,12 +50,14 @@ def sanitize_parameters(self, parameters):
)
@querystring_schema(RemoteQueryParameters)
async def create_remote_search_request(self, request):
self._logger.info(f'Create remote search request')
# Query remote results from the GigaChannel Community.
# Results are returned over the Events endpoint.
try:
sanitized = self.sanitize_parameters(request.query)
except (ValueError, KeyError) as e:
return RESTResponse({"error": f"Error processing request parameters: {e}"}, status=HTTP_BAD_REQUEST)
self._logger.info(f'Parameters: {sanitized}')

request_uuid, peers_list = self.gigachannel_community.send_search_request(**sanitized)
peers_mid_list = [hexlify(p.mid) for p in peers_list]
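For callers, the new tag filter rides along with the existing remote search parameters on the `remote_query` endpoint (the GUI below sends `txt_filter`, `hide_xxx` and `tags` in a PUT request). A hedged client-side example using `requests`; the port, API key header and exact response shape are assumptions to adapt to your setup:

```python
import requests  # third-party HTTP client, not part of Tribler

API = 'http://localhost:52194'        # hypothetical address of the local Tribler REST API
HEADERS = {'X-Api-Key': 'CHANGE_ME'}  # assumption: key-based auth is enabled

params = {
    'txt_filter': 'ubuntu',  # full-text search terms
    'hide_xxx': 1,
    'tags': ['linux'],       # the tag filter introduced in this commit
}

response = requests.put(f'{API}/remote_query', params=params, headers=HEADERS)
response.raise_for_status()
print(response.json())  # expected to contain the request UUID and the queried peers' mids
```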
@@ -75,9 +75,7 @@ def search_db():
try:
with db_session:
if tags:
lower_tags = {tag.lower() for tag in tags}
infohash_set = self.tags_db.get_infohashes(lower_tags)
sanitized['infohash_set'] = infohash_set
sanitized['infohash_set'] = self.tags_db.get_infohashes(set(tags))

search_results, total, max_rowid = await mds.run_threaded(search_db)
except Exception as e: # pylint: disable=broad-except; # pragma: no cover
@@ -12,5 +12,13 @@ def validate_tag(tag: str):
raise ValueError('Tag should not contain any spaces')


def is_valid_tag(tag: str) -> bool:
try:
validate_tag(tag)
except ValueError:
return False
return True


def validate_operation(operation: int):
TagOperationEnum(operation)
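`is_valid_tag` is a thin boolean wrapper around `validate_tag`, which signals bad input by raising `ValueError`; the wrapper is what lets `search_for_tags` filter tags inside a set comprehension. A self-contained sketch of the pair; the exact rules (length bounds, no spaces) are assumptions based on the visible error message and test names:

```python
def validate_tag_sketch(tag: str) -> None:
    # Assumed rules; the real validate_tag may check more (e.g. allowed characters).
    if not 3 <= len(tag) <= 50:
        raise ValueError('Tag length should be in range [3..50]')
    if ' ' in tag:
        raise ValueError('Tag should not contain any spaces')


def is_valid_tag_sketch(tag: str) -> bool:
    # Same check, but reported as a bool instead of an exception.
    try:
        validate_tag_sketch(tag)
    except ValueError:
        return False
    return True


assert is_valid_tag_sketch('valid-tag')
assert not is_valid_tag_sketch('invalid tag')
```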
@@ -1,7 +1,7 @@
import pytest

from tribler_core.components.tag.community.tag_payload import TagOperationEnum
from tribler_core.components.tag.community.tag_validator import validate_operation, validate_tag
from tribler_core.components.tag.community.tag_validator import is_valid_tag, validate_operation, validate_tag

pytestmark = pytest.mark.asyncio

@@ -49,3 +49,10 @@ async def test_contains_upper_case_not_latin():
async def test_contain_any_space():
with pytest.raises(ValueError):
validate_tag('tag with space')


async def test_is_valid_tag():
# test that is_valid_tag works like validate_tag but returns a `bool`
# instead of raising a ValueError exception
assert is_valid_tag('valid-tag')
assert not is_valid_tag('invalid tag')
26 changes: 12 additions & 14 deletions src/tribler-gui/tribler_gui/widgets/searchresultswidget.py
@@ -6,9 +6,7 @@
from PyQt5 import uic

from tribler_common.utilities import Query, to_fts_query

from tribler_core.components.metadata_store.db.serialization import CHANNEL_TORRENT, COLLECTION_NODE, REGULAR_TORRENT

from tribler_gui.sentry_mixin import AddBreadcrumbOnShowMixin
from tribler_gui.tribler_request_manager import TriblerNetworkRequest
from tribler_gui.utilities import connect, get_ui_file_path, tr
@@ -25,11 +23,11 @@ def format_search_loading_label(search_request):
}

return (
tr(
"Remote responses: %(num_complete_peers)i / %(total_peers)i"
"\nNew remote results received: %(num_remote_results)i"
)
% data
tr(
"Remote responses: %(num_complete_peers)i / %(total_peers)i"
"\nNew remote results received: %(num_remote_results)i"
)
% data
)


@@ -101,9 +99,9 @@ def show_results(self, *_):

def check_can_show(self, query):
if (
self.last_search_query == query
and self.last_search_time is not None
and time.time() - self.last_search_time < 1
self.last_search_query == query
and self.last_search_time is not None
and time.time() - self.last_search_time < 1
):
self._logger.info("Same search query already sent within 500ms so dropping this one")
return False
@@ -128,7 +126,7 @@ def register_request(response):
self.timeout_progress_bar.start()
self.setCurrentWidget(self.loading_page)

params = {'txt_filter': fts_query, 'hide_xxx': self.hide_xxx}
params = {'txt_filter': fts_query, 'hide_xxx': self.hide_xxx, 'tags': list(query.tags)}
TriblerNetworkRequest('remote_query', register_request, method="PUT", url_params=params)
return True

@@ -138,9 +136,9 @@ def reset(self):

def update_loading_page(self, remote_results):
if (
not self.search_request
or remote_results.get("uuid") != self.search_request.uuid
or self.currentWidget() == self.results_page
not self.search_request
or remote_results.get("uuid") != self.search_request.uuid
or self.currentWidget() == self.results_page
):
return
peer = remote_results["peer"]
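On the GUI side the only functional change is in the last hunk: the tags that `extract_tags` pulled out of the search box are now forwarded to the `remote_query` endpoint next to the FTS text. A small sketch of how the request parameters are assembled, with a minimal dataclass standing in for `tribler_common.utilities.Query` and the `to_fts_query` step omitted:

```python
from dataclasses import dataclass, field
from typing import Set


@dataclass
class QuerySketch:
    """Minimal stand-in for tribler_common.utilities.Query."""
    original_query: str
    tags: Set[str] = field(default_factory=set)
    fts_text: str = ''


def build_remote_query_params(query: QuerySketch, hide_xxx: bool) -> dict:
    # Mirrors the widget: FTS text plus the extracted (already lowercased) tags.
    return {
        'txt_filter': query.fts_text,
        'hide_xxx': hide_xxx,
        'tags': list(query.tags),
    }


q = QuerySketch(original_query='#Linux ubuntu iso', tags={'linux'}, fts_text='ubuntu iso')
print(build_remote_query_params(q, hide_xxx=True))
# -> {'txt_filter': 'ubuntu iso', 'hide_xxx': True, 'tags': ['linux']}
```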
