From dcc815ad625fe02ec09a41f2d8ff056809b72e4b Mon Sep 17 00:00:00 2001 From: Sandip Pandey Date: Wed, 13 Jun 2018 17:07:59 +0200 Subject: [PATCH] Added search request & response payload structure --- .../Test/Community/popular/test_payload.py | 70 +++++++++++ .../Test/Community/popular/test_repository.py | 16 ++- Tribler/community/popular/community.py | 42 ++++++- Tribler/community/popular/payload.py | 117 +++++++++++++++++- 4 files changed, 232 insertions(+), 13 deletions(-) create mode 100644 Tribler/Test/Community/popular/test_payload.py diff --git a/Tribler/Test/Community/popular/test_payload.py b/Tribler/Test/Community/popular/test_payload.py new file mode 100644 index 00000000000..4fe9432a446 --- /dev/null +++ b/Tribler/Test/Community/popular/test_payload.py @@ -0,0 +1,70 @@ +import random +import string +from unittest import TestCase + +from Tribler.community.popular.payload import SearchResponsePayload, SearchResponseItemPayload +from Tribler.pyipv8.ipv8.messaging.serialization import Serializer + + +class TestSerializer(TestCase): + + def setUp(self): + self.serializer = Serializer() + + def test_search_result_payload_serialization(self): + """ + Test serialization & deserialization of search payload + :return: + """ + + def random_string(size=6, chars=string.ascii_uppercase + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) + + def random_infohash(): + return ''.join(random.choice('0123456789abcdef') for _ in range(20)) + + # sample search response items + sample_items = [] + for index in range(10): + infohash = random_infohash() + name = random_string() + length = random.randint(1000, 9999) + num_files = random.randint(1, 10) + category_list = ['video', 'audio'] + creation_date = random.randint(1000000, 111111111) + seeders = random.randint(10, 200) + leechers = random.randint(5, 1000) + cid = random_string(size=20) + + sample_items.append(SearchResponseItemPayload(infohash, name, length, num_files, category_list, + creation_date, seeders, leechers, cid)) + + # Search identifier + identifier = 111 + + # Serialize the results + results = '' + for item in sample_items: + results += self.serializer.pack_multiple(item.to_pack_list()) + serialized_results = self.serializer.pack_multiple(SearchResponsePayload(identifier, results).to_pack_list()) + + # De-serialize the response payload and check the identifier and get the results + response_format = SearchResponsePayload.format_list + (search_results, _) = self.serializer.unpack_multiple(response_format, serialized_results) + + # De-serialize each individual search result items + item_format = SearchResponseItemPayload.format_list + (all_items, _) = self.serializer.unpack_multiple_as_list(item_format, search_results[1]) + for index in xrange(len(all_items)): + response_item = SearchResponseItemPayload.from_unpack_list(*all_items[index]) + sample_item = sample_items[index] + + self.assertEqual(sample_item.infohash, response_item.infohash) + self.assertEqual(sample_item.name, response_item.name) + self.assertEqual(sample_item.length, response_item.length) + self.assertEqual(sample_item.num_files, response_item.num_files) + self.assertEqual(sample_item.creation_date, response_item.creation_date) + self.assertEqual(sample_item.category_list, response_item.category_list) + self.assertEqual(sample_item.seeders, response_item.seeders) + self.assertEqual(sample_item.leechers, response_item.leechers) + self.assertEqual(sample_item.cid, response_item.cid) diff --git a/Tribler/Test/Community/popular/test_repository.py b/Tribler/Test/Community/popular/test_repository.py index 15c85934c94..2be83c59a3e 100644 --- a/Tribler/Test/Community/popular/test_repository.py +++ b/Tribler/Test/Community/popular/test_repository.py @@ -57,6 +57,9 @@ def fake_logger_error(repo, *args): repo.unknown_torrent = True repo.logger_error_called = True + def update_torrent(repo, _): + repo.update_torrent_called = True + original_logger = self.content_repository._logger self.content_repository._logger.error = lambda *args, **kw: fake_logger_error(self.content_repository, *args) @@ -67,18 +70,19 @@ def fake_logger_error(repo, *args): self.content_repository.torrent_db = None self.content_repository.logger_error_called = False - self.content_repository.update_torrent(fake_torrent_health_payload, peer_trust=0) + self.content_repository.update_torrent_health(fake_torrent_health_payload, peer_trust=0) self.assertTrue(self.content_repository.torrent_db_none) self.assertTrue(self.content_repository.logger_error_called) # Case2: torrent db does not have torrent self.content_repository.torrent_db = MockObject() - self.content_repository.torrent_db.hasTorrent = lambda _: False + self.content_repository.torrent_db.updateTorrent = lambda infohash, *args, **kw: \ + update_torrent(self.content_repository, infohash) self.content_repository.logger_error_called = False + self.content_repository.has_torrent = lambda infohash: False - self.content_repository.update_torrent(fake_torrent_health_payload, peer_trust=0) - self.assertTrue(self.content_repository.logger_error_called) - self.assertTrue(self.content_repository.unknown_torrent) + self.content_repository.update_torrent_health(fake_torrent_health_payload, peer_trust=0) + self.assertTrue(self.content_repository.update_torrent_called) # restore logger self.content_repository._logger = original_logger @@ -132,6 +136,6 @@ def get_torrent(infohash): lambda infohash, *args, **kw: update_torrent(self.content_repository, infohash) self.content_repository.update_torrent_called = False - self.content_repository.update_torrent(sample_payload, peer_trust=peer_trust) + self.content_repository.update_torrent_health(sample_payload, peer_trust=peer_trust) return self.content_repository.update_torrent_called diff --git a/Tribler/community/popular/community.py b/Tribler/community/popular/community.py index 658bf9777b1..77695863ab0 100644 --- a/Tribler/community/popular/community.py +++ b/Tribler/community/popular/community.py @@ -1,5 +1,9 @@ +import time +from twisted.python.failure import Failure + +from Tribler.pyipv8.ipv8.requestcache import RandomNumberCache, RequestCache from copy import copy -from twisted.internet.defer import inlineCallbacks +from twisted.internet.defer import inlineCallbacks, Deferred from twisted.internet.task import LoopingCall from Tribler.community.popular.payload import TorrentHealthPayload, ContentSubscription, TorrentInfoRequestPayload, \ @@ -25,7 +29,37 @@ ERROR_NO_CONTENT = "Nothing to publish" +class Request(RandomNumberCache): + """ + This request cache keeps track of all outstanding requests within the PopularCommunity. + """ + def __init__(self, community, peer, params=None): + super(Request, self).__init__(community.request_cache, u'request') + self.peer = peer + self.params = params + self.deferred = Deferred() + self.start_time = time.time() + + @property + def timeout_delay(self): + return 5.0 + + def on_timeout(self): + if not self.deferred.called: + self._logger.error('Request to %s timed out', self.peer.address) + self.peer.failed += 1 + self.deferred.errback(Failure(RuntimeError("Peer timeout"))) + + def on_complete(self): + self.peer.last_response = time.time() + self.peer.failed = 0 + self.peer.rtt = time.time() - self.start_time + + class PopularCommunity(Community): + """ + Community for disseminating the content across the network. Follows publish-subscribe model. + """ master_peer = Peer("3081a7301006072a8648ce3d020106052b810400270381920004073beff7002b6a9fc2824a3b1bbb1c4fc32546261" "e3ef7537874560346c5fdc0c17fe654f67d23b08cbb44141879f79a7a4c8deddf9beb4fbc7a0f02ee1586ccebedb6" @@ -39,6 +73,7 @@ def __init__(self, *args, **kwargs): # Handles database stuffs self.content_repository = ContentRepository(self.torrent_db) + self.request_cache = RequestCache() # Register messages self.decode_map.update({ @@ -55,6 +90,8 @@ def __init__(self, *args, **kwargs): self.subscribers = set() self.publishers = set() + self._logger.info('Popular Community initialized (peer mid %s)', self.my_peer.mid.encode('HEX')) + def start(self): """ Starts the community by subscribing to peers, and periodically publishing the content updates to @@ -74,6 +111,7 @@ def start_publishing(): @inlineCallbacks def unload(self): + self.request_cache.clear() self.content_repository.cleanup() self.content_repository = None @@ -209,7 +247,7 @@ def on_torrent_info_request(self, source_address, data): self._logger.error(ERROR_UNKNOWN_RESPONSE) return - self.send_torrent_info_response(payload.infohash, peer=peer) + self.send_torrent_info_response(str(payload.infohash), peer=peer) def on_torrent_info_response(self, source_address, data): """ diff --git a/Tribler/community/popular/payload.py b/Tribler/community/popular/payload.py index 34621f2ed36..41a9a7384ba 100644 --- a/Tribler/community/popular/payload.py +++ b/Tribler/community/popular/payload.py @@ -1,6 +1,23 @@ +from struct import pack, unpack_from, calcsize + from Tribler.pyipv8.ipv8.deprecated.payload import Payload +def encode_values(values): + return ''.join([pack('!H', len(value)) + value for value in values]) + + +def decode_values(values_str): + values = [] + index = 0 + while index < len(values_str): + length = unpack_from('!H', values_str)[0] + index += calcsize('!H') + values.append(values_str[index:index + length]) + index += length + return values + + class ContentSubscription(Payload): format_list = ['I'] @@ -32,7 +49,7 @@ def __init__(self, infohash, num_seeders, num_leechers, timestamp): self._timestamp = timestamp def to_pack_list(self): - data = [('20s', str(self.infohash)), + data = [('20s', self.infohash), ('I', self.num_seeders), ('I', self.num_leechers), ('f', self.timestamp)] @@ -77,7 +94,7 @@ def __init__(self, channel_id, num_votes, num_torrents, swarm_size_sum, timestam self.timestamp = timestamp def to_pack_list(self): - data = [('varlenI', str(self.channel_id)), + data = [('varlenI', self.channel_id), ('I', self.num_votes), ('I', self.num_torrents), ('I', self.swarm_size_sum), @@ -102,7 +119,7 @@ def __init__(self, infohash): self._infohash = infohash def to_pack_list(self): - data = [('20s', str(self.infohash))] + data = [('20s', self.infohash)] return data @classmethod @@ -131,12 +148,12 @@ def __init__(self, infohash, name, length, creation_date, num_files, comment): self._comment = comment def to_pack_list(self): - data = [('20s', str(self.infohash)), + data = [('20s', self.infohash), ('varlenH', str(self._name)), ('I', self._length), ('I', self._creation_date), ('I', self._num_files), - ('varlenH', self._comment)] + ('varlenH', str(self._comment))] return data @classmethod @@ -147,3 +164,93 @@ def from_unpack_list(cls, *args): @property def infohash(self): return self._infohash + + +class SearchRequestPayload(Payload): + """ + Payload for search request + """ + format_list = ['I', 'varlenH'] + + def __init__(self, timestamp, query): + super(SearchRequestPayload, self).__init__() + self._timestamp = timestamp + self._query = query + + def to_pack_list(self): + data = [('I', self._timestamp), + ('varlenH', str(self._query))] + return data + + @classmethod + def from_unpack_list(cls, *args): + (timestamp, query) = args + return SearchRequestPayload(timestamp, query) + + @property + def timestamp(self): + return self._timestamp + + @property + def query(self): + return self._query + + +class SearchResponseItemPayload(Payload): + """ + Payload for search response items + """ + + format_list = ['20s', 'varlenH', 'I', 'I', 'varlenH', 'I', 'I', 'I', '20s'] + is_list_descriptor = True + + def __init__(self, infohash, name, length, num_files, category_list, creation_date, seeders, leechers, cid): + self.infohash = infohash + self.name = name + self.length = length + self.num_files = num_files + self.category_list = category_list + self.creation_date = creation_date + self.seeders = seeders + self.leechers = leechers + self.cid = cid + + def to_pack_list(self): + data = [('20s', str(self.infohash)), + ('varlenH', self.name), + ('I', self.length), + ('I', self.num_files), + ('varlenH', encode_values(self.category_list)), + ('I', self.creation_date), + ('I', self.seeders), + ('I', self.leechers), + ('20s', self.cid)] + return data + + @classmethod + def from_unpack_list(cls, *args): + (infohash, name, length, num_files, category_list_str, creation_date, seeders, leechers, cid) = args + category_list = decode_values(category_list_str) + return SearchResponseItemPayload(infohash, name, length, num_files, category_list, creation_date, seeders, + leechers, cid) + + +class SearchResponsePayload(Payload): + """ + Payload for search response + """ + format_list = ['varlenI', 'varlenH'] + + def __init__(self, identifier, results): + self.identifier = identifier + self.results = results + + def to_pack_list(self): + data = [('varlenI', str(self.identifier)), + ('varlenH', self.results)] + return data + + @classmethod + def from_unpack_list(cls, *args): + (identifier, results) = args + return SearchResponsePayload(identifier, results)