Added search request & response payload structure
xoriole committed Jun 18, 2018
1 parent 667c48f commit dcc815a
Showing 4 changed files with 232 additions and 13 deletions.
70 changes: 70 additions & 0 deletions Tribler/Test/Community/popular/test_payload.py
@@ -0,0 +1,70 @@
import random
import string
from unittest import TestCase

from Tribler.community.popular.payload import SearchResponsePayload, SearchResponseItemPayload
from Tribler.pyipv8.ipv8.messaging.serialization import Serializer


class TestSerializer(TestCase):

    def setUp(self):
        self.serializer = Serializer()

    def test_search_result_payload_serialization(self):
        """
        Test serialization & deserialization of the search response payload.
        """

        def random_string(size=6, chars=string.ascii_uppercase + string.digits):
            return ''.join(random.choice(chars) for _ in range(size))

        def random_infohash():
            return ''.join(random.choice('0123456789abcdef') for _ in range(20))

        # Sample search response items
        sample_items = []
        for index in range(10):
            infohash = random_infohash()
            name = random_string()
            length = random.randint(1000, 9999)
            num_files = random.randint(1, 10)
            category_list = ['video', 'audio']
            creation_date = random.randint(1000000, 111111111)
            seeders = random.randint(10, 200)
            leechers = random.randint(5, 1000)
            cid = random_string(size=20)

            sample_items.append(SearchResponseItemPayload(infohash, name, length, num_files, category_list,
                                                          creation_date, seeders, leechers, cid))

        # Search identifier
        identifier = 111

        # Serialize the results
        results = ''
        for item in sample_items:
            results += self.serializer.pack_multiple(item.to_pack_list())
        serialized_results = self.serializer.pack_multiple(SearchResponsePayload(identifier, results).to_pack_list())

        # De-serialize the response payload to get the concatenated results
        response_format = SearchResponsePayload.format_list
        (search_results, _) = self.serializer.unpack_multiple(response_format, serialized_results)

        # De-serialize each individual search result item
        item_format = SearchResponseItemPayload.format_list
        (all_items, _) = self.serializer.unpack_multiple_as_list(item_format, search_results[1])
        for index in xrange(len(all_items)):
            response_item = SearchResponseItemPayload.from_unpack_list(*all_items[index])
            sample_item = sample_items[index]

            self.assertEqual(sample_item.infohash, response_item.infohash)
            self.assertEqual(sample_item.name, response_item.name)
            self.assertEqual(sample_item.length, response_item.length)
            self.assertEqual(sample_item.num_files, response_item.num_files)
            self.assertEqual(sample_item.creation_date, response_item.creation_date)
            self.assertEqual(sample_item.category_list, response_item.category_list)
            self.assertEqual(sample_item.seeders, response_item.seeders)
            self.assertEqual(sample_item.leechers, response_item.leechers)
            self.assertEqual(sample_item.cid, response_item.cid)
16 changes: 10 additions & 6 deletions Tribler/Test/Community/popular/test_repository.py
@@ -57,6 +57,9 @@ def fake_logger_error(repo, *args):
            repo.unknown_torrent = True
            repo.logger_error_called = True

        def update_torrent(repo, _):
            repo.update_torrent_called = True

        original_logger = self.content_repository._logger
        self.content_repository._logger.error = lambda *args, **kw: fake_logger_error(self.content_repository, *args)

@@ -67,18 +70,19 @@ def fake_logger_error(repo, *args):
        self.content_repository.torrent_db = None
        self.content_repository.logger_error_called = False

-        self.content_repository.update_torrent(fake_torrent_health_payload, peer_trust=0)
+        self.content_repository.update_torrent_health(fake_torrent_health_payload, peer_trust=0)
        self.assertTrue(self.content_repository.torrent_db_none)
        self.assertTrue(self.content_repository.logger_error_called)

        # Case2: torrent db does not have torrent
        self.content_repository.torrent_db = MockObject()
-        self.content_repository.torrent_db.hasTorrent = lambda _: False
+        self.content_repository.torrent_db.updateTorrent = lambda infohash, *args, **kw: \
+            update_torrent(self.content_repository, infohash)
        self.content_repository.logger_error_called = False
+        self.content_repository.has_torrent = lambda infohash: False

-        self.content_repository.update_torrent(fake_torrent_health_payload, peer_trust=0)
-        self.assertTrue(self.content_repository.logger_error_called)
-        self.assertTrue(self.content_repository.unknown_torrent)
+        self.content_repository.update_torrent_health(fake_torrent_health_payload, peer_trust=0)
+        self.assertTrue(self.content_repository.update_torrent_called)

        # restore logger
        self.content_repository._logger = original_logger
@@ -132,6 +136,6 @@ def get_torrent(infohash):
            lambda infohash, *args, **kw: update_torrent(self.content_repository, infohash)

        self.content_repository.update_torrent_called = False
-        self.content_repository.update_torrent(sample_payload, peer_trust=peer_trust)
+        self.content_repository.update_torrent_health(sample_payload, peer_trust=peer_trust)

        return self.content_repository.update_torrent_called
42 changes: 40 additions & 2 deletions Tribler/community/popular/community.py
@@ -1,5 +1,9 @@
import time
from twisted.python.failure import Failure

from Tribler.pyipv8.ipv8.requestcache import RandomNumberCache, RequestCache
from copy import copy
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, Deferred
from twisted.internet.task import LoopingCall

from Tribler.community.popular.payload import TorrentHealthPayload, ContentSubscription, TorrentInfoRequestPayload, \
@@ -25,7 +29,37 @@
ERROR_NO_CONTENT = "Nothing to publish"


class Request(RandomNumberCache):
    """
    This request cache keeps track of all outstanding requests within the PopularCommunity.
    """
    def __init__(self, community, peer, params=None):
        super(Request, self).__init__(community.request_cache, u'request')
        self.peer = peer
        self.params = params
        self.deferred = Deferred()
        self.start_time = time.time()

    @property
    def timeout_delay(self):
        return 5.0

    def on_timeout(self):
        if not self.deferred.called:
            self._logger.error('Request to %s timed out', self.peer.address)
            self.peer.failed += 1
            self.deferred.errback(Failure(RuntimeError("Peer timeout")))

    def on_complete(self):
        self.peer.last_response = time.time()
        self.peer.failed = 0
        self.peer.rtt = time.time() - self.start_time
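
As an editorial aside (not part of the commit), here is a minimal sketch of how such a cache is typically wired up with ipv8's RequestCache, assuming the add()/has()/pop() API of the ipv8 version vendored here; process_search_response and query are hypothetical names:

    # Sender side: register the request and chain on its deferred.
    cache = self.request_cache.add(Request(self, peer, params=query))
    cache.deferred.addCallback(self.process_search_response)

    # Receiver side: match the response to the outstanding request and resolve it,
    # assuming the response payload echoes the request identifier.
    if self.request_cache.has(u'request', payload.identifier):
        cache = self.request_cache.pop(u'request', payload.identifier)
        cache.on_complete()
        cache.deferred.callback(payload)

If no response arrives within timeout_delay seconds, on_timeout fires instead and the deferred errbacks with the "Peer timeout" failure above.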


class PopularCommunity(Community):
    """
    Community for disseminating content across the network. Follows a publish-subscribe model.
    """

    master_peer = Peer("3081a7301006072a8648ce3d020106052b810400270381920004073beff7002b6a9fc2824a3b1bbb1c4fc32546261"
                       "e3ef7537874560346c5fdc0c17fe654f67d23b08cbb44141879f79a7a4c8deddf9beb4fbc7a0f02ee1586ccebedb6"
@@ -39,6 +73,7 @@ def __init__(self, *args, **kwargs):

        # Handles database stuff
        self.content_repository = ContentRepository(self.torrent_db)
        self.request_cache = RequestCache()

        # Register messages
        self.decode_map.update({
@@ -55,6 +90,8 @@ def __init__(self, *args, **kwargs):
        self.subscribers = set()
        self.publishers = set()

        self._logger.info('Popular Community initialized (peer mid %s)', self.my_peer.mid.encode('HEX'))

    def start(self):
        """
        Starts the community by subscribing to peers, and periodically publishing the content updates to
@@ -74,6 +111,7 @@ def start_publishing():

    @inlineCallbacks
    def unload(self):
        self.request_cache.clear()
        self.content_repository.cleanup()
        self.content_repository = None

@@ -209,7 +247,7 @@ def on_torrent_info_request(self, source_address, data):
            self._logger.error(ERROR_UNKNOWN_RESPONSE)
            return

-        self.send_torrent_info_response(payload.infohash, peer=peer)
+        self.send_torrent_info_response(str(payload.infohash), peer=peer)

    def on_torrent_info_response(self, source_address, data):
        """
117 changes: 112 additions & 5 deletions Tribler/community/popular/payload.py
@@ -1,6 +1,23 @@
from struct import pack, unpack_from, calcsize

from Tribler.pyipv8.ipv8.deprecated.payload import Payload


def encode_values(values):
    return ''.join([pack('!H', len(value)) + value for value in values])


def decode_values(values_str):
    values = []
    index = 0
    while index < len(values_str):
        # Read the 2-byte length prefix at the current offset
        length = unpack_from('!H', values_str, index)[0]
        index += calcsize('!H')
        values.append(values_str[index:index + length])
        index += length
    return values
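
A quick round-trip illustration of this encoding (an editorial sketch, not part of the commit): every value is prefixed with its two-byte big-endian length, so a list of variable-length strings can travel inside a single 'varlenH' field and be split apart again on the other side:

    encoded = encode_values(['video', 'audio'])   # '\x00\x05video\x00\x05audio'
    assert decode_values(encoded) == ['video', 'audio']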


class ContentSubscription(Payload):

    format_list = ['I']
@@ -32,7 +49,7 @@ def __init__(self, infohash, num_seeders, num_leechers, timestamp):
        self._timestamp = timestamp

    def to_pack_list(self):
-        data = [('20s', str(self.infohash)),
+        data = [('20s', self.infohash),
                 ('I', self.num_seeders),
                 ('I', self.num_leechers),
                 ('f', self.timestamp)]
@@ -77,7 +94,7 @@ def __init__(self, channel_id, num_votes, num_torrents, swarm_size_sum, timestamp):
        self.timestamp = timestamp

    def to_pack_list(self):
-        data = [('varlenI', str(self.channel_id)),
+        data = [('varlenI', self.channel_id),
                 ('I', self.num_votes),
                 ('I', self.num_torrents),
                 ('I', self.swarm_size_sum),
@@ -102,7 +119,7 @@ def __init__(self, infohash):
        self._infohash = infohash

    def to_pack_list(self):
-        data = [('20s', str(self.infohash))]
+        data = [('20s', self.infohash)]
        return data

    @classmethod
@@ -131,12 +148,12 @@ def __init__(self, infohash, name, length, creation_date, num_files, comment):
        self._comment = comment

    def to_pack_list(self):
-        data = [('20s', str(self.infohash)),
+        data = [('20s', self.infohash),
                 ('varlenH', str(self._name)),
                 ('I', self._length),
                 ('I', self._creation_date),
                 ('I', self._num_files),
-                ('varlenH', self._comment)]
+                ('varlenH', str(self._comment))]
        return data

    @classmethod
@@ -147,3 +164,93 @@ def from_unpack_list(cls, *args):
    @property
    def infohash(self):
        return self._infohash


class SearchRequestPayload(Payload):
    """
    Payload for search request
    """
    format_list = ['I', 'varlenH']

    def __init__(self, timestamp, query):
        super(SearchRequestPayload, self).__init__()
        self._timestamp = timestamp
        self._query = query

    def to_pack_list(self):
        data = [('I', self._timestamp),
                ('varlenH', str(self._query))]
        return data

    @classmethod
    def from_unpack_list(cls, *args):
        (timestamp, query) = args
        return SearchRequestPayload(timestamp, query)

    @property
    def timestamp(self):
        return self._timestamp

    @property
    def query(self):
        return self._query
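
A pack/unpack round trip for this payload, mirroring the unit test in test_payload.py above (an editorial sketch, not part of the commit; the timestamp and query values are arbitrary):

    from Tribler.pyipv8.ipv8.messaging.serialization import Serializer

    serializer = Serializer()
    packed = serializer.pack_multiple(SearchRequestPayload(1529280000, 'ubuntu').to_pack_list())
    (unpacked, _) = serializer.unpack_multiple(SearchRequestPayload.format_list, packed)
    request = SearchRequestPayload.from_unpack_list(*unpacked)
    assert request.query == 'ubuntu'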


class SearchResponseItemPayload(Payload):
    """
    Payload for search response items
    """

    format_list = ['20s', 'varlenH', 'I', 'I', 'varlenH', 'I', 'I', 'I', '20s']
    is_list_descriptor = True

    def __init__(self, infohash, name, length, num_files, category_list, creation_date, seeders, leechers, cid):
        self.infohash = infohash
        self.name = name
        self.length = length
        self.num_files = num_files
        self.category_list = category_list
        self.creation_date = creation_date
        self.seeders = seeders
        self.leechers = leechers
        self.cid = cid

    def to_pack_list(self):
        data = [('20s', str(self.infohash)),
                ('varlenH', self.name),
                ('I', self.length),
                ('I', self.num_files),
                ('varlenH', encode_values(self.category_list)),
                ('I', self.creation_date),
                ('I', self.seeders),
                ('I', self.leechers),
                ('20s', self.cid)]
        return data

    @classmethod
    def from_unpack_list(cls, *args):
        (infohash, name, length, num_files, category_list_str, creation_date, seeders, leechers, cid) = args
        category_list = decode_values(category_list_str)
        return SearchResponseItemPayload(infohash, name, length, num_files, category_list, creation_date, seeders,
                                         leechers, cid)


class SearchResponsePayload(Payload):
    """
    Payload for search response
    """
    format_list = ['varlenI', 'varlenH']

    def __init__(self, identifier, results):
        self.identifier = identifier
        self.results = results

    def to_pack_list(self):
        data = [('varlenI', str(self.identifier)),
                ('varlenH', self.results)]
        return data

    @classmethod
    def from_unpack_list(cls, *args):
        (identifier, results) = args
        return SearchResponsePayload(identifier, results)
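
The wire format is thus nested: each SearchResponseItemPayload is packed on its own, the packed items are concatenated into one blob, and that blob rides in the 'varlenH' results field; the receiver splits it back apart with unpack_multiple_as_list. A condensed sketch of the whole cycle (editorial, not part of the commit; serializer and sample_items as in the test at the top):

    items_blob = ''.join(serializer.pack_multiple(item.to_pack_list()) for item in sample_items)
    packed = serializer.pack_multiple(SearchResponsePayload(42, items_blob).to_pack_list())

    (response, _) = serializer.unpack_multiple(SearchResponsePayload.format_list, packed)
    (raw_items, _) = serializer.unpack_multiple_as_list(SearchResponseItemPayload.format_list, response[1])
    items = [SearchResponseItemPayload.from_unpack_list(*raw) for raw in raw_items]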
