diff --git a/Tribler/Core/APIImplementation/LaunchManyCore.py b/Tribler/Core/APIImplementation/LaunchManyCore.py index ec5ab765634..84078c98afb 100644 --- a/Tribler/Core/APIImplementation/LaunchManyCore.py +++ b/Tribler/Core/APIImplementation/LaunchManyCore.py @@ -103,7 +103,7 @@ def __init__(self): self.tunnel_community = None self.trustchain_community = None self.wallets = {} - self.popular_community = None + self.popularity_community = None self.startup_deferred = Deferred() @@ -289,19 +289,19 @@ def load_ipv8_overlays(self): self.ipv8.strategies.append((RandomWalk(self.market_community), 20)) # Popular Community - if self.session.config.get_popular_community_enabled(): - from Tribler.community.popular.community import PopularCommunity + if self.session.config.get_popularity_community_enabled(): + from Tribler.community.popularity.community import PopularityCommunity local_peer = Peer(self.session.trustchain_keypair) - self.popular_community = PopularCommunity(local_peer, self.ipv8.endpoint, self.ipv8.network, - torrent_db=self.session.lm.torrent_db, session=self.session) + self.popularity_community = PopularityCommunity(local_peer, self.ipv8.endpoint, self.ipv8.network, + torrent_db=self.session.lm.torrent_db, session=self.session) - self.ipv8.overlays.append(self.popular_community) + self.ipv8.overlays.append(self.popularity_community) - self.ipv8.strategies.append((RandomWalk(self.popular_community), 20)) + self.ipv8.strategies.append((RandomWalk(self.popularity_community), 20)) - self.popular_community.start() + self.popularity_community.start() @blocking_call_on_reactor_thread def load_dispersy_communities(self): diff --git a/Tribler/Core/Config/config.spec b/Tribler/Core/Config/config.spec index c514a0b0cf4..2205b6a0284 100644 --- a/Tribler/Core/Config/config.spec +++ b/Tribler/Core/Config/config.spec @@ -116,6 +116,6 @@ enabled = boolean(default=True) sources = string_list(default=list()) max_disk_space = integer(min=0, default=53687091200) -[popular_community] +[popularity_community] enabled = boolean(default=True) cache_dir = string(default=health_cache) \ No newline at end of file diff --git a/Tribler/Core/Config/tribler_config.py b/Tribler/Core/Config/tribler_config.py index 48133d14702..a779a770aae 100644 --- a/Tribler/Core/Config/tribler_config.py +++ b/Tribler/Core/Config/tribler_config.py @@ -531,11 +531,11 @@ def get_dummy_wallets_enabled(self): # Popular Community - def get_popular_community_enabled(self): - return self.config['popular_community']['enabled'] + def get_popularity_community_enabled(self): + return self.config['popularity_community']['enabled'] - def set_popular_community_enabled(self, value): - self.config['popular_community']['enabled'] = value + def set_popularity_community_enabled(self, value): + self.config['popularity_community']['enabled'] = value # Torrent store diff --git a/Tribler/Core/Modules/search_manager.py b/Tribler/Core/Modules/search_manager.py index 9c071bd255d..fcd4ebb697c 100644 --- a/Tribler/Core/Modules/search_manager.py +++ b/Tribler/Core/Modules/search_manager.py @@ -56,9 +56,9 @@ def search_for_torrents(self, keywords): break self._current_keywords = keywords - # If popular community is enabled, send the search request there as well - if self.session.lm.popular_community: - self.session.lm.popular_community.send_torrent_search_request(keywords) + # If popularity community is enabled, send the search request there as well + if self.session.lm.popularity_community: + self.session.lm.popularity_community.send_torrent_search_request(keywords) return nr_requests_made diff --git a/Tribler/Core/TorrentChecker/torrent_checker.py b/Tribler/Core/TorrentChecker/torrent_checker.py index fd01fcf14d2..890bb88ada6 100644 --- a/Tribler/Core/TorrentChecker/torrent_checker.py +++ b/Tribler/Core/TorrentChecker/torrent_checker.py @@ -13,7 +13,7 @@ from Tribler.Core.TorrentChecker.session import create_tracker_session, FakeDHTSession, UdpSocketManager from Tribler.Core.Utilities.tracker_utils import MalformedTrackerURLException from Tribler.Core.simpledefs import NTFY_TORRENTS -from Tribler.community.popular.repository import TYPE_TORRENT_HEALTH +from Tribler.community.popularity.repository import TYPE_TORRENT_HEALTH from Tribler.dispersy.util import blocking_call_on_reactor_thread, call_on_reactor_thread from Tribler.pyipv8.ipv8.taskmanager import TaskManager @@ -199,7 +199,7 @@ def on_gui_request_completed(self, infohash, result): self._update_torrent_result(torrent_update_dict) - # Add this result to popular community to publish to subscribers + # Add this result to popularity community to publish to subscribers self.publish_torrent_result(torrent_update_dict) return final_response @@ -336,7 +336,7 @@ def publish_torrent_result(self, response): self._logger.info("Not publishing zero seeded torrents") return content = (response['infohash'], response['seeders'], response['leechers'], response['last_check']) - if self.tribler_session.lm.popular_community: - self.tribler_session.lm.popular_community.queue_content(TYPE_TORRENT_HEALTH, content) + if self.tribler_session.lm.popularity_community: + self.tribler_session.lm.popularity_community.queue_content(TYPE_TORRENT_HEALTH, content) else: self._logger.info("Popular community not available to publish torrent checker result") diff --git a/Tribler/Test/Community/popular/test_repository.py b/Tribler/Test/Community/popular/test_repository.py deleted file mode 100644 index c84dc6e6ebf..00000000000 --- a/Tribler/Test/Community/popular/test_repository.py +++ /dev/null @@ -1,142 +0,0 @@ -import time -import unittest - -from Tribler.Test.Core.base_test import MockObject -from Tribler.community.popular.payload import TorrentHealthPayload -from Tribler.community.popular.repository import ContentRepository, DEFAULT_FRESHNESS_LIMIT - - -class TestContentRepository(unittest.TestCase): - - def setUp(self): - torrent_db = MockObject() - channel_db = MockObject() - self.content_repository = ContentRepository(torrent_db, channel_db) - - def test_add_content(self): - """ - Test adding and removing content works as expected. - """ - # Initial content queue is zero - self.assertEqual(self.content_repository.num_content(), 0, "No item expected in queue initially") - - # Add a sample content and check the size - sample_content = ('a' * 20, 6, 3, 123456789) - sample_content_type = 1 - self.content_repository.add_content(sample_content_type, sample_content) - self.assertEqual(self.content_repository.num_content(), 1, "One item expected in queue") - - # Pop an item - (content_type, content) = self.content_repository.pop_content() - self.assertEqual(content_type, sample_content_type, "Content type should be equal") - self.assertEqual(content, sample_content, "Content should be equal") - - # Check size again - self.assertEqual(self.content_repository.num_content(), 0, "No item expected in queue") - - def test_get_top_torrents(self): - """ - Test if content repository returns expected top torrents. - """ - - def get_fake_torrents(limit): - return [[chr(x) * 20, x, 0, 1525704192] for x in range(limit)] - - self.content_repository.torrent_db.getRecentlyCheckedTorrents = get_fake_torrents - - limit = 10 - self.assertEqual(self.content_repository.get_top_torrents(limit=limit), get_fake_torrents(limit)) - - def test_update_torrent(self): - - MSG_TORRENT_DB_NONE = "Torrent DB is None" - - def fake_logger_error(repo, *args): - if args[0] == MSG_TORRENT_DB_NONE: - repo.torrent_db_none = True - elif 'unknown' in args[0].lower(): - repo.unknown_torrent = True - repo.logger_error_called = True - - def update_torrent(repo, _): - repo.update_torrent_called = True - - original_logger = self.content_repository.logger - self.content_repository.logger.error = lambda *args, **kw: fake_logger_error(self.content_repository, *args) - - # Assume a fake torrent response - fake_torrent_health_payload = TorrentHealthPayload('a' * 20, 10, 4, time.time()) - - # Case1: torrent db is none - self.content_repository.torrent_db = None - self.content_repository.logger_error_called = False - - self.content_repository.update_torrent_health(fake_torrent_health_payload, peer_trust=0) - self.assertTrue(self.content_repository.torrent_db_none) - self.assertTrue(self.content_repository.logger_error_called) - - # Case2: torrent db does not have torrent - self.content_repository.torrent_db = MockObject() - self.content_repository.torrent_db.updateTorrent = lambda infohash, *args, **kw: \ - update_torrent(self.content_repository, infohash) - self.content_repository.logger_error_called = False - self.content_repository.has_torrent = lambda infohash: False - - self.content_repository.update_torrent_health(fake_torrent_health_payload, peer_trust=0) - self.assertTrue(self.content_repository.update_torrent_called) - - # restore logger - self.content_repository.logger = original_logger - - def test_update_torrent_with_higher_trust(self): - """ - Scenario: The database torrent has still fresh last_check_time and you receive a new response from - peer with trust > 1. - Expect: Torrent in database is updated. - """ - # last_check_time for existing torrent in database - db_last_time_check = time.time() - 10 - # Peer trust, higher than 1 in this scenario - peer_trust = 10 - - # Database record is expected to be updated - self.assertTrue(self.try_torrent_update_with_options(db_last_time_check, peer_trust)) - - def test_update_torrent_with_stale_check_time(self): - """ - Scenario: The database torrent has stale last_check_time and you receive a new response from - peer with no previous trust. - Expect: Torrent in database is still updated. - """ - # last_check_time for existing torrent in database - db_last_time_check = time.time() - DEFAULT_FRESHNESS_LIMIT - # Peer trust, higher than 1 in this scenario - peer_trust = 0 - - # Database record is expected to be updated - self.assertTrue(self.try_torrent_update_with_options(db_last_time_check, peer_trust)) - - def try_torrent_update_with_options(self, db_last_check_time, peer_trust): - """ - Tries updating torrent considering the given last check time of existing torrent and a new response - obtained from a peer with given peer_trust value. - """ - sample_infohash, seeders, leechers, timestamp = 'a' * 20, 10, 5, db_last_check_time - sample_payload = TorrentHealthPayload(sample_infohash, seeders, leechers, timestamp) - - def update_torrent(content_repo, _): - content_repo.update_torrent_called = True - - def get_torrent(infohash): - return {'infohash': infohash, 'num_seeders': seeders, - 'num_leechers': leechers, 'last_tracker_check': timestamp} - - self.content_repository.torrent_db.getTorrent = lambda infohash, **kw: get_torrent(infohash) - self.content_repository.torrent_db.hasTorrent = lambda infohash: infohash == sample_infohash - self.content_repository.torrent_db.updateTorrent = \ - lambda infohash, *args, **kw: update_torrent(self.content_repository, infohash) - - self.content_repository.update_torrent_called = False - self.content_repository.update_torrent_health(sample_payload, peer_trust=peer_trust) - - return self.content_repository.update_torrent_called diff --git a/Tribler/Test/Community/popular/__init__.py b/Tribler/Test/Community/popularity/__init__.py similarity index 100% rename from Tribler/Test/Community/popular/__init__.py rename to Tribler/Test/Community/popularity/__init__.py diff --git a/Tribler/Test/Community/popular/test_community.py b/Tribler/Test/Community/popularity/test_community.py similarity index 53% rename from Tribler/Test/Community/popular/test_community.py rename to Tribler/Test/Community/popularity/test_community.py index 66ed55008ec..a607ee64793 100644 --- a/Tribler/Test/Community/popular/test_community.py +++ b/Tribler/Test/Community/popularity/test_community.py @@ -2,23 +2,25 @@ import string from Tribler.Test.Core.base_test import MockObject -from Tribler.community.popular.community import PopularCommunity, MSG_TORRENT_HEALTH_RESPONSE, \ - MSG_CHANNEL_HEALTH_RESPONSE, ERROR_UNKNOWN_PEER, MSG_SUBSCRIPTION, ERROR_NO_CONTENT, \ +from Tribler.community.popularity import constants +from Tribler.community.popularity.community import PopularityCommunity, MSG_TORRENT_HEALTH_RESPONSE, \ + MSG_CHANNEL_HEALTH_RESPONSE, ERROR_UNKNOWN_PEER, ERROR_NO_CONTENT, \ ERROR_UNKNOWN_RESPONSE -from Tribler.community.popular.constants import SEARCH_TORRENT_REQUEST -from Tribler.community.popular.payload import SearchResponseItemPayload -from Tribler.community.popular.repository import TYPE_TORRENT_HEALTH +from Tribler.community.popularity.constants import SEARCH_TORRENT_REQUEST, MSG_TORRENT_INFO_RESPONSE, MSG_SUBSCRIPTION +from Tribler.community.popularity.payload import SearchResponseItemPayload, TorrentInfoResponsePayload, \ + TorrentHealthPayload, ContentSubscription +from Tribler.community.popularity.repository import TYPE_TORRENT_HEALTH from Tribler.pyipv8.ipv8.test.base import TestBase from Tribler.pyipv8.ipv8.test.mocking.ipv8 import MockIPv8 from Tribler.pyipv8.ipv8.test.util import twisted_wrapper -class TestPopularCommunityBase(TestBase): +class TestPopularityCommunityBase(TestBase): NUM_NODES = 2 def setUp(self): - super(TestPopularCommunityBase, self).setUp() - self.initialize(PopularCommunity, self.NUM_NODES) + super(TestPopularityCommunityBase, self).setUp() + self.initialize(PopularityCommunity, self.NUM_NODES) def create_node(self, *args, **kwargs): def load_random_torrents(limit): @@ -34,24 +36,20 @@ def load_random_torrents(limit): channel_db = MockObject() - return MockIPv8(u"curve25519", PopularCommunity, torrent_db=torrent_db, channel_db=channel_db) + return MockIPv8(u"curve25519", PopularityCommunity, torrent_db=torrent_db, channel_db=channel_db) class MockRepository(object): - def _random_string(self, size=6, chars=string.ascii_uppercase + string.digits): - return ''.join(random.choice(chars) for _ in range(size)) + def __init__(self): + super(MockRepository, self).__init__() + self.sample_torrents = [] + self.setup_torrents() - def _random_infohash(self): - return ''.join(random.choice('0123456789abcdef') for _ in range(20)) - - def search_torrent(self, query): - # sample search response items - query_str = ' '.join(query) - sample_items = [] + def setup_torrents(self): for _ in range(10): infohash = self._random_infohash() - name = query_str + " " + self._random_string() + name = self._random_string() length = random.randint(1000, 9999) num_files = random.randint(1, 10) category_list = ['video', 'audio'] @@ -60,8 +58,19 @@ def search_torrent(self, query): leechers = random.randint(5, 1000) cid = self._random_string(size=20) - sample_items.append(SearchResponseItemPayload(infohash, name, length, num_files, category_list, - creation_date, seeders, leechers, cid)) + self.sample_torrents.append([infohash, name, length, num_files, category_list, creation_date, + seeders, leechers, cid]) + + def _random_string(self, size=6, chars=string.ascii_uppercase + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) + + def _random_infohash(self): + return ''.join(random.choice('0123456789abcdef') for _ in range(20)) + + def search_torrent(self, _): + sample_items = [] + for torrent in self.sample_torrents: + sample_items.append(SearchResponseItemPayload(*torrent)) return sample_items def search_channels(self, _): @@ -77,16 +86,22 @@ def update_from_search_results(self, results): pass def get_torrent(self, _): - return None + torrent = self.sample_torrents[0] + db_torrent = {'name': torrent[1], + 'length': torrent[2], + 'creation_date': torrent[5], + 'num_files': torrent[3], + 'comment': ''} + return db_torrent def get_top_torrents(self): - return [] + return self.sample_torrents def update_from_torrent_search_results(self, search_results): pass -class TestPopularCommunity(TestPopularCommunityBase): +class TestPopularityCommunity(TestPopularityCommunityBase): __testing__ = False NUM_NODES = 2 @@ -95,6 +110,7 @@ def test_subscribe_peers(self): """ Tests subscribing to peers populate publishers and subscribers list. """ + self.nodes[1].overlay.send_torrent_info_response = lambda infohash, peer: None yield self.introduce_nodes() self.nodes[0].overlay.subscribe_peers() yield self.deliver_messages() @@ -109,6 +125,7 @@ def test_subscribe_unsubscribe_individual_peers(self): """ Tests subscribing/subscribing an individual peer. """ + self.nodes[1].overlay.send_torrent_info_response = lambda infohash, peer: None self.nodes[1].overlay.publish_latest_torrents = lambda *args, **kwargs: None yield self.introduce_nodes() @@ -124,9 +141,9 @@ def test_subscribe_unsubscribe_individual_peers(self): self.assertEqual(len(self.nodes[0].overlay.publishers), 0, "Expected no publisher") self.assertEqual(len(self.nodes[1].overlay.subscribers), 0, "Expected no subscriber") - def test_unsubscribe_peers_unit(self): + def test_unsubscribe_multiple_peers(self): """ - Tests unsubscribing peer works as expected. + Tests unsubscribing multiple peers works as expected. """ def send_popular_content_subscribe(my_peer, _, subscribe): if not subscribe: @@ -176,20 +193,36 @@ def test_refresh_peers(self): self.assertEqual(len(self.nodes[0].overlay.subscribers), 5) self.assertEqual(len(self.nodes[0].overlay.publishers), 5) - @twisted_wrapper + @twisted_wrapper(6) def test_start(self): """ Tests starting of the community. Peer should start subscribing to other connected peers. """ + self.nodes[1].overlay.send_torrent_info_response = lambda infohash, peer: None + + def fake_refresh_peer_list(peer): + peer.called_refresh_peer_list = True + + def fake_publish_next_content(peer): + peer.called_publish_next_content = True + + self.nodes[0].called_refresh_peer_list = False + self.nodes[0].called_publish_next_content = False + self.nodes[0].overlay.refresh_peer_list = lambda: fake_refresh_peer_list(self.nodes[0]) + self.nodes[0].overlay.publish_next_content = lambda: fake_publish_next_content(self.nodes[0]) + yield self.introduce_nodes() self.nodes[0].overlay.start() - yield self.deliver_messages() + yield self.sleep(constants.PUBLISH_INTERVAL) # Node 0 should have a publisher added self.assertEqual(len(self.nodes[0].overlay.publishers), 1, "Expected one publisher") # Node 1 should have a subscriber added self.assertEqual(len(self.nodes[1].overlay.subscribers), 1, "Expected one subscriber") + self.assertTrue(self.nodes[0].called_refresh_peer_list) + self.assertTrue(self.nodes[0].called_publish_next_content) + @twisted_wrapper def test_content_publishing(self): """ @@ -241,86 +274,6 @@ def test_publish_no_content(self): # Restore logger self.nodes[0].overlay.logger = original_logger - @twisted_wrapper - def test_subscribe_unsubscribe(self): - """ - Tests sending popular content subscribe request. - """ - original_logger = self.nodes[0].overlay.logger - self.nodes[0].overlay.logger.error = lambda *args, **kw: self.fake_logger_error(self.nodes[0], *args) - - self.nodes[0].overlay.broadcast_message = lambda packet, peer: \ - self.fake_broadcast_message(self.nodes[0], packet, peer) - - # Two default peers - default_peers = [self.create_node() for _ in range(2)] - # Assuming only one is connected - self.nodes[0].overlay.get_peers = lambda: default_peers[:1] - - # Case1: Try to send subscribe request to connected peer - self.nodes[0].broadcast_called = False - self.nodes[0].broadcast_packet_type = None - self.nodes[0].overlay.subscribe(default_peers[0], subscribe=True) - yield self.deliver_messages() - - # Expect peer to be listed in publisher list and message to be sent - self.assertTrue(default_peers[0] in self.nodes[0].overlay.publishers) - self.assertTrue(self.nodes[0].broadcast_called, "Should send a subscribe message to the peer") - self.assertEqual(self.nodes[0].receiver, default_peers[0], "Intended publisher is different") - - # Try unsubscribing now - self.nodes[0].overlay.subscribe(default_peers[0], subscribe=False) - yield self.deliver_messages() - - # peer should no longer be in publisher list - self.assertTrue(default_peers[0] not in self.nodes[0].overlay.publishers) - - # Restore logger - self.nodes[0].overlay.logger = original_logger - - @twisted_wrapper - def test_send_popular_content_subscription(self): - """ - Tests sending popular content subscription response. - """ - original_logger = self.nodes[0].overlay.logger - self.nodes[0].overlay.logger.error = lambda *args, **kw: self.fake_logger_error(self.nodes[0], *args) - - self.nodes[0].overlay.create_message_packet = lambda _type, _payload: \ - self.fake_create_message_packet(self.nodes[0], _type, _payload) - self.nodes[0].overlay.broadcast_message = lambda packet, peer: \ - self.fake_broadcast_message(self.nodes[0], packet, peer) - - # Two default peers - default_peers = [self.create_node() for _ in range(2)] - # Assuming only one is connected - self.nodes[0].overlay.get_peers = lambda: default_peers[:1] - - # Case1: Try to send subscribe response to non-connected peer - self.nodes[0].unknown_peer_found = False - self.nodes[0].logger_error_called = False - self.nodes[0].overlay.send_subscription_status(default_peers[1], subscribed=True) - yield self.deliver_messages() - - # Expected unknown peer error log - self.assertTrue(self.nodes[0].logger_error_called) - self.assertTrue(self.nodes[0].unknown_peer_found) - - # Case2: Try to send response to the connected peer - self.nodes[0].broadcast_called = False - self.nodes[0].broadcast_packet_type = None - self.nodes[0].overlay.send_subscription_status(default_peers[0], subscribed=True) - yield self.deliver_messages() - - # Expect message to be sent - self.assertTrue(self.nodes[0].packet_created, "Create packet failed") - self.assertEqual(self.nodes[0].packet_type, MSG_SUBSCRIPTION, "Unexpected payload type found") - self.assertTrue(self.nodes[0].broadcast_called, "Should send a message to the peer") - self.assertEqual(self.nodes[0].receiver, default_peers[0], "Intended receiver is different") - - # Restore logger - self.nodes[0].overlay.logger = original_logger - @twisted_wrapper def test_send_torrent_health_response(self): """ @@ -409,6 +362,60 @@ def test_send_channel_health_response(self): # Restore logger self.nodes[0].overlay.logger = original_logger + @twisted_wrapper + def test_send_torrent_info_request_response(self): + """ Test if torrent info request response works as expected. """ + self.nodes[1].called_send_torrent_info_response = False + original_send_torrent_info_response = self.nodes[1].overlay.send_torrent_info_response + + def send_torrent_info_response(node, infohash, peer): + node.called_infohash = infohash + node.called_peer = peer + node.called_send_torrent_info_response = True + + self.nodes[1].overlay.send_torrent_info_response = lambda infohash, peer: \ + send_torrent_info_response(self.nodes[1], infohash, peer) + + yield self.introduce_nodes() + self.nodes[0].overlay.subscribe_peers() + yield self.deliver_messages() + + infohash = 'a'*20 + self.nodes[0].overlay.send_torrent_info_request(infohash, self.nodes[1].my_peer) + yield self.deliver_messages() + + self.assertTrue(self.nodes[1].called_send_torrent_info_response) + self.nodes[1].overlay.send_torrent_info_response = original_send_torrent_info_response + + @twisted_wrapper + def test_send_content_info_request_response(self): + """ Test if content info request response works as expected """ + + self.nodes[0].overlay.content_repository = MockRepository() + self.nodes[1].overlay.content_repository = MockRepository() + self.nodes[1].overlay.publish_latest_torrents = lambda *args, **kwargs: None + + self.nodes[1].called_send_content_info_response = False + + def send_content_info_response(node, peer, content_type): + node.called_send_content_info_response = True + node.called_peer = peer + node.called_content_type = content_type + + self.nodes[1].overlay.send_content_info_response = lambda peer, identifier, content_type, _: \ + send_content_info_response(self.nodes[1], peer, content_type) + + yield self.introduce_nodes() + self.nodes[0].overlay.subscribe_peers() + yield self.deliver_messages() + + content_type = SEARCH_TORRENT_REQUEST + request_list = ['ubuntu'] + self.nodes[0].overlay.send_content_info_request(content_type, request_list, peer=self.nodes[1].my_peer) + yield self.deliver_messages() + + self.assertTrue(self.nodes[1].called_send_content_info_response) + @twisted_wrapper def test_on_torrent_health_response_from_unknown_peer(self): """ @@ -417,21 +424,16 @@ def test_on_torrent_health_response_from_unknown_peer(self): original_logger = self.nodes[0].overlay.logger self.nodes[0].overlay.logger.error = lambda *args, **kw: self.fake_logger_error(self.nodes[0], *args) - def fake_unpack_auth(): - mock_auth = MockObject() - mock_payload = MockObject() - return mock_auth, None, mock_payload - - def fake_get_peer_from_auth(peer): - return peer + infohash = 'a' * 20 + num_seeders = 10 + num_leechers = 5 + timestamp = 123123123 - self.nodes[0].overlay._ez_unpack_auth = lambda payload_class, data: fake_unpack_auth() - self.nodes[0].overlay.get_peer_from_auth = lambda auth, address: fake_get_peer_from_auth(self.nodes[1]) + payload = TorrentHealthPayload(infohash, num_seeders, num_leechers, timestamp) + source_address = ('1.1.1.1', 1024) + data = self.nodes[0].overlay.create_message_packet(MSG_TORRENT_HEALTH_RESPONSE, payload) - source_address = MockObject() - data = MockObject() - - self.nodes[0].unknown_response = True + self.nodes[0].unknown_response = False self.nodes[0].overlay.on_torrent_health_response(source_address, data) yield self.deliver_messages() @@ -441,87 +443,148 @@ def fake_get_peer_from_auth(peer): self.nodes[0].overlay.logger = original_logger @twisted_wrapper - def test_on_popular_content_subscribe_unknown_peer(self): + def test_on_torrent_health_response(self): """ Tests receiving torrent health response from unknown peer """ - original_logger = self.nodes[0].overlay.logger - self.nodes[0].overlay.logger.error = lambda *args, **kw: self.fake_logger_error(self.nodes[0], *args) - - def fake_unpack_auth(): - mock_auth = MockObject() - mock_payload = MockObject() - mock_payload.subscribe = True - return mock_auth, None, mock_payload + def fake_update_torrent(peer): + peer.called_update_torrent = True - def fake_get_peer_from_auth(peer): - return peer + self.nodes[0].overlay.content_repository = MockRepository() + self.nodes[0].overlay.content_repository.update_torrent_health = lambda payload, peer_trust: \ + fake_update_torrent(self.nodes[0]) - def fake_publish_latest_torrents(my_peer, _peer): - my_peer.publish_latest_torrents_called = True + infohash = 'a' * 20 + num_seeders = 10 + num_leechers = 5 + timestamp = 123123123 - self.nodes[0].overlay.publish_latest_torrents = lambda peer: fake_publish_latest_torrents(self.nodes[1], peer) - self.nodes[0].overlay._ez_unpack_auth = lambda payload_class, data: fake_unpack_auth() - self.nodes[0].overlay.get_peer_from_auth = lambda auth, address: fake_get_peer_from_auth(self.nodes[1]) + payload = TorrentHealthPayload(infohash, num_seeders, num_leechers, timestamp) + data = self.nodes[1].overlay.create_message_packet(MSG_TORRENT_HEALTH_RESPONSE, payload) - source_address = MockObject() - data = MockObject() + yield self.introduce_nodes() - self.nodes[0].unknown_peer_found = False - self.nodes[0].overlay.on_subscribe(source_address, data) + # Add node 1 in publisher list of node 0 + self.nodes[0].overlay.publishers.add(self.nodes[1].my_peer) + self.nodes[0].overlay.on_torrent_health_response(self.nodes[1].my_peer.address, data) yield self.deliver_messages() - self.assertTrue(self.nodes[0].unknown_peer_found) + self.assertTrue(self.nodes[0].called_update_torrent) - # Restore logger - self.nodes[0].overlay.logger = original_logger + @twisted_wrapper + def test_on_torrent_info_response(self): + """ + Tests receiving torrent health response. + """ + def fake_update_torrent_info(peer): + peer.called_update_torrent = True + + self.nodes[0].overlay.content_repository = MockRepository() + self.nodes[0].overlay.content_repository.update_torrent_info = lambda payload: \ + fake_update_torrent_info(self.nodes[0]) - @twisted_wrapper(5) - def test_on_popular_content_subscribe_ok(self): + infohash = 'a' * 20 + name = "ubuntu" + length = 100 + creation_date = 123123123 + num_files = 33 + comment = '' + + payload = TorrentInfoResponsePayload(infohash, name, length, creation_date, num_files, comment) + data = self.nodes[1].overlay.create_message_packet(MSG_TORRENT_INFO_RESPONSE, payload) + + yield self.introduce_nodes() + + # Add node 1 in publisher list of node 0 + self.nodes[0].overlay.publishers.add(self.nodes[1].my_peer) + self.nodes[0].overlay.on_torrent_info_response(self.nodes[1].my_peer.address, data) + yield self.deliver_messages() + + self.assertTrue(self.nodes[0].called_update_torrent) + + @twisted_wrapper + def test_on_torrent_info_response_from_unknown_peer(self): """ - Tests receiving torrent health response from unknown peer + Tests receiving torrent health response from unknown peer. """ - original_logger = self.nodes[0].overlay.logger - self.nodes[0].overlay.logger.error = lambda *args, **kw: self.fake_logger_error(self.nodes[0], *args) - def fake_unpack_auth(): - mock_auth = MockObject() - mock_payload = MockObject() - mock_payload.subscribe = True - return mock_auth, None, mock_payload + def fake_update_torrent_info(peer): + peer.called_update_torrent = True - def fake_get_peer_from_auth(peer): - return peer + self.nodes[0].overlay.content_repository = MockRepository() + self.nodes[0].overlay.content_repository.update_torrent_info = lambda payload: \ + fake_update_torrent_info(self.nodes[0]) - def fake_publish_latest_torrents(my_peer, _peer): - my_peer.publish_latest_torrents_called = True + infohash = 'a' * 20 + name = "ubuntu" + length = 100 + creation_date = 123123123 + num_files = 33 + comment = '' - def fake_send_popular_content_subscription(my_peer): - my_peer.send_content_subscription_called = True + payload = TorrentInfoResponsePayload(infohash, name, length, creation_date, num_files, comment) + data = self.nodes[1].overlay.create_message_packet(MSG_TORRENT_INFO_RESPONSE, payload) - self.nodes[0].overlay.publish_latest_torrents = lambda peer: fake_publish_latest_torrents(self.nodes[0], peer) - self.nodes[0].overlay._ez_unpack_auth = lambda payload_class, data: fake_unpack_auth() - self.nodes[0].overlay.get_peer_from_auth = lambda auth, address: fake_get_peer_from_auth(self.nodes[1]) - self.nodes[0].overlay.send_subscription_status = lambda peer, subscribed: \ - fake_send_popular_content_subscription(self.nodes[1]) + yield self.introduce_nodes() - source_address = MockObject() - data = MockObject() - self.nodes[0].unknown_peer_found = False - self.nodes[0].overlay.on_subscribe(source_address, data) + self.nodes[0].called_update_torrent = False + self.nodes[0].overlay.on_torrent_info_response(self.nodes[1].my_peer.address, data) yield self.deliver_messages() - self.assertFalse(self.nodes[0].unknown_peer_found) - self.assertTrue(self.nodes[0].publish_latest_torrents_called) + self.assertFalse(self.nodes[0].called_update_torrent) - # Restore logger - self.nodes[0].overlay.logger = original_logger + @twisted_wrapper + def test_on_subscription_status1(self): + """ + Tests receiving subscription status. + """ + subscribe = True + identifier = 123123123 + payload = ContentSubscription(identifier, subscribe) + data = self.nodes[1].overlay.create_message_packet(MSG_SUBSCRIPTION, payload) + # Set the cache request + self.nodes[0].overlay.request_cache.pop = lambda prefix, identifer: MockObject() - @twisted_wrapper(5) - def test_search_request_response(self): + yield self.introduce_nodes() + self.assertEqual(len(self.nodes[0].overlay.publishers), 0) + self.nodes[0].overlay.on_subscription_status(self.nodes[1].my_peer.address, data) + yield self.deliver_messages() + + self.assertEqual(len(self.nodes[0].overlay.publishers), 1) + + @twisted_wrapper + def test_on_subscription_status_with_unsubscribe(self): + """ + Tests receiving subscription status with unsubscribe status. + """ + yield self.introduce_nodes() + self.nodes[0].overlay.publishers.add(self.nodes[1].my_peer) + self.assertEqual(len(self.nodes[0].overlay.publishers), 1) + # Set the cache request + self.nodes[0].overlay.request_cache.pop = lambda prefix, identifer: MockObject() + + subscribe = False + identifier = 123123123 + payload = ContentSubscription(identifier, subscribe) + data = self.nodes[1].overlay.create_message_packet(MSG_SUBSCRIPTION, payload) + + self.nodes[0].overlay.on_subscription_status(self.nodes[1].my_peer.address, data) + yield self.deliver_messages() + + self.assertEqual(len(self.nodes[0].overlay.publishers), 0) + + @twisted_wrapper + def test_search_request_response(self): self.nodes[0].overlay.content_repository = MockRepository() self.nodes[1].overlay.content_repository = MockRepository() + self.nodes[1].overlay.publish_latest_torrents = lambda *args, **kwargs: None + + def fake_process_torrent_search_response(peer): + peer.called_process_torrent_search_response = True + + self.nodes[0].overlay.process_torrent_search_response = lambda query, payload: \ + fake_process_torrent_search_response(self.nodes[0]) yield self.introduce_nodes() self.nodes[0].overlay.subscribe_peers() @@ -533,10 +596,53 @@ def test_search_request_response(self): yield self.deliver_messages() - @twisted_wrapper(5) + self.assertTrue(self.nodes[0].called_process_torrent_search_response) + + @twisted_wrapper + def test_search_request_response(self): + self.nodes[0].overlay.content_repository = MockRepository() + self.nodes[1].overlay.content_repository = MockRepository() + self.nodes[1].overlay.publish_latest_torrents = lambda *args, **kwargs: None + + def fake_notify(peer, result_dict): + peer.called_search_result_notify = True + self.assertEqual(result_dict['keywords'], 'ubuntu') + self.assertGreater(len(result_dict['results']), 1) + + self.nodes[0].overlay.tribler_session = MockObject() + self.nodes[0].overlay.tribler_session.notifier = MockObject() + self.nodes[0].overlay.tribler_session.notifier.notify = lambda signal1, signal2, _, result_dict: \ + fake_notify(self.nodes[0], result_dict) + + + yield self.introduce_nodes() + self.nodes[0].overlay.subscribe_peers() + yield self.deliver_messages() + + # Create a search request + query = "ubuntu" + self.nodes[0].called_search_result_notify = False + + self.nodes[0].overlay.send_torrent_search_request(query) + yield self.deliver_messages() + + self.assertTrue(self.nodes[0].called_search_result_notify) + + @twisted_wrapper def test_send_content_info_request(self): self.nodes[0].overlay.content_repository = MockRepository() self.nodes[1].overlay.content_repository = MockRepository() + self.nodes[1].overlay.publish_latest_torrents = lambda *args, **kwargs: None + + self.nodes[0].received_response = False + self.nodes[0].received_query = None + + def process_torrent_search_response(node, query): + node.received_response = True + node.received_query = query + + self.nodes[0].overlay.process_torrent_search_response = lambda query, data: \ + process_torrent_search_response(self.nodes[0], query) yield self.introduce_nodes() self.nodes[0].overlay.subscribe_peers() @@ -547,6 +653,32 @@ def test_send_content_info_request(self): self.nodes[0].overlay.send_content_info_request(content_type, request_list, limit=5, peer=None) yield self.deliver_messages() + self.assertTrue(self.nodes[0].received_response) + self.assertEqual(self.nodes[0].received_query, request_list) + + @twisted_wrapper + def test_send_torrent_info_response(self): + self.nodes[1].overlay.publish_latest_torrents = lambda *args, **kwargs: None + self.nodes[0].overlay.content_repository = MockRepository() + self.nodes[1].overlay.content_repository = MockRepository() + + self.nodes[0].called_on_torrent_info_response = False + + def on_torrent_info_response(node): + node.called_on_torrent_info_response = True + + self.nodes[0].overlay.decode_map[chr(MSG_TORRENT_INFO_RESPONSE)] = lambda _source_address, _data: \ + on_torrent_info_response(self.nodes[0]) + + yield self.introduce_nodes() + self.nodes[0].overlay.subscribe_peers() + yield self.deliver_messages() + + infohash = 'a'*20 + self.nodes[1].overlay.send_torrent_info_response(infohash, self.nodes[0].my_peer) + yield self.deliver_messages() + self.assertTrue(self.nodes[0].called_on_torrent_info_response) + def fake_logger_error(self, my_peer, *args): if ERROR_UNKNOWN_PEER in args[0]: my_peer.unknown_peer_found = True diff --git a/Tribler/Test/Community/popular/test_payload.py b/Tribler/Test/Community/popularity/test_payload.py similarity index 95% rename from Tribler/Test/Community/popular/test_payload.py rename to Tribler/Test/Community/popularity/test_payload.py index a7eb316d824..027ef00985c 100644 --- a/Tribler/Test/Community/popular/test_payload.py +++ b/Tribler/Test/Community/popularity/test_payload.py @@ -2,7 +2,7 @@ import string from unittest import TestCase -from Tribler.community.popular.payload import SearchResponsePayload, SearchResponseItemPayload, ContentInfoRequest, \ +from Tribler.community.popularity.payload import SearchResponsePayload, SearchResponseItemPayload, ContentInfoRequest, \ Pagination, ContentInfoResponse, ContentSubscription, TorrentHealthPayload, ChannelHealthPayload from Tribler.pyipv8.ipv8.messaging.serialization import Serializer @@ -21,13 +21,15 @@ def random_infohash(self): def test_content_subscription(self): """ Test serialization/deserialization of Content subscription """ subscribe = True - subscription = ContentSubscription(subscribe) + identifier = 123123 + subscription = ContentSubscription(identifier, subscribe) serialized = self.serializer.pack_multiple(subscription.to_pack_list()) # Deserialize and test it (deserialized, _) = self.serializer.unpack_multiple(ContentSubscription.format_list, serialized) deserialized_subscription = ContentSubscription.from_unpack_list(*deserialized) + self.assertEqual(deserialized_subscription.identifier, identifier) self.assertTrue(deserialized_subscription.subscribe) def test_torrent_health_payload(self): @@ -169,9 +171,9 @@ def test_content_info_response(self): serialized_response = self.serializer.pack_multiple(in_response.to_pack_list()) # Deserialize request and test it - (deserialized_ressponse, _) = self.serializer.unpack_multiple(ContentInfoResponse.format_list, - serialized_response) - out_request = ContentInfoResponse.from_unpack_list(*deserialized_ressponse) + (deserialized_response, _) = self.serializer.unpack_multiple(ContentInfoResponse.format_list, + serialized_response) + out_request = ContentInfoResponse.from_unpack_list(*deserialized_response) self.assertEqual(in_response.identifier, out_request.identifier) self.assertEqual(in_response.response, out_request.response) self.assertEqual(in_response.content_type, out_request.content_type) diff --git a/Tribler/Test/Community/popularity/test_repository.py b/Tribler/Test/Community/popularity/test_repository.py new file mode 100644 index 00000000000..b8f54f7fbf1 --- /dev/null +++ b/Tribler/Test/Community/popularity/test_repository.py @@ -0,0 +1,313 @@ +import random +import string +import time +import unittest + +from Tribler.Test.Core.base_test import MockObject +from Tribler.community.popularity.payload import TorrentHealthPayload +from Tribler.community.popularity.repository import ContentRepository, DEFAULT_FRESHNESS_LIMIT + + +class TestContentRepository(unittest.TestCase): + + def setUp(self): + torrent_db = MockObject() + channel_db = MockObject() + self.content_repository = ContentRepository(torrent_db, channel_db) + + def test_add_content(self): + """ + Test adding and removing content works as expected. + """ + # Initial content queue is zero + self.assertEqual(self.content_repository.count_content(), 0, "No item expected in queue initially") + + # Add a sample content and check the size + sample_content = ('a' * 20, 6, 3, 123456789) + sample_content_type = 1 + self.content_repository.add_content(sample_content_type, sample_content) + self.assertEqual(self.content_repository.count_content(), 1, "One item expected in queue") + + # Pop an item + (content_type, content) = self.content_repository.pop_content() + self.assertEqual(content_type, sample_content_type, "Content type should be equal") + self.assertEqual(content, sample_content, "Content should be equal") + + # Check size again + self.assertEqual(self.content_repository.count_content(), 0, "No item expected in queue") + + def test_get_top_torrents(self): + """ + Test if content repository returns expected top torrents. + """ + + def get_fake_torrents(limit): + return [[chr(x) * 20, x, 0, 1525704192] for x in range(limit)] + + self.content_repository.torrent_db.getRecentlyCheckedTorrents = get_fake_torrents + + limit = 10 + self.assertEqual(self.content_repository.get_top_torrents(limit=limit), get_fake_torrents(limit)) + + def test_update_torrent_health(self): + """ + Tests update torrent health. + """ + def update_torrent(repo, _): + repo.update_torrent_called = True + + # Assume a fake torrent response + fake_torrent_health_payload = TorrentHealthPayload('a' * 20, 10, 4, time.time()) + + self.content_repository.torrent_db = MockObject() + self.content_repository.torrent_db.updateTorrent = lambda infohash, *args, **kw: \ + update_torrent(self.content_repository, infohash) + + # If torrent does not exist in the database, then it should be added to the database + self.content_repository.has_torrent = lambda infohash: False + self.content_repository.update_torrent_health(fake_torrent_health_payload, peer_trust=0) + + self.assertTrue(self.content_repository.update_torrent_called) + + def test_update_torrent_with_higher_trust(self): + """ + Scenario: The database torrent has still fresh last_check_time and you receive a new response from + peer with trust > 1. + Expect: Torrent in database is updated. + """ + # last_check_time for existing torrent in database + db_last_time_check = time.time() - 10 + # Peer trust, higher than 1 in this scenario + peer_trust = 10 + + # Database record is expected to be updated + self.assertTrue(self.try_torrent_update_with_options(db_last_time_check, peer_trust)) + + def test_update_torrent_with_stale_check_time(self): + """ + Scenario: The database torrent has stale last_check_time and you receive a new response from + peer with no previous trust. + Expect: Torrent in database is still updated. + """ + # last_check_time for existing torrent in database + db_last_time_check = time.time() - DEFAULT_FRESHNESS_LIMIT + # Peer trust, higher than 1 in this scenario + peer_trust = 0 + + # Database record is expected to be updated + self.assertTrue(self.try_torrent_update_with_options(db_last_time_check, peer_trust)) + + def try_torrent_update_with_options(self, db_last_check_time, peer_trust): + """ + Tries updating torrent considering the given last check time of existing torrent and a new response + obtained from a peer with given peer_trust value. + """ + sample_infohash, seeders, leechers, timestamp = 'a' * 20, 10, 5, db_last_check_time + sample_payload = TorrentHealthPayload(sample_infohash, seeders, leechers, timestamp) + + def update_torrent(content_repo, _): + content_repo.update_torrent_called = True + + def get_torrent(infohash): + return {'infohash': infohash, 'num_seeders': seeders, + 'num_leechers': leechers, 'last_tracker_check': timestamp} + + self.content_repository.torrent_db.getTorrent = lambda infohash, **kw: get_torrent(infohash) + self.content_repository.torrent_db.hasTorrent = lambda infohash: infohash == sample_infohash + self.content_repository.torrent_db.updateTorrent = \ + lambda infohash, *args, **kw: update_torrent(self.content_repository, infohash) + + self.content_repository.update_torrent_called = False + self.content_repository.update_torrent_health(sample_payload, peer_trust=peer_trust) + + return self.content_repository.update_torrent_called + + def test_update_torrent_info(self): + """ Test updating torrent info """ + self.content_repository.called_update_torrent = False + + def fake_update_torrent(ref): + ref.called_update_torrent = True + + self.content_repository.torrent_db.updateTorrent = lambda infohash, **kw: \ + fake_update_torrent(self.content_repository) + self.content_repository.has_torrent = lambda infohash: False + torrent_info_response = MockObject() + torrent_info_response.infohash = 'a' * 20 + + torrent_info_response.name = 'ubuntu' + torrent_info_response.length = 123 + torrent_info_response.creation_date = 123123123 + torrent_info_response.num_files = 2 + torrent_info_response.comment = 'Ubuntu ISO' + + self.content_repository.update_torrent_info(torrent_info_response) + self.assertTrue(self.content_repository.called_update_torrent) + + def test_update_conflicting_torrent_info(self): + """ Test updating torrent info response with existing record in the database.""" + torrent_info_response = MockObject() + torrent_info_response.infohash = 'a' * 20 + torrent_info_response.name = 'ubuntu' + torrent_info_response.length = 123 + torrent_info_response.creation_date = 123123123 + torrent_info_response.num_files = 2 + torrent_info_response.comment = 'Ubuntu ISO' + + self.content_repository.called_update_torrent = False + + def fake_update_torrent(ref): + ref.called_update_torrent = True + + def fake_get_torrent(infohash, name): + torrent = {'infohash': infohash, 'name': name} + return torrent + + self.content_repository.torrent_db.updateTorrent = lambda infohash, **kw: fake_update_torrent( + self.content_repository) + self.content_repository.has_torrent = lambda infohash: True + self.content_repository.get_torrent = lambda infohash: fake_get_torrent(infohash, torrent_info_response.name) + + self.content_repository.update_torrent_info(torrent_info_response) + self.assertFalse(self.content_repository.called_update_torrent) + + def test_search_torrent(self): + """ Test torrent search """ + def random_string(size=6, chars=string.ascii_uppercase + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) + + def random_infohash(): + return ''.join(random.choice('0123456789abcdef') for _ in range(20)) + + sample_torrents = [] + for _ in range(10): + infohash = random_infohash() + name = random_string() + length = random.randint(1000, 9999) + num_files = random.randint(1, 10) + category_list = ['video', 'audio'] + creation_date = random.randint(1000000, 111111111) + seeders = random.randint(10, 200) + leechers = random.randint(5, 1000) + cid = random_string(size=20) + + sample_torrents.append([infohash, name, length, num_files, category_list, creation_date, seeders, + leechers, cid]) + + def fake_torrentdb_search_names(_): + return sample_torrents + + self.content_repository.torrent_db.searchNames = lambda query, **kw: fake_torrentdb_search_names(query) + + search_query = "Ubuntu" + search_results = self.content_repository.search_torrent(search_query) + + for index in range(10): + db_torrent = sample_torrents[index] + search_result = search_results[index] + + self.assertEqual(db_torrent[0], search_result.infohash) + self.assertEqual(db_torrent[1], search_result.name) + self.assertEqual(db_torrent[2], search_result.length) + self.assertEqual(db_torrent[3], search_result.num_files) + self.assertEqual(db_torrent[6], search_result.seeders) + self.assertEqual(db_torrent[7], search_result.leechers) + + def test_search_channel(self): + """ Test channel search """ + def random_string(size=6, chars=string.ascii_uppercase + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) + + sample_channels = [] + for index in range(10): + dbid = index + cid = random_string(size=20) + name = random_string() + description = random_string(20) + nr_torrents = random.randint(1, 10) + nr_favorite = random.randint(1, 10) + nr_spam = random.randint(1, 10) + my_vote = 1 + modified = random.randint(1, 10000000) + relevance_score = 0.0 + + sample_channels.append([dbid, cid, name, description, nr_torrents, nr_favorite, nr_spam, my_vote, + modified, relevance_score]) + + def fake_torrentdb_search_channels(_): + return sample_channels + + self.content_repository.channel_db.search_in_local_channels_db = lambda query, **kw: \ + fake_torrentdb_search_channels(query) + + search_query = "Ubuntu" + search_results = self.content_repository.search_channels(search_query) + + for index in range(10): + db_channel = sample_channels[index] + search_result = search_results[index] + + self.assertEqual(db_channel[0], search_result.id) + self.assertEqual(db_channel[1], search_result.cid) + self.assertEqual(db_channel[2], search_result.name) + self.assertEqual(db_channel[3], search_result.description) + self.assertEqual(db_channel[4], search_result.nr_torrents) + self.assertEqual(db_channel[5], search_result.nr_favorite) + self.assertEqual(db_channel[6], search_result.nr_spam) + self.assertEqual(db_channel[8], search_result.modified) + + def test_update_torrent_from_search_results(self): + """ Tests updating database from the search results """ + def random_string(size=6, chars=string.ascii_uppercase + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) + + def random_infohash(): + return ''.join(random.choice('0123456789abcdef') for _ in range(20)) + + search_results = dict() + for _ in range(10): + infohash = random_infohash() + name = random_string() + length = random.randint(1000, 9999) + num_files = random.randint(1, 10) + category_list = ['video', 'audio'] + creation_date = random.randint(1000000, 111111111) + seeders = random.randint(10, 200) + leechers = random.randint(5, 1000) + cid = random_string(size=20) + + search_results[infohash] = [infohash, name, length, num_files, category_list, creation_date, + seeders, leechers, cid] + + def get_torrent(torrent_as_list): + return {'infohash': torrent_as_list[0], + 'name': torrent_as_list[1], + 'length': torrent_as_list[2], + 'num_files': torrent_as_list[3], + 'category_list': torrent_as_list[4], + 'creation_date': torrent_as_list[5], + 'seeders': torrent_as_list[6], + 'leechers': torrent_as_list[7], + 'cid': torrent_as_list[8]} + + def fake_update_torrent(ref): + ref.called_update_torrent = True + + self.content_repository.torrent_db.updateTorrent = lambda infohash, **kw: fake_update_torrent( + self.content_repository) + + # Case 1: Assume torrent does not exist in the database + self.content_repository.has_torrent = lambda infohash: False + self.content_repository.get_torrent = lambda infohash: None + + self.content_repository.called_update_torrent = False + self.content_repository.update_from_torrent_search_results(search_results.values()) + self.assertTrue(self.content_repository.called_update_torrent) + + # Case 2: Torrent already exist in the database + self.content_repository.has_torrent = lambda infohash: infohash in search_results + self.content_repository.get_torrent = lambda infohash: get_torrent(search_results[infohash]) + + self.content_repository.called_update_torrent = False + self.content_repository.update_from_torrent_search_results(search_results.values()) + self.assertFalse(self.content_repository.called_update_torrent) diff --git a/Tribler/Test/Core/Config/test_tribler_config.py b/Tribler/Test/Core/Config/test_tribler_config.py index c4848757098..6ad1d5aafed 100644 --- a/Tribler/Test/Core/Config/test_tribler_config.py +++ b/Tribler/Test/Core/Config/test_tribler_config.py @@ -301,12 +301,12 @@ def test_get_set_methods_preview_channel_community(self): self.tribler_config.set_preview_channel_community_enabled(True) self.assertEqual(self.tribler_config.get_preview_channel_community_enabled(), True) - def test_get_set_methods_popular_community(self): + def test_get_set_methods_popularity_community(self): """ - Check whether popular community get and set methods are working as expected. + Check whether popularity community get and set methods are working as expected. """ - self.tribler_config.set_popular_community_enabled(True) - self.assertEqual(self.tribler_config.get_popular_community_enabled(), True) + self.tribler_config.set_popularity_community_enabled(True) + self.assertEqual(self.tribler_config.get_popularity_community_enabled(), True) def test_get_set_methods_watch_folder(self): """ diff --git a/Tribler/Test/Core/TorrentChecker/test_torrentchecker.py b/Tribler/Test/Core/TorrentChecker/test_torrentchecker.py index 449665979eb..1e326b10886 100644 --- a/Tribler/Test/Core/TorrentChecker/test_torrentchecker.py +++ b/Tribler/Test/Core/TorrentChecker/test_torrentchecker.py @@ -12,7 +12,7 @@ from Tribler.Core.simpledefs import NTFY_TORRENTS from Tribler.Test.Core.base_test import TriblerCoreTest, MockObject from Tribler.Test.twisted_thread import deferred -from Tribler.community.popular.repository import TYPE_TORRENT_HEALTH +from Tribler.community.popularity.repository import TYPE_TORRENT_HEALTH from Tribler.pyipv8.ipv8.util import blocking_call_on_reactor_thread @@ -33,7 +33,7 @@ def setUp(self, annotate=True): self.session.lm.torrent_db = TorrentDBHandler(self.session) self.session.lm.torrent_checker = TorrentChecker(self.session) self.session.lm.tracker_manager = TrackerManager(self.session) - self.session.lm.popular_community = MockObject() + self.session.lm.popularity_community = MockObject() self.torrent_checker = self.session.lm.torrent_checker self.torrent_checker._torrent_db = self.session.open_dbhandler(NTFY_TORRENTS) @@ -190,23 +190,23 @@ def remove_tracker(tracker_url): def test_publish_torrent_result(self): MSG_ZERO_SEED_TORRENT = "Not publishing zero seeded torrents" - MSG_NO_POPULAR_COMMUNITY = "Popular community not available to publish torrent checker result" + MSG_NO_popularity_community = "Popular community not available to publish torrent checker result" def _fake_logger_info(torrent_checker, msg): if msg == MSG_ZERO_SEED_TORRENT: torrent_checker.zero_seed_torrent = True - if msg == MSG_NO_POPULAR_COMMUNITY: - torrent_checker.popular_community_not_found = True + if msg == MSG_NO_popularity_community: + torrent_checker.popularity_community_not_found = True original_logger_info = self.torrent_checker._logger.info self.torrent_checker._logger.info = lambda msg: _fake_logger_info(self.torrent_checker, msg) - def popular_community_queue_content(torrent_checker, _type, _): - torrent_checker.popular_community_queue_content_called = True - torrent_checker.popular_community_queue_content_called_type = _type + def popularity_community_queue_content(torrent_checker, _type, _): + torrent_checker.popularity_community_queue_content_called = True + torrent_checker.popularity_community_queue_content_called_type = _type - self.torrent_checker.tribler_session.lm.popular_community.queue_content = lambda _type, _content: \ - popular_community_queue_content(self.torrent_checker, _type, _content) + self.torrent_checker.tribler_session.lm.popularity_community.queue_content = lambda _type, _content: \ + popularity_community_queue_content(self.torrent_checker, _type, _content) # Case1: Fake torrent checker response, seeders:0 fake_response = {'infohash': 'a'*20, 'seeders': 0, 'leechers': 0, 'last_check': time.time()} @@ -215,17 +215,17 @@ def popular_community_queue_content(torrent_checker, _type, _): # Case2: Positive seeders fake_response['seeders'] = 5 - self.torrent_checker.popular_community_queue_content_called = False - self.torrent_checker.popular_community_queue_content_called_type = None + self.torrent_checker.popularity_community_queue_content_called = False + self.torrent_checker.popularity_community_queue_content_called_type = None self.torrent_checker.publish_torrent_result(fake_response) - self.assertTrue(self.torrent_checker.popular_community_queue_content_called) - self.assertEqual(self.torrent_checker.popular_community_queue_content_called_type, TYPE_TORRENT_HEALTH) + self.assertTrue(self.torrent_checker.popularity_community_queue_content_called) + self.assertEqual(self.torrent_checker.popularity_community_queue_content_called_type, TYPE_TORRENT_HEALTH) # Case3: Popular community is None - self.torrent_checker.tribler_session.lm.popular_community = None + self.torrent_checker.tribler_session.lm.popularity_community = None self.torrent_checker.publish_torrent_result(fake_response) - self.assertTrue(self.torrent_checker.popular_community_not_found) + self.assertTrue(self.torrent_checker.popularity_community_not_found) self.torrent_checker._logger.info = original_logger_info diff --git a/Tribler/Test/test_as_server.py b/Tribler/Test/test_as_server.py index 76d741873c3..964fc51026d 100644 --- a/Tribler/Test/test_as_server.py +++ b/Tribler/Test/test_as_server.py @@ -298,7 +298,7 @@ def setUpPreSession(self): self.config.set_tunnel_community_enabled(False) self.config.set_credit_mining_enabled(False) self.config.set_market_community_enabled(False) - self.config.set_popular_community_enabled(False) + self.config.set_popularity_community_enabled(False) @blocking_call_on_reactor_thread @inlineCallbacks diff --git a/Tribler/community/popular/__init__.py b/Tribler/community/popularity/__init__.py similarity index 100% rename from Tribler/community/popular/__init__.py rename to Tribler/community/popularity/__init__.py diff --git a/Tribler/community/popular/community.py b/Tribler/community/popularity/community.py similarity index 53% rename from Tribler/community/popular/community.py rename to Tribler/community/popularity/community.py index 497583af31a..e65fdc6bcbc 100644 --- a/Tribler/community/popular/community.py +++ b/Tribler/community/popularity/community.py @@ -1,238 +1,28 @@ -import logging -from abc import abstractmethod -from copy import copy from twisted.internet.defer import inlineCallbacks -from twisted.internet.task import LoopingCall from Tribler.Core.simpledefs import SIGNAL_SEARCH_COMMUNITY, SIGNAL_ON_SEARCH_RESULTS -from Tribler.community.popular.constants import MSG_SUBSCRIBE, MSG_SUBSCRIPTION, \ - MSG_TORRENT_HEALTH_RESPONSE, MSG_CHANNEL_HEALTH_RESPONSE, MSG_TORRENT_INFO_REQUEST, MSG_TORRENT_INFO_RESPONSE, \ - MAX_PUBLISHERS, PUBLISH_INTERVAL, MAX_SUBSCRIBERS, \ - ERROR_UNKNOWN_RESPONSE, MAX_PACKET_PAYLOAD_SIZE, ERROR_UNKNOWN_PEER, ERROR_NO_CONTENT, MASTER_PUBLIC_KEY, \ +from Tribler.community.popularity.constants import MSG_TORRENT_HEALTH_RESPONSE, MSG_CHANNEL_HEALTH_RESPONSE, \ + MSG_TORRENT_INFO_REQUEST, MSG_TORRENT_INFO_RESPONSE, \ + ERROR_UNKNOWN_RESPONSE, MAX_PACKET_PAYLOAD_SIZE, ERROR_UNKNOWN_PEER, ERROR_NO_CONTENT, \ MSG_CONTENT_INFO_REQUEST, \ SEARCH_TORRENT_REQUEST, MSG_CONTENT_INFO_RESPONSE, SEARCH_TORRENT_RESPONSE -from Tribler.community.popular.payload import TorrentHealthPayload, ContentSubscription, TorrentInfoRequestPayload, \ +from Tribler.community.popularity.payload import TorrentHealthPayload, ContentSubscription, TorrentInfoRequestPayload, \ TorrentInfoResponsePayload, SearchResponseItemPayload, \ ContentInfoRequest, Pagination, ContentInfoResponse, decode_values -from Tribler.community.popular.repository import ContentRepository, TYPE_TORRENT_HEALTH -from Tribler.community.popular.request import SearchRequest -from Tribler.pyipv8.ipv8.deprecated.community import Community -from Tribler.pyipv8.ipv8.deprecated.payload_headers import BinMemberAuthenticationPayload, GlobalTimeDistributionPayload +from Tribler.community.popularity.pubsub import PubSubCommunity +from Tribler.community.popularity.repository import ContentRepository, TYPE_TORRENT_HEALTH +from Tribler.community.popularity.request import ContentRequest from Tribler.pyipv8.ipv8.peer import Peer -from Tribler.pyipv8.ipv8.requestcache import RequestCache -class PubSubCommunity(Community): - - def __init__(self, *args, **kwargs): - super(PubSubCommunity, self).__init__(*args, **kwargs) - self.logger = logging.getLogger(self.__class__.__name__) - self.request_cache = RequestCache() - - # Register messages - self.decode_map.update({ - chr(MSG_SUBSCRIBE): self.on_subscribe, - chr(MSG_SUBSCRIPTION): self.on_subscription_status - }) - - # A set of publisher and subscriber. - # Sends data updates to subscribers, and receives updates from subscribers. - self.subscribers = set() - self.publishers = set() - - def start(self): - """ - Starts the community by subscribing to peers, and periodically publishing the content updates to - the subscribers. - """ - # Subscribe peers - self.subscribe_peers() - - def start_publishing(): - # Update the publisher and subscriber list - self.refresh_peer_list() - - # publish the new cotent from the content repository - self.publish_next_content() - - self.register_task("start_publishing", LoopingCall(start_publishing)).start(PUBLISH_INTERVAL, False) - - @inlineCallbacks - def unload(self): - self.request_cache.clear() - yield super(PubSubCommunity, self).unload() - - def subscribe_peers(self): - """ - Subscribes to the connected peers. First, the peers are sorted based on the trust score on descending order and - content subscribe request is sent to the top peers. - This method is called periodically so it can fill up for the disconnected peers by connecting to new peers. - Note that, existing publisher peers are not disconnected even if we find new peers with higher trust score but - only fill up the remaining publisher slots with new top peers. - """ - num_publishers = len(self.publishers) - num_peers = len(self.get_peers()) - # If we have some free publisher slots and there are peers available - if num_publishers < MAX_PUBLISHERS and num_publishers < num_peers: - available_publishers = [peer for peer in self.get_peers() if peer not in self.publishers] - sorted_peers = sorted(available_publishers, - key=lambda _peer: self.trustchain.get_trust(_peer) if self.trustchain else 1, - reverse=True) - for peer in sorted_peers[: MAX_PUBLISHERS - num_publishers]: - self.subscribe(peer, subscribe=True) - - def refresh_peer_list(self): - """ - Updates the publishers and subscribers list by filtering out the disconnected peers. It also calls subscribe - peers to replenish the available publisher slots if necessary. - """ - peers = self.get_peers() - self.logger.info("Num peers: %d", len(peers)) - self.publishers = set([peer for peer in self.publishers if peer in peers]) - self.subscribers = set([peer for peer in self.subscribers if peer in peers]) - - # Log the number of subscribers and publishers - self.logger.info("Publishers: %d, Subscribers: %d", len(self.publishers), len(self.subscribers)) - - # subscribe peers if necessary - self.subscribe_peers() - - def unsubscribe_peers(self): - """ - Unsubscribes from the existing publishers by sending content subscribe request with subscribe=False. It then - clears up its publishers list. - - Called at community unload. - """ - for peer in copy(self.publishers): - self.subscribe(peer, subscribe=False) - self.publishers.clear() - - def subscribe(self, peer, subscribe=True): - """ - Method to send content subscribe/unsubscribe message. This message is sent to each individual publisher peer we - want to subscribe/unsubscribe. - """ - # Add or remove the publisher peer - if subscribe: - self.publishers.add(peer) - else: - self.publishers.remove(peer) - - # Create subscription packet and send it - subscription = ContentSubscription(subscribe) - packet = self.create_message_packet(MSG_SUBSCRIBE, subscription) - self.broadcast_message(packet, peer=peer) - - def on_subscribe(self, source_address, data): - """ - Message handler for content subscribe message. It handles both subscribe and unsubscribe requests. - Upon successful subscription or unsubscription, it send the confirmation subscription message with status. - In case of subscription, it also publishes a list of recently checked torrents to the subscriber. - """ - auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) - peer = self.get_peer_from_auth(auth, source_address) - - # Subscribe or unsubscribe peer - subscribed = peer in self.subscribers - - if payload.subscribe and not subscribed: - if len(self.subscribers) < MAX_SUBSCRIBERS: - self.subscribers.add(peer) - subscribed = True - - elif not payload.subscribe and subscribed: - self.subscribers.remove(peer) - subscribed = False - - # Send subscription response - self.send_subscription_status(peer, subscribed=subscribed) - - return subscribed - - def send_subscription_status(self, peer, subscribed=True): - """ - Method to send content subscription message. Content subscription message is send in response to content - subscribe or unsubscribe message. - """ - if peer not in self.get_peers(): - self.logger.error(ERROR_UNKNOWN_PEER) - return - - subscription = ContentSubscription(subscribed) - packet = self.create_message_packet(MSG_SUBSCRIPTION, subscription) - self.broadcast_message(packet, peer=peer) - - def on_subscription_status(self, source_address, data): - """ - Message handler for content subscription message. Content subscription message is sent by the publisher stating - the status of the subscription in response to subscribe or unsubscribe request. - - If the subscription message has subscribe=True, it means the subscription was successful, so the peer is added - to the subscriber. In other case, publisher is removed if it is still present in the publishers list. - """ - auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) - peer = self.get_peer_from_auth(auth, source_address) - - if payload.subscribe: - self.publishers.add(peer) - elif peer in self.publishers: - self.publishers.remove(peer) - - def create_message_packet(self, message_type, payload): - """ - Helper method to creates a message packet of given type with provided payload. - """ - auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list() - dist = GlobalTimeDistributionPayload(self.claim_global_time()).to_pack_list() - payload = payload if isinstance(payload, list) else payload.to_pack_list() - return self._ez_pack(self._prefix, message_type, [auth, dist, payload]) - - def broadcast_message(self, packet, peer=None): - """ - Helper method to broadcast the message packet to a single peer or all the subscribers. - """ - if peer is not None: - self.endpoint.send(peer.address, packet) - return - - for _peer in self.subscribers: - self.endpoint.send(_peer.address, packet) - - def get_peer_from_auth(self, auth, source_address): - """ - Get Peer object from the message and auth and source_address. - It is used for mocking the peer in test. - """ - return Peer(auth.public_key_bin, source_address) - - def pack_sized(self, payload_list, fit_size, start_index=0): - assert isinstance(payload_list, list) - serialized_results = '' - size = 0 - current_index = start_index - num_payloads = len(payload_list) - while current_index < num_payloads: - item = payload_list[current_index] - packed_item = self.serializer.pack_multiple(item.to_pack_list()) - packed_item_length = len(packed_item) - if size + packed_item_length > fit_size: - break - else: - size += packed_item_length - serialized_results += packed_item - current_index += 1 - return serialized_results, current_index, current_index - start_index - - @abstractmethod - def publish_next_content(self): - """ Method responsible for publishing content during periodic push """ - pass - - -class PopularCommunity(PubSubCommunity): +class PopularityCommunity(PubSubCommunity): """ Community for disseminating the content across the network. Follows publish-subscribe model. """ + MASTER_PUBLIC_KEY = "3081a7301006072a8648ce3d020106052b810400270381920004073beff7002b6a9fc2824a3b1bbb1c4fc32546" \ + "261e3ef7537874560346c5fdc0c17fe654f67d23b08cbb44141879f79a7a4c8deddf9beb4fbc7a0f02ee1586cc" \ + "ebedb623eeef51710108d702f9250361c071482e83c0a4a86c8f45a0b13a19ef83eacb6267b4bfccf220ae5f6d" \ + "1db7125ea1d10da3744b65679828f23376e28b76ab33132b7fa984a77f159dba7351a7" master_peer = Peer(MASTER_PUBLIC_KEY.decode('hex')) @@ -242,12 +32,10 @@ def __init__(self, *args, **kwargs): self.trustchain = kwargs.pop('trustchain_community', None) self.tribler_session = kwargs.pop('session', None) - super(PopularCommunity, self).__init__(*args, **kwargs) + super(PopularityCommunity, self).__init__(*args, **kwargs) - # Handles database stuffs self.content_repository = ContentRepository(self.torrent_db, self.channel_db) - # Register messages self.decode_map.update({ chr(MSG_TORRENT_HEALTH_RESPONSE): self.on_torrent_health_response, chr(MSG_CHANNEL_HEALTH_RESPONSE): self.on_channel_health_response, @@ -263,13 +51,13 @@ def __init__(self, *args, **kwargs): def unload(self): self.content_repository.cleanup() self.content_repository = None - yield super(PopularCommunity, self).unload() + yield super(PopularityCommunity, self).unload() def on_subscribe(self, source_address, data): auth, _, _ = self._ez_unpack_auth(ContentSubscription, data) peer = self.get_peer_from_auth(auth, source_address) - subscribed = super(PopularCommunity, self).on_subscribe(source_address, data) + subscribed = super(PopularityCommunity, self).on_subscribe(source_address, data) # Publish the latest torrents to the subscriber if subscribed: self.publish_latest_torrents(peer=peer) @@ -309,7 +97,7 @@ def on_torrent_info_request(self, source_address, data): auth, _, payload = self._ez_unpack_auth(TorrentInfoRequestPayload, data) peer = self.get_peer_from_auth(auth, source_address) - if peer not in self.publishers: + if peer not in self.subscribers: self.logger.error(ERROR_UNKNOWN_RESPONSE) return @@ -320,7 +108,7 @@ def on_torrent_info_response(self, source_address, data): Message handler for torrent info response. """ self.logger.info("Got torrent info response from %s", source_address) - auth, _, payload = self._ez_unpack_auth(TorrentHealthPayload, data) + auth, _, payload = self._ez_unpack_auth(TorrentInfoResponsePayload, data) peer = self.get_peer_from_auth(auth, source_address) if peer not in self.publishers: @@ -354,7 +142,7 @@ def on_content_info_response(self, source_address, data): def process_torrent_search_response(self, query, payload): item_format = SearchResponseItemPayload.format_list - (response, _) = self.serializer.unpack_multiple_as_list(item_format, payload.response) + response, _ = self.serializer.unpack_multiple_as_list(item_format, payload.response) # Decode the category string to list for response_item in response: response_item[4] = decode_values(response_item[4]) @@ -424,7 +212,14 @@ def send_torrent_info_response(self, infohash, peer): self.broadcast_message(packet, peer=peer) def send_content_info_request(self, content_type, request_list, limit=25, peer=None): - cache = self.request_cache.add(SearchRequest(self.request_cache, content_type, request_list)) + """ + Sends the generic content request of givent content_type. + :param content_type: request content type + :param request_list: List request queries + :param limit: Number of expected responses + :param peer: Peer to send this request to + """ + cache = self.request_cache.add(ContentRequest(self.request_cache, content_type, request_list)) self.logger.info("Sending search request query:%s, identifier:%s", request_list, cache.number) content_request = ContentInfoRequest(cache.number, content_type, request_list, limit) @@ -437,6 +232,13 @@ def send_content_info_request(self, content_type, request_list, limit=25, peer=N self.broadcast_message(packet, peer=connected_peer) def send_content_info_response(self, peer, identifier, content_type, response_list): + """ + Sends the generic content info response with payload response list. + :param peer: Receiving peer + :param identifier: Request identifier + :param content_type: Message content type + :param response_list: Content response + """ num_results = len(response_list) current_index = 0 page_num = 1 @@ -453,10 +255,16 @@ def send_content_info_response(self, peer, identifier, content_type, response_li self.broadcast_message(packet, peer=peer) def send_torrent_search_request(self, query): + """ + Sends torrent search query as a content info request with content_type as SEARCH_TORRENT_REQUEST. + """ self.send_content_info_request(SEARCH_TORRENT_REQUEST, query) def send_channel_search_request(self, query): - raise NotImplementedError("Not implemented yet. Waiting for all channel 2.0") + """ + Sends channel search query to All Channel 2.0 to get a list of channels. + """ + # TODO: Not implemented yet. Waiting for All Channel 2.0 # CONTENT REPOSITORY STUFFS @@ -466,7 +274,7 @@ def publish_next_content(self): Does nothing if there are none subscribers. Only Torrent health response is published at the moment. """ - self.logger.info("Content to publish: %d", self.content_repository.num_content()) + self.logger.info("Content to publish: %d", self.content_repository.count_content()) if not self.subscribers: self.logger.info("No subscribers found. Not publishing anything") return @@ -495,6 +303,6 @@ def publish_latest_torrents(self, peer): def queue_content(self, content_type, content): """ - Basically addS a given content to the queue of content repository. + Basically adds a given content to the queue of content repository. """ self.content_repository.add_content(content_type, content) diff --git a/Tribler/community/popular/constants.py b/Tribler/community/popularity/constants.py similarity index 66% rename from Tribler/community/popular/constants.py rename to Tribler/community/popularity/constants.py index f6163857129..70542090734 100644 --- a/Tribler/community/popular/constants.py +++ b/Tribler/community/popularity/constants.py @@ -1,9 +1,3 @@ -# Master peer -MASTER_PUBLIC_KEY = "3081a7301006072a8648ce3d020106052b810400270381920004073beff7002b6a9fc2824a3b1bbb1c4fc32546261" \ - "e3ef7537874560346c5fdc0c17fe654f67d23b08cbb44141879f79a7a4c8deddf9beb4fbc7a0f02ee1586ccebedb6" \ - "23eeef51710108d702f9250361c071482e83c0a4a86c8f45a0b13a19ef83eacb6267b4bfccf220ae5f6d1db7125ea1" \ - "d10da3744b65679828f23376e28b76ab33132b7fa984a77f159dba7351a7" - # Message types for different requests & response MSG_SUBSCRIBE = 1 MSG_SUBSCRIPTION = 2 diff --git a/Tribler/community/popular/payload.py b/Tribler/community/popularity/payload.py similarity index 90% rename from Tribler/community/popular/payload.py rename to Tribler/community/popularity/payload.py index 0f585534a8e..26212303c22 100644 --- a/Tribler/community/popular/payload.py +++ b/Tribler/community/popularity/payload.py @@ -21,19 +21,22 @@ def decode_values(values_str): class ContentSubscription(Payload): - format_list = ['?'] + format_list = ['I', '?'] - def __init__(self, subscribe): + def __init__(self, identifier, subscribe): super(ContentSubscription, self).__init__() + self.identifier = identifier self.subscribe = subscribe def to_pack_list(self): - data = [('?', self.subscribe)] + data = [('I', self.identifier), + ('?', self.subscribe)] return data @classmethod - def from_unpack_list(cls, subscribe): - return ContentSubscription(subscribe) + def from_unpack_list(cls, *args): + (identifier, subscribe) = args + return ContentSubscription(identifier, subscribe) class TorrentHealthPayload(Payload): @@ -79,7 +82,7 @@ def timestamp(self): class ChannelHealthPayload(Payload): """ - Payload for a channel popularity message in the popular community. + Payload for a channel popularity message in the popularity community. """ format_list = ['varlenI', 'I', 'I', 'I', 'I'] @@ -123,7 +126,7 @@ def to_pack_list(self): @classmethod def from_unpack_list(cls, *args): - infohash = args + (infohash, ) = args return TorrentInfoRequestPayload(infohash) @@ -161,30 +164,6 @@ def infohash(self): return self._infohash -class SearchRequestPayload(Payload): - """ - Payload for search request - """ - format_list = ['I', 'I', 'varlenH'] - - def __init__(self, identifier, search_type, query): - super(SearchRequestPayload, self).__init__() - self.identifier = identifier - self.search_type = search_type - self.query = query - - def to_pack_list(self): - data = [('I', self.identifier), - ('I', self.search_type), - ('varlenH', str(self.query))] - return data - - @classmethod - def from_unpack_list(cls, *args): - (timestamp, search_type, query) = args - return SearchRequestPayload(timestamp, search_type, query) - - class SearchResponseItemPayload(Payload): """ Payload for search response items @@ -253,8 +232,7 @@ def to_pack_list(self): return data @classmethod - def from_unpack_list(cls, *args): - (dbid, dispersy_cid, name, description, nr_torrents, nr_favorite, nr_spam, modified) = args[:8] + def from_unpack_list(cls, dbid, dispersy_cid, name, description, nr_torrents, nr_favorite, nr_spam, modified): return ChannelItemPayload(dbid, dispersy_cid, name, description, nr_torrents, nr_favorite, nr_spam, modified) diff --git a/Tribler/community/popularity/pubsub.py b/Tribler/community/popularity/pubsub.py new file mode 100644 index 00000000000..d334605a833 --- /dev/null +++ b/Tribler/community/popularity/pubsub.py @@ -0,0 +1,239 @@ +import logging +from abc import abstractmethod +from copy import copy +from twisted.internet.defer import inlineCallbacks +from twisted.internet.task import LoopingCall + +from Tribler.community.popularity.constants import MSG_SUBSCRIPTION, ERROR_UNKNOWN_PEER, MAX_SUBSCRIBERS, \ + MSG_SUBSCRIBE, MAX_PUBLISHERS, PUBLISH_INTERVAL +from Tribler.community.popularity.payload import ContentSubscription +from Tribler.community.popularity.request import ContentRequest +from Tribler.pyipv8.ipv8.deprecated.community import Community +from Tribler.pyipv8.ipv8.deprecated.payload_headers import BinMemberAuthenticationPayload, GlobalTimeDistributionPayload +from Tribler.pyipv8.ipv8.peer import Peer +from Tribler.pyipv8.ipv8.requestcache import RequestCache + + +class PubSubCommunity(Community): + """ + This community is designed as a base community for all othe future communities that desires publish subscribe model + for content dissemination. It provides a few basic primitives like subscribe/unsubscribe to publisher peers and + publish/broadcast content to subscriber peers. + + All the derived community should implement publish_next_content() method which is responsible for publishing the + next available content to all the subscribers. + """ + + def __init__(self, *args, **kwargs): + super(PubSubCommunity, self).__init__(*args, **kwargs) + self.logger = logging.getLogger(self.__class__.__name__) + self.request_cache = RequestCache() + + # Register messages + self.decode_map.update({ + chr(MSG_SUBSCRIBE): self.on_subscribe, + chr(MSG_SUBSCRIPTION): self.on_subscription_status + }) + + # A set of publisher and subscriber. + # Sends data updates to subscribers, and receives updates from subscribers. + self.subscribers = set() + self.publishers = set() + + def start(self): + """ + Starts the community by subscribing to peers, and periodically publishing the content updates to + the subscribers. + """ + # Subscribe peers + self.subscribe_peers() + + def start_publishing(): + # Update the publisher and subscriber list + self.refresh_peer_list() + + # publish the new cotent from the content repository + self.publish_next_content() + + self.register_task("start_publishing", LoopingCall(start_publishing)).start(PUBLISH_INTERVAL, False) + + @inlineCallbacks + def unload(self): + self.request_cache.clear() + self.cancel_pending_task("start_publishing") + yield super(PubSubCommunity, self).unload() + + def subscribe_peers(self): + """ + Subscribes to the connected peers. First, the peers are sorted based on the trust score on descending order and + content subscribe request is sent to the top peers. + This method is called periodically through refresh_peer_list() in start_publishing() loop so it can fill up for + the disconnected peers by connecting to new peers. + Note that, existing publisher peers are not disconnected even if we find new peers with higher trust score but + only fill up the remaining publisher slots with new top peers. + """ + num_publishers = len(self.publishers) + num_peers = len(self.get_peers()) + # If we have some free publisher slots and there are peers available + if num_publishers < MAX_PUBLISHERS and num_publishers < num_peers: + available_publishers = [peer for peer in self.get_peers() if peer not in self.publishers] + sorted_peers = sorted(available_publishers, + key=lambda _peer: self.trustchain.get_trust(_peer) if self.trustchain else 1, + reverse=True) + for peer in sorted_peers[: MAX_PUBLISHERS - num_publishers]: + self.subscribe(peer, subscribe=True) + + def refresh_peer_list(self): + """ + Updates the publishers and subscribers list by filtering out the disconnected peers. It also calls subscribe + peers to replenish the available publisher slots if necessary. + """ + peers = self.get_peers() + self.publishers = set([peer for peer in self.publishers if peer in peers]) + self.subscribers = set([peer for peer in self.subscribers if peer in peers]) + + # subscribe peers if necessary + self.subscribe_peers() + + def unsubscribe_peers(self): + """ + Unsubscribes from the existing publishers by sending content subscribe request with subscribe=False. It then + clears up its publishers list. + - Called at community unload. + """ + for peer in copy(self.publishers): + self.subscribe(peer, subscribe=False) + self.publishers.clear() + + def subscribe(self, peer, subscribe=True): + """ + Method to send content subscribe/unsubscribe message. This message is sent to each individual publisher peer we + want to subscribe/unsubscribe. + """ + cache = self.request_cache.add(ContentRequest(self.request_cache, MSG_SUBSCRIBE, None)) + # Add or remove the publisher peer + if subscribe: + self.publishers.add(peer) + else: + self.publishers.remove(peer) + + # Create subscription packet and send it + subscription = ContentSubscription(cache.number, subscribe) + packet = self.create_message_packet(MSG_SUBSCRIBE, subscription) + self.broadcast_message(packet, peer=peer) + + def on_subscribe(self, source_address, data): + """ + Message handler for content subscribe message. It handles both subscribe and unsubscribe requests. + Upon successful subscription or unsubscription, it send the confirmation subscription message with status. + In case of subscription, it also publishes a list of recently checked torrents to the subscriber. + """ + auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) + peer = self.get_peer_from_auth(auth, source_address) + + # Subscribe or unsubscribe peer + subscribed = peer in self.subscribers + + if payload.subscribe and not subscribed: + if len(self.subscribers) < MAX_SUBSCRIBERS: + self.subscribers.add(peer) + subscribed = True + + elif not payload.subscribe and subscribed: + self.subscribers.remove(peer) + subscribed = False + + # Send subscription response + self.send_subscription_status(peer, payload.identifier, subscribed=subscribed) + + return subscribed + + def send_subscription_status(self, peer, identifier, subscribed=True): + """ + Method to send content subscription message. Content subscription message is send in response to content + subscribe or unsubscribe message. + """ + if peer not in self.get_peers(): + self.logger.error(ERROR_UNKNOWN_PEER) + return + + subscription = ContentSubscription(identifier, subscribed) + packet = self.create_message_packet(MSG_SUBSCRIPTION, subscription) + self.broadcast_message(packet, peer=peer) + + def on_subscription_status(self, source_address, data): + """ + Message handler for content subscription message. Content subscription message is sent by the publisher stating + the status of the subscription in response to subscribe or unsubscribe request. + + If the subscription message has subscribe=True, it means the subscription was successful, so the peer is added + to the subscriber. In other case, publisher is removed if it is still present in the publishers list. + """ + auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) + peer = self.get_peer_from_auth(auth, source_address) + + cache = self.request_cache.pop(u'request', payload.identifier) + if not cache: + return + + if payload.subscribe: + self.publishers.add(peer) + elif peer in self.publishers: + self.publishers.remove(peer) + + def create_message_packet(self, message_type, payload): + """ + Helper method to creates a message packet of given type with provided payload. + """ + auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list() + dist = GlobalTimeDistributionPayload(self.claim_global_time()).to_pack_list() + payload = payload if isinstance(payload, list) else payload.to_pack_list() + return self._ez_pack(self._prefix, message_type, [auth, dist, payload]) + + def broadcast_message(self, packet, peer=None): + """ + Helper method to broadcast the message packet to a single peer or all the subscribers. + """ + if peer is not None: + self.endpoint.send(peer.address, packet) + return + + for _peer in self.subscribers: + self.endpoint.send(_peer.address, packet) + + def get_peer_from_auth(self, auth, source_address): + """ + Get Peer object from the message and auth and source_address. + It is used for mocking the peer in test. + """ + return Peer(auth.public_key_bin, source_address) + + def pack_sized(self, payload_list, fit_size, start_index=0): + """ + Packs a list of Payload objects to fit into given size limit. + :param payload_list: List list of payload objects + :param fit_size: The maximum allowed size for payload field to fit into UDP packet. + :param start_index: Index of list to start packing + :return: packed string + """ + assert isinstance(payload_list, list) + serialized_results = '' + size = 0 + current_index = start_index + num_payloads = len(payload_list) + while current_index < num_payloads: + item = payload_list[current_index] + packed_item = self.serializer.pack_multiple(item.to_pack_list()) + packed_item_length = len(packed_item) + if size + packed_item_length > fit_size: + break + else: + size += packed_item_length + serialized_results += packed_item + current_index += 1 + return serialized_results, current_index, current_index - start_index + + @abstractmethod + def publish_next_content(self): + """ Method responsible for publishing content during periodic push """ + pass diff --git a/Tribler/community/popular/repository.py b/Tribler/community/popularity/repository.py similarity index 71% rename from Tribler/community/popular/repository.py rename to Tribler/community/popularity/repository.py index 5d3e0925b87..97711598f34 100644 --- a/Tribler/community/popular/repository.py +++ b/Tribler/community/popularity/repository.py @@ -2,7 +2,7 @@ import time from collections import deque -from Tribler.community.popular.payload import TorrentHealthPayload, SearchResponseItemPayload, ChannelItemPayload +from Tribler.community.popularity.payload import SearchResponseItemPayload, ChannelItemPayload MAX_CACHE = 200 @@ -14,6 +14,12 @@ class ContentRepository(object): + """ + This class handles all the stuffs related to the content for PopularityCommunity. Currently, it handles all the + interactions with torrent and channel database. + + It also maintains a content queue which stores the content for publishing in the next publishing cycle. + """ def __init__(self, torrent_db, channel_db): super(ContentRepository, self).__init__() @@ -30,7 +36,7 @@ def add_content(self, content_type, content): if self.queue is not None: self.queue.append((content_type, content)) - def num_content(self): + def count_content(self): return len(self.queue) if self.queue else 0 def pop_content(self): @@ -40,7 +46,6 @@ def get_top_torrents(self, limit=DEFAULT_TORRENT_LIMIT): return self.torrent_db.getRecentlyCheckedTorrents(limit) def update_torrent_health(self, torrent_health_payload, peer_trust=0): - assert torrent_health_payload and isinstance(torrent_health_payload, TorrentHealthPayload) def update_torrent(db_handler, health_payload): db_handler.updateTorrent(infohash, notify=False, num_seeders=health_payload.num_seeders, @@ -49,7 +54,7 @@ def update_torrent(db_handler, health_payload): status=u"good" if health_payload.num_seeders > 1 else u"unknown") if not self.torrent_db: - self.logger.error("Torrent DB is None") + self.logger.error("Torrent DB is not available. Skipping torrent health update.") return infohash = torrent_health_payload.infohash @@ -88,43 +93,62 @@ def has_torrent(self, infohash): return self.get_torrent(infohash) is not None def search_torrent(self, query): - results = [] + """ + Searches for best torrents for the given query and packs them into a list of SearchResponseItemPayload. + :param query: Search query + :return: List + """ + db_results = self.torrent_db.searchNames(query, local=True, keys=['infohash', 'T.name', 'T.length', 'T.num_files', 'T.category', 'T.creation_date', 'T.num_seeders', 'T.num_leechers']) - if db_results: - for dbresult in db_results: - channel_details = dbresult[-10:] + if not db_results: + return [] - dbresult = list(dbresult[:8]) - dbresult[2] = long(dbresult[2]) # length - dbresult[3] = int(dbresult[3]) # num_files - dbresult[4] = [dbresult[4]] # category - dbresult[5] = long(dbresult[5]) # creation_date - dbresult[6] = int(dbresult[6] or 0) # num_seeders - dbresult[7] = int(dbresult[7] or 0) # num_leechers + results = [] + for dbresult in db_results: + channel_details = dbresult[-10:] - # cid - if channel_details[1]: - channel_details[1] = str(channel_details[1]) - dbresult.append(channel_details[1]) + dbresult = list(dbresult[:8]) + dbresult[2] = long(dbresult[2]) # length + dbresult[3] = int(dbresult[3]) # num_files + dbresult[4] = [dbresult[4]] # category + dbresult[5] = long(dbresult[5]) # creation_date + dbresult[6] = int(dbresult[6] or 0) # num_seeders + dbresult[7] = int(dbresult[7] or 0) # num_leechers - results.append(SearchResponseItemPayload(*tuple(dbresult))) + # cid + if channel_details[1]: + channel_details[1] = str(channel_details[1]) + dbresult.append(channel_details[1]) + + results.append(SearchResponseItemPayload(*tuple(dbresult))) return results def search_channels(self, query): - results = [] + """ + Search best channels for the given query. + :param query: Search query + :return: List + """ db_channels = self.channel_db.search_in_local_channels_db(query) + if not db_channels: + return [] + results = [] if db_channels: for channel in db_channels: - channel_payload = channel[:7] + channel_payload = channel[:8] channel_payload[7] = channel[8] # modified results.append(ChannelItemPayload(*channel_payload)) return results def update_from_torrent_search_results(self, search_results): + """ + Updates the torrent database with the provided search results. It also checks for conflicting torrents, meaning + if torrent already exists in the database, we simply ignore the search result. + """ for result in search_results: (infohash, name, length, num_files, category_list, creation_date, seeders, leechers, cid) = result torrent_item = SearchResponseItemPayload(infohash, name, length, num_files, category_list, @@ -143,4 +167,8 @@ def update_from_torrent_search_results(self, search_results): comment='') def update_from_channel_search_results(self, all_items): + """ + TODO: updates the channel database with the search results. + Waiting for all channel 2.0 + """ pass diff --git a/Tribler/community/popular/request.py b/Tribler/community/popularity/request.py similarity index 70% rename from Tribler/community/popular/request.py rename to Tribler/community/popularity/request.py index 3fefb85f996..09ec22f4426 100644 --- a/Tribler/community/popular/request.py +++ b/Tribler/community/popularity/request.py @@ -1,16 +1,15 @@ from twisted.internet.defer import Deferred -from twisted.python.failure import Failure from Tribler.pyipv8.ipv8.requestcache import RandomNumberCache -class SearchRequest(RandomNumberCache): +class ContentRequest(RandomNumberCache): """ - This request cache keeps track of all outstanding search requests + This request cache keeps track of all outstanding search requests. """ def __init__(self, request_cache, search_type, query): - super(SearchRequest, self).__init__(request_cache, u"request") + super(ContentRequest, self).__init__(request_cache, u"request") self.query = query self.search_type = search_type self.response = None @@ -27,4 +26,4 @@ def finish(self): self.deferred.callback(self.response) def on_timeout(self): - self.deferred.errback(Failure(RuntimeError("Search timeout for query: %s" % self.query))) + self.deferred.errback(self.response)