diff --git a/Tribler/Core/APIImplementation/LaunchManyCore.py b/Tribler/Core/APIImplementation/LaunchManyCore.py index ec5ab765634..6f7aa62dfb6 100644 --- a/Tribler/Core/APIImplementation/LaunchManyCore.py +++ b/Tribler/Core/APIImplementation/LaunchManyCore.py @@ -103,7 +103,7 @@ def __init__(self): self.tunnel_community = None self.trustchain_community = None self.wallets = {} - self.popular_community = None + self.popularity_community = None self.startup_deferred = Deferred() @@ -290,18 +290,18 @@ def load_ipv8_overlays(self): # Popular Community if self.session.config.get_popular_community_enabled(): - from Tribler.community.popular.community import PopularCommunity + from Tribler.community.popular.community import PopularityCommunity local_peer = Peer(self.session.trustchain_keypair) - self.popular_community = PopularCommunity(local_peer, self.ipv8.endpoint, self.ipv8.network, - torrent_db=self.session.lm.torrent_db, session=self.session) + self.popularity_community = PopularityCommunity(local_peer, self.ipv8.endpoint, self.ipv8.network, + torrent_db=self.session.lm.torrent_db, session=self.session) - self.ipv8.overlays.append(self.popular_community) + self.ipv8.overlays.append(self.popularity_community) - self.ipv8.strategies.append((RandomWalk(self.popular_community), 20)) + self.ipv8.strategies.append((RandomWalk(self.popularity_community), 20)) - self.popular_community.start() + self.popularity_community.start() @blocking_call_on_reactor_thread def load_dispersy_communities(self): diff --git a/Tribler/Test/Community/popular/test_community.py b/Tribler/Test/Community/popular/test_community.py index a4734820238..6e57085ab20 100644 --- a/Tribler/Test/Community/popular/test_community.py +++ b/Tribler/Test/Community/popular/test_community.py @@ -3,11 +3,12 @@ from Tribler.Test.Core.base_test import MockObject from Tribler.community.popular import constants -from Tribler.community.popular.community import PopularCommunity, MSG_TORRENT_HEALTH_RESPONSE, \ - MSG_CHANNEL_HEALTH_RESPONSE, ERROR_UNKNOWN_PEER, MSG_SUBSCRIPTION, ERROR_NO_CONTENT, \ +from Tribler.community.popular.community import PopularityCommunity, MSG_TORRENT_HEALTH_RESPONSE, \ + MSG_CHANNEL_HEALTH_RESPONSE, ERROR_UNKNOWN_PEER, ERROR_NO_CONTENT, \ ERROR_UNKNOWN_RESPONSE -from Tribler.community.popular.constants import SEARCH_TORRENT_REQUEST, MSG_TORRENT_INFO_RESPONSE -from Tribler.community.popular.payload import SearchResponseItemPayload +from Tribler.community.popular.constants import SEARCH_TORRENT_REQUEST, MSG_TORRENT_INFO_RESPONSE, MSG_SUBSCRIPTION +from Tribler.community.popular.payload import SearchResponseItemPayload, TorrentInfoResponsePayload, \ + TorrentHealthPayload, ContentSubscription from Tribler.community.popular.repository import TYPE_TORRENT_HEALTH from Tribler.pyipv8.ipv8.test.base import TestBase from Tribler.pyipv8.ipv8.test.mocking.ipv8 import MockIPv8 @@ -19,7 +20,7 @@ class TestPopularCommunityBase(TestBase): def setUp(self): super(TestPopularCommunityBase, self).setUp() - self.initialize(PopularCommunity, self.NUM_NODES) + self.initialize(PopularityCommunity, self.NUM_NODES) def create_node(self, *args, **kwargs): def load_random_torrents(limit): @@ -35,7 +36,7 @@ def load_random_torrents(limit): channel_db = MockObject() - return MockIPv8(u"curve25519", PopularCommunity, torrent_db=torrent_db, channel_db=channel_db) + return MockIPv8(u"curve25519", PopularityCommunity, torrent_db=torrent_db, channel_db=channel_db) class MockRepository(object): @@ -466,21 +467,16 @@ def test_on_torrent_health_response_from_unknown_peer(self): original_logger = self.nodes[0].overlay.logger self.nodes[0].overlay.logger.error = lambda *args, **kw: self.fake_logger_error(self.nodes[0], *args) - def fake_unpack_auth(): - mock_auth = MockObject() - mock_payload = MockObject() - return mock_auth, None, mock_payload + infohash = 'a' * 20 + num_seeders = 10 + num_leechers = 5 + timestamp = 123123123 - def fake_get_peer_from_auth(peer): - return peer + payload = TorrentHealthPayload(infohash, num_seeders, num_leechers, timestamp) + source_address = ('1.1.1.1', 1024) + data = self.nodes[0].overlay.create_message_packet(MSG_TORRENT_HEALTH_RESPONSE, payload) - self.nodes[0].overlay._ez_unpack_auth = lambda payload_class, data: fake_unpack_auth() - self.nodes[0].overlay.get_peer_from_auth = lambda auth, address: self.nodes[1] - - source_address = MockObject() - data = MockObject() - - self.nodes[0].unknown_response = True + self.nodes[0].unknown_response = False self.nodes[0].overlay.on_torrent_health_response(source_address, data) yield self.deliver_messages() @@ -489,113 +485,131 @@ def fake_get_peer_from_auth(peer): # Restore logger self.nodes[0].overlay.logger = original_logger - @twisted_wrapper - def test_on_torrent_info_response_from_unknown_publisher(self): + @twisted_wrapper(500) + def test_on_torrent_health_response(self): """ - Tests receiving torrent info response from unknown publisher + Tests receiving torrent health response from unknown peer """ - original_logger = self.nodes[0].overlay.logger - self.nodes[0].overlay.logger.error = lambda *args, **kw: self.fake_logger_error(self.nodes[0], *args) + def fake_update_torrent(peer): + peer.called_update_torrent = True - def fake_unpack_auth(): - mock_auth = MockObject() - mock_payload = MockObject() - return mock_auth, None, mock_payload + self.nodes[0].overlay.content_repository = MockRepository() + self.nodes[0].overlay.content_repository.update_torrent_health = lambda payload, peer_trust: \ + fake_update_torrent(self.nodes[0]) - self.nodes[0].overlay._ez_unpack_auth = lambda payload_class, data: fake_unpack_auth() - self.nodes[0].overlay.get_peer_from_auth = lambda auth, address: self.nodes[1] + infohash = 'a' * 20 + num_seeders = 10 + num_leechers = 5 + timestamp = 123123123 - source_address = MockObject() - data = MockObject() + payload = TorrentHealthPayload(infohash, num_seeders, num_leechers, timestamp) + data = self.nodes[1].overlay.create_message_packet(MSG_TORRENT_HEALTH_RESPONSE, payload) - self.nodes[0].unknown_response = False - self.nodes[0].overlay.on_torrent_info_response(source_address, data) + yield self.introduce_nodes() + + # Add node 1 in publisher list of node 0 + self.nodes[0].overlay.publishers.add(self.nodes[1].my_peer) + self.nodes[0].overlay.on_torrent_health_response(self.nodes[1].my_peer.address, data) yield self.deliver_messages() - self.assertTrue(self.nodes[0].unknown_response) - - # Restore logger - self.nodes[0].overlay.logger = original_logger + self.assertTrue(self.nodes[0].called_update_torrent) @twisted_wrapper def test_on_torrent_info_response(self): """ - Tests receiving torrent info response from unknown publisher + Tests receiving torrent health response. """ - original_logger = self.nodes[0].overlay.logger - self.nodes[0].overlay.logger.error = lambda *args, **kw: self.fake_logger_error(self.nodes[0], *args) - def fake_update_torrent_info(peer): - peer.called_update_torrent_info = True + peer.called_update_torrent = True - def fake_unpack_auth(): - mock_auth = MockObject() - mock_payload = MockObject() - return mock_auth, None, mock_payload - - self.nodes[0].called_update_torrent_info = False self.nodes[0].overlay.content_repository = MockRepository() self.nodes[0].overlay.content_repository.update_torrent_info = lambda payload: \ fake_update_torrent_info(self.nodes[0]) - self.nodes[0].overlay._ez_unpack_auth = lambda payload_class, data: fake_unpack_auth() - self.nodes[0].overlay.get_peer_from_auth = lambda auth, address: self.nodes[1].my_peer - source_address = MockObject() - data = MockObject() + infohash = 'a' * 20 + name = "ubuntu" + length = 100 + creation_date = 123123123 + num_files = 33 + comment = '' - self.nodes[0].unknown_response = False - # Add the node to publisher - self.nodes[0].overlay.publishers.add(self.nodes[1].my_peer) + payload = TorrentInfoResponsePayload(infohash, name, length, creation_date, num_files, comment) + data = self.nodes[1].overlay.create_message_packet(MSG_TORRENT_INFO_RESPONSE, payload) - self.nodes[0].overlay.on_torrent_info_response(source_address, data) + yield self.introduce_nodes() + + # Add node 1 in publisher list of node 0 + self.nodes[0].overlay.publishers.add(self.nodes[1].my_peer) + self.nodes[0].overlay.on_torrent_info_response(self.nodes[1].my_peer.address, data) yield self.deliver_messages() - self.assertFalse(self.nodes[0].unknown_response) - self.assertTrue(self.nodes[0].called_update_torrent_info) - - # Restore logger - self.nodes[0].overlay.logger = original_logger + self.assertTrue(self.nodes[0].called_update_torrent) @twisted_wrapper - def test_on_subscription_status(self): - original_logger = self.nodes[0].overlay.logger - self.nodes[0].overlay.logger.error = lambda *args, **kw: self.fake_logger_error(self.nodes[0], *args) + def test_on_torrent_info_response_from_unknown_peer(self): + """ + Tests receiving torrent health response from unknown peer. + """ def fake_update_torrent_info(peer): - peer.called_update_torrent_info = True - - def fake_unpack_auth(subscribe): - mock_auth = MockObject() - mock_payload = MockObject() - mock_payload.subscribe = subscribe - return mock_auth, None, mock_payload + peer.called_update_torrent = True - self.nodes[0].called_update_torrent_info = False self.nodes[0].overlay.content_repository = MockRepository() self.nodes[0].overlay.content_repository.update_torrent_info = lambda payload: \ fake_update_torrent_info(self.nodes[0]) - self.nodes[0].overlay.get_peer_from_auth = lambda auth, address: self.nodes[1].my_peer - source_address = MockObject() - data = MockObject() + infohash = 'a' * 20 + name = "ubuntu" + length = 100 + creation_date = 123123123 + num_files = 33 + comment = '' + + payload = TorrentInfoResponsePayload(infohash, name, length, creation_date, num_files, comment) + data = self.nodes[1].overlay.create_message_packet(MSG_TORRENT_INFO_RESPONSE, payload) + + yield self.introduce_nodes() + + self.nodes[0].called_update_torrent = False + self.nodes[0].overlay.on_torrent_info_response(self.nodes[1].my_peer.address, data) + yield self.deliver_messages() + + self.assertFalse(self.nodes[0].called_update_torrent) + + @twisted_wrapper + def test_on_subscription_status1(self): + """ + Tests receiving subscription status. + """ subscribe = True - self.nodes[0].overlay._ez_unpack_auth = lambda payload_class, data: fake_unpack_auth(subscribe) + payload = ContentSubscription(subscribe) + data = self.nodes[1].overlay.create_message_packet(MSG_SUBSCRIPTION, payload) - # If subscribe=True in subscription status, peer is added to the publisher list + yield self.introduce_nodes() self.assertEqual(len(self.nodes[0].overlay.publishers), 0) - self.nodes[0].overlay.on_subscription_status(source_address, data) + + self.nodes[0].overlay.on_subscription_status(self.nodes[1].my_peer.address, data) yield self.deliver_messages() + + self.assertEqual(len(self.nodes[0].overlay.publishers), 1) + + @twisted_wrapper + def test_on_subscription_status_with_unsubscribe(self): + """ + Tests receiving subscription status with unsubscribe status. + """ + yield self.introduce_nodes() + self.nodes[0].overlay.publishers.add(self.nodes[1].my_peer) self.assertEqual(len(self.nodes[0].overlay.publishers), 1) - # If subscribe=False in subscription status, peer is removed from the publisher list subscribe = False - self.nodes[0].overlay._ez_unpack_auth = lambda payload_class, data: fake_unpack_auth(subscribe) - self.nodes[0].overlay.on_subscription_status(source_address, data) + payload = ContentSubscription(subscribe) + data = self.nodes[1].overlay.create_message_packet(MSG_SUBSCRIPTION, payload) + + self.nodes[0].overlay.on_subscription_status(self.nodes[1].my_peer.address, data) yield self.deliver_messages() - self.assertEqual(len(self.nodes[0].overlay.publishers), 0) - # Restore logger - self.nodes[0].overlay.logger = original_logger + self.assertEqual(len(self.nodes[0].overlay.publishers), 0) @twisted_wrapper(5) def test_search_request_response(self): diff --git a/Tribler/Test/Community/popular/test_repository.py b/Tribler/Test/Community/popular/test_repository.py index cd853d89697..5cb5a03ac69 100644 --- a/Tribler/Test/Community/popular/test_repository.py +++ b/Tribler/Test/Community/popular/test_repository.py @@ -20,13 +20,13 @@ def test_add_content(self): Test adding and removing content works as expected. """ # Initial content queue is zero - self.assertEqual(self.content_repository.num_content(), 0, "No item expected in queue initially") + self.assertEqual(self.content_repository.count_content(), 0, "No item expected in queue initially") # Add a sample content and check the size sample_content = ('a' * 20, 6, 3, 123456789) sample_content_type = 1 self.content_repository.add_content(sample_content_type, sample_content) - self.assertEqual(self.content_repository.num_content(), 1, "One item expected in queue") + self.assertEqual(self.content_repository.count_content(), 1, "One item expected in queue") # Pop an item (content_type, content) = self.content_repository.pop_content() @@ -34,7 +34,7 @@ def test_add_content(self): self.assertEqual(content, sample_content, "Content should be equal") # Check size again - self.assertEqual(self.content_repository.num_content(), 0, "No item expected in queue") + self.assertEqual(self.content_repository.count_content(), 0, "No item expected in queue") def test_get_top_torrents(self): """ @@ -49,46 +49,25 @@ def get_fake_torrents(limit): limit = 10 self.assertEqual(self.content_repository.get_top_torrents(limit=limit), get_fake_torrents(limit)) - def test_update_torrent(self): - - MSG_TORRENT_DB_NONE = "Torrent DB is None" - - def fake_logger_error(repo, *args): - if args[0] == MSG_TORRENT_DB_NONE: - repo.torrent_db_none = True - elif 'unknown' in args[0].lower(): - repo.unknown_torrent = True - repo.logger_error_called = True - + def test_update_torrent_health(self): + """ + Tests update torrent health. + """ def update_torrent(repo, _): repo.update_torrent_called = True - original_logger = self.content_repository.logger - self.content_repository.logger.error = lambda *args, **kw: fake_logger_error(self.content_repository, *args) - # Assume a fake torrent response fake_torrent_health_payload = TorrentHealthPayload('a' * 20, 10, 4, time.time()) - # Case1: torrent db is none - self.content_repository.torrent_db = None - self.content_repository.logger_error_called = False - - self.content_repository.update_torrent_health(fake_torrent_health_payload, peer_trust=0) - self.assertTrue(self.content_repository.torrent_db_none) - self.assertTrue(self.content_repository.logger_error_called) - - # Case2: torrent db does not have torrent self.content_repository.torrent_db = MockObject() self.content_repository.torrent_db.updateTorrent = lambda infohash, *args, **kw: \ update_torrent(self.content_repository, infohash) - self.content_repository.logger_error_called = False - self.content_repository.has_torrent = lambda infohash: False + # If torrent does not exist in the database, then it should be added to the database + self.content_repository.has_torrent = lambda infohash: False self.content_repository.update_torrent_health(fake_torrent_health_payload, peer_trust=0) - self.assertTrue(self.content_repository.update_torrent_called) - # restore logger - self.content_repository.logger = original_logger + self.assertTrue(self.content_repository.update_torrent_called) def test_update_torrent_with_higher_trust(self): """ @@ -232,3 +211,105 @@ def fake_torrentdb_search_names(_): self.assertEqual(db_torrent[3], search_result.num_files) self.assertEqual(db_torrent[6], search_result.seeders) self.assertEqual(db_torrent[7], search_result.leechers) + + def test_search_channel(self): + """ Test channel search """ + def random_string(size=6, chars=string.ascii_uppercase + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) + + def random_infohash(): + return ''.join(random.choice('0123456789abcdef') for _ in range(20)) + + sample_channels = [] + for index in range(10): + dbid = index + cid = random_string(size=20) + name = random_string() + description = random_string(20) + nr_torrents = random.randint(1, 10) + nr_favorite = random.randint(1,10) + nr_spam = random.randint(1, 10) + my_vote = 1 + modified = random.randint(1, 10000000) + relevance_score = 0.0 + + sample_channels.append([dbid, cid, name, description, nr_torrents, nr_favorite, nr_spam, my_vote, modified, relevance_score]) + + def fake_torrentdb_search_channels(_): + return sample_channels + + self.content_repository.channel_db.search_in_local_channels_db = lambda query, **kw: \ + fake_torrentdb_search_channels(query) + + search_query = "Ubuntu" + search_results = self.content_repository.search_channels(search_query) + + for index in range(10): + db_channel = sample_channels[index] + search_result = search_results[index] + + self.assertEqual(db_channel[0], search_result.id) + self.assertEqual(db_channel[1], search_result.cid) + self.assertEqual(db_channel[2], search_result.name) + self.assertEqual(db_channel[3], search_result.description) + self.assertEqual(db_channel[4], search_result.nr_torrents) + self.assertEqual(db_channel[5], search_result.nr_favorite) + self.assertEqual(db_channel[6], search_result.nr_spam) + self.assertEqual(db_channel[8], search_result.modified) + + def test_update_torrent_from_search_results(self): + """ Tests updating database from the search results """ + def random_string(size=6, chars=string.ascii_uppercase + string.digits): + return ''.join(random.choice(chars) for _ in range(size)) + + def random_infohash(): + return ''.join(random.choice('0123456789abcdef') for _ in range(20)) + + search_results = dict() + for _ in range(10): + infohash = random_infohash() + name = random_string() + length = random.randint(1000, 9999) + num_files = random.randint(1, 10) + category_list = ['video', 'audio'] + creation_date = random.randint(1000000, 111111111) + seeders = random.randint(10, 200) + leechers = random.randint(5, 1000) + cid = random_string(size=20) + + search_results[infohash] = [infohash, name, length, num_files, category_list, creation_date, + seeders, leechers, cid] + + def get_torrent(torrent_as_list): + return {'infohash': torrent_as_list[0], + 'name': torrent_as_list[1], + 'length': torrent_as_list[2], + 'num_files': torrent_as_list[3], + 'category_list': torrent_as_list[4], + 'creation_date': torrent_as_list[5], + 'seeders': torrent_as_list[6], + 'leechers': torrent_as_list[7], + 'cid': torrent_as_list[8], + } + + def fake_update_torrent(ref): + ref.called_update_torrent = True + + self.content_repository.torrent_db.updateTorrent = lambda infohash, **kw: fake_update_torrent( + self.content_repository) + + # Case 1: Assume torrent does not exist in the database + self.content_repository.has_torrent = lambda infohash: False + self.content_repository.get_torrent = lambda infohash: None + + self.content_repository.called_update_torrent = False + self.content_repository.update_from_torrent_search_results(search_results.values()) + self.assertTrue(self.content_repository.called_update_torrent) + + # Case 2: Torrent already exist in the database + self.content_repository.has_torrent = lambda infohash: infohash in search_results + self.content_repository.get_torrent = lambda infohash: get_torrent(search_results[infohash]) + + self.content_repository.called_update_torrent = False + self.content_repository.update_from_torrent_search_results(search_results.values()) + self.assertFalse(self.content_repository.called_update_torrent) diff --git a/Tribler/community/popular/community.py b/Tribler/community/popular/community.py index 3dcae6a82f4..31368744acd 100644 --- a/Tribler/community/popular/community.py +++ b/Tribler/community/popular/community.py @@ -1,239 +1,28 @@ -import logging -from abc import abstractmethod -from copy import copy from twisted.internet.defer import inlineCallbacks -from twisted.internet.task import LoopingCall from Tribler.Core.simpledefs import SIGNAL_SEARCH_COMMUNITY, SIGNAL_ON_SEARCH_RESULTS -from Tribler.community.popular.constants import MSG_SUBSCRIBE, MSG_SUBSCRIPTION, \ - MSG_TORRENT_HEALTH_RESPONSE, MSG_CHANNEL_HEALTH_RESPONSE, MSG_TORRENT_INFO_REQUEST, MSG_TORRENT_INFO_RESPONSE, \ - MAX_PUBLISHERS, PUBLISH_INTERVAL, MAX_SUBSCRIBERS, \ - ERROR_UNKNOWN_RESPONSE, MAX_PACKET_PAYLOAD_SIZE, ERROR_UNKNOWN_PEER, ERROR_NO_CONTENT, MASTER_PUBLIC_KEY, \ +from Tribler.community.popular.constants import MSG_TORRENT_HEALTH_RESPONSE, MSG_CHANNEL_HEALTH_RESPONSE, \ + MSG_TORRENT_INFO_REQUEST, MSG_TORRENT_INFO_RESPONSE, \ + ERROR_UNKNOWN_RESPONSE, MAX_PACKET_PAYLOAD_SIZE, ERROR_UNKNOWN_PEER, ERROR_NO_CONTENT, \ MSG_CONTENT_INFO_REQUEST, \ SEARCH_TORRENT_REQUEST, MSG_CONTENT_INFO_RESPONSE, SEARCH_TORRENT_RESPONSE from Tribler.community.popular.payload import TorrentHealthPayload, ContentSubscription, TorrentInfoRequestPayload, \ TorrentInfoResponsePayload, SearchResponseItemPayload, \ ContentInfoRequest, Pagination, ContentInfoResponse, decode_values +from Tribler.community.popular.pubsub import PubSubCommunity from Tribler.community.popular.repository import ContentRepository, TYPE_TORRENT_HEALTH from Tribler.community.popular.request import SearchRequest -from Tribler.pyipv8.ipv8.deprecated.community import Community -from Tribler.pyipv8.ipv8.deprecated.payload_headers import BinMemberAuthenticationPayload, GlobalTimeDistributionPayload from Tribler.pyipv8.ipv8.peer import Peer -from Tribler.pyipv8.ipv8.requestcache import RequestCache -class PubSubCommunity(Community): - - def __init__(self, *args, **kwargs): - super(PubSubCommunity, self).__init__(*args, **kwargs) - self.logger = logging.getLogger(self.__class__.__name__) - self.request_cache = RequestCache() - - # Register messages - self.decode_map.update({ - chr(MSG_SUBSCRIBE): self.on_subscribe, - chr(MSG_SUBSCRIPTION): self.on_subscription_status - }) - - # A set of publisher and subscriber. - # Sends data updates to subscribers, and receives updates from subscribers. - self.subscribers = set() - self.publishers = set() - - def start(self): - """ - Starts the community by subscribing to peers, and periodically publishing the content updates to - the subscribers. - """ - # Subscribe peers - self.subscribe_peers() - - def start_publishing(): - # Update the publisher and subscriber list - self.refresh_peer_list() - - # publish the new cotent from the content repository - self.publish_next_content() - - self.register_task("start_publishing", LoopingCall(start_publishing)).start(PUBLISH_INTERVAL, False) - - @inlineCallbacks - def unload(self): - self.request_cache.clear() - self.cancel_pending_task("start_publishing") - yield super(PubSubCommunity, self).unload() - - def subscribe_peers(self): - """ - Subscribes to the connected peers. First, the peers are sorted based on the trust score on descending order and - content subscribe request is sent to the top peers. - This method is called periodically so it can fill up for the disconnected peers by connecting to new peers. - Note that, existing publisher peers are not disconnected even if we find new peers with higher trust score but - only fill up the remaining publisher slots with new top peers. - """ - num_publishers = len(self.publishers) - num_peers = len(self.get_peers()) - # If we have some free publisher slots and there are peers available - if num_publishers < MAX_PUBLISHERS and num_publishers < num_peers: - available_publishers = [peer for peer in self.get_peers() if peer not in self.publishers] - sorted_peers = sorted(available_publishers, - key=lambda _peer: self.trustchain.get_trust(_peer) if self.trustchain else 1, - reverse=True) - for peer in sorted_peers[: MAX_PUBLISHERS - num_publishers]: - self.subscribe(peer, subscribe=True) - - def refresh_peer_list(self): - """ - Updates the publishers and subscribers list by filtering out the disconnected peers. It also calls subscribe - peers to replenish the available publisher slots if necessary. - """ - peers = self.get_peers() - self.logger.info("Num peers: %d", len(peers)) - self.publishers = set([peer for peer in self.publishers if peer in peers]) - self.subscribers = set([peer for peer in self.subscribers if peer in peers]) - - # Log the number of subscribers and publishers - self.logger.info("Publishers: %d, Subscribers: %d", len(self.publishers), len(self.subscribers)) - - # subscribe peers if necessary - self.subscribe_peers() - - def unsubscribe_peers(self): - """ - Unsubscribes from the existing publishers by sending content subscribe request with subscribe=False. It then - clears up its publishers list. - - Called at community unload. - """ - for peer in copy(self.publishers): - self.subscribe(peer, subscribe=False) - self.publishers.clear() - - def subscribe(self, peer, subscribe=True): - """ - Method to send content subscribe/unsubscribe message. This message is sent to each individual publisher peer we - want to subscribe/unsubscribe. - """ - # Add or remove the publisher peer - if subscribe: - self.publishers.add(peer) - else: - self.publishers.remove(peer) - - # Create subscription packet and send it - subscription = ContentSubscription(subscribe) - packet = self.create_message_packet(MSG_SUBSCRIBE, subscription) - self.broadcast_message(packet, peer=peer) - - def on_subscribe(self, source_address, data): - """ - Message handler for content subscribe message. It handles both subscribe and unsubscribe requests. - Upon successful subscription or unsubscription, it send the confirmation subscription message with status. - In case of subscription, it also publishes a list of recently checked torrents to the subscriber. - """ - auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) - peer = self.get_peer_from_auth(auth, source_address) - - # Subscribe or unsubscribe peer - subscribed = peer in self.subscribers - - if payload.subscribe and not subscribed: - if len(self.subscribers) < MAX_SUBSCRIBERS: - self.subscribers.add(peer) - subscribed = True - - elif not payload.subscribe and subscribed: - self.subscribers.remove(peer) - subscribed = False - - # Send subscription response - self.send_subscription_status(peer, subscribed=subscribed) - - return subscribed - - def send_subscription_status(self, peer, subscribed=True): - """ - Method to send content subscription message. Content subscription message is send in response to content - subscribe or unsubscribe message. - """ - if peer not in self.get_peers(): - self.logger.error(ERROR_UNKNOWN_PEER) - return - - subscription = ContentSubscription(subscribed) - packet = self.create_message_packet(MSG_SUBSCRIPTION, subscription) - self.broadcast_message(packet, peer=peer) - - def on_subscription_status(self, source_address, data): - """ - Message handler for content subscription message. Content subscription message is sent by the publisher stating - the status of the subscription in response to subscribe or unsubscribe request. - - If the subscription message has subscribe=True, it means the subscription was successful, so the peer is added - to the subscriber. In other case, publisher is removed if it is still present in the publishers list. - """ - auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) - peer = self.get_peer_from_auth(auth, source_address) - - if payload.subscribe: - self.publishers.add(peer) - elif peer in self.publishers: - self.publishers.remove(peer) - - def create_message_packet(self, message_type, payload): - """ - Helper method to creates a message packet of given type with provided payload. - """ - auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list() - dist = GlobalTimeDistributionPayload(self.claim_global_time()).to_pack_list() - payload = payload if isinstance(payload, list) else payload.to_pack_list() - return self._ez_pack(self._prefix, message_type, [auth, dist, payload]) - - def broadcast_message(self, packet, peer=None): - """ - Helper method to broadcast the message packet to a single peer or all the subscribers. - """ - if peer is not None: - self.endpoint.send(peer.address, packet) - return - - for _peer in self.subscribers: - self.endpoint.send(_peer.address, packet) - - def get_peer_from_auth(self, auth, source_address): - """ - Get Peer object from the message and auth and source_address. - It is used for mocking the peer in test. - """ - return Peer(auth.public_key_bin, source_address) - - def pack_sized(self, payload_list, fit_size, start_index=0): - assert isinstance(payload_list, list) - serialized_results = '' - size = 0 - current_index = start_index - num_payloads = len(payload_list) - while current_index < num_payloads: - item = payload_list[current_index] - packed_item = self.serializer.pack_multiple(item.to_pack_list()) - packed_item_length = len(packed_item) - if size + packed_item_length > fit_size: - break - else: - size += packed_item_length - serialized_results += packed_item - current_index += 1 - return serialized_results, current_index, current_index - start_index - - @abstractmethod - def publish_next_content(self): - """ Method responsible for publishing content during periodic push """ - pass - - -class PopularCommunity(PubSubCommunity): +class PopularityCommunity(PubSubCommunity): """ Community for disseminating the content across the network. Follows publish-subscribe model. """ + MASTER_PUBLIC_KEY = "3081a7301006072a8648ce3d020106052b810400270381920004073beff7002b6a9fc2824a3b1bbb1c4fc32546261" \ + "e3ef7537874560346c5fdc0c17fe654f67d23b08cbb44141879f79a7a4c8deddf9beb4fbc7a0f02ee1586ccebedb6" \ + "23eeef51710108d702f9250361c071482e83c0a4a86c8f45a0b13a19ef83eacb6267b4bfccf220ae5f6d1db7125ea1" \ + "d10da3744b65679828f23376e28b76ab33132b7fa984a77f159dba7351a7" master_peer = Peer(MASTER_PUBLIC_KEY.decode('hex')) @@ -243,12 +32,10 @@ def __init__(self, *args, **kwargs): self.trustchain = kwargs.pop('trustchain_community', None) self.tribler_session = kwargs.pop('session', None) - super(PopularCommunity, self).__init__(*args, **kwargs) + super(PopularityCommunity, self).__init__(*args, **kwargs) - # Handles database stuffs self.content_repository = ContentRepository(self.torrent_db, self.channel_db) - # Register messages self.decode_map.update({ chr(MSG_TORRENT_HEALTH_RESPONSE): self.on_torrent_health_response, chr(MSG_CHANNEL_HEALTH_RESPONSE): self.on_channel_health_response, @@ -264,13 +51,13 @@ def __init__(self, *args, **kwargs): def unload(self): self.content_repository.cleanup() self.content_repository = None - yield super(PopularCommunity, self).unload() + yield super(PopularityCommunity, self).unload() def on_subscribe(self, source_address, data): auth, _, _ = self._ez_unpack_auth(ContentSubscription, data) peer = self.get_peer_from_auth(auth, source_address) - subscribed = super(PopularCommunity, self).on_subscribe(source_address, data) + subscribed = super(PopularityCommunity, self).on_subscribe(source_address, data) # Publish the latest torrents to the subscriber if subscribed: self.publish_latest_torrents(peer=peer) @@ -321,7 +108,7 @@ def on_torrent_info_response(self, source_address, data): Message handler for torrent info response. """ self.logger.info("Got torrent info response from %s", source_address) - auth, _, payload = self._ez_unpack_auth(TorrentHealthPayload, data) + auth, _, payload = self._ez_unpack_auth(TorrentInfoResponsePayload, data) peer = self.get_peer_from_auth(auth, source_address) if peer not in self.publishers: @@ -355,7 +142,7 @@ def on_content_info_response(self, source_address, data): def process_torrent_search_response(self, query, payload): item_format = SearchResponseItemPayload.format_list - (response, _) = self.serializer.unpack_multiple_as_list(item_format, payload.response) + response, _ = self.serializer.unpack_multiple_as_list(item_format, payload.response) # Decode the category string to list for response_item in response: response_item[4] = decode_values(response_item[4]) @@ -425,6 +212,13 @@ def send_torrent_info_response(self, infohash, peer): self.broadcast_message(packet, peer=peer) def send_content_info_request(self, content_type, request_list, limit=25, peer=None): + """ + Sends the generic content request of givent content_type. + :param content_type: request content type + :param request_list: List request queries + :param limit: Number of expected responses + :param peer: Peer to send this request to + """ cache = self.request_cache.add(SearchRequest(self.request_cache, content_type, request_list)) self.logger.info("Sending search request query:%s, identifier:%s", request_list, cache.number) @@ -438,6 +232,13 @@ def send_content_info_request(self, content_type, request_list, limit=25, peer=N self.broadcast_message(packet, peer=connected_peer) def send_content_info_response(self, peer, identifier, content_type, response_list): + """ + Sends the generic content info response with payload response list. + :param peer: Receiving peer + :param identifier: Request identifier + :param content_type: Message content type + :param response_list: Content response + """ num_results = len(response_list) current_index = 0 page_num = 1 @@ -454,10 +255,16 @@ def send_content_info_response(self, peer, identifier, content_type, response_li self.broadcast_message(packet, peer=peer) def send_torrent_search_request(self, query): + """ + Sends torrent search query as a content info request with content_type as SEARCH_TORRENT_REQUEST. + """ self.send_content_info_request(SEARCH_TORRENT_REQUEST, query) def send_channel_search_request(self, query): - raise NotImplementedError("Not implemented yet. Waiting for all channel 2.0") + """ + Sends channel search query to All Channel 2.0 to get a list of channels. + """ + # TODO: Not implemented yet. Waiting for All Channel 2.0 # CONTENT REPOSITORY STUFFS @@ -467,7 +274,7 @@ def publish_next_content(self): Does nothing if there are none subscribers. Only Torrent health response is published at the moment. """ - self.logger.info("Content to publish: %d", self.content_repository.num_content()) + self.logger.info("Content to publish: %d", self.content_repository.count_content()) if not self.subscribers: self.logger.info("No subscribers found. Not publishing anything") return @@ -496,6 +303,6 @@ def publish_latest_torrents(self, peer): def queue_content(self, content_type, content): """ - Basically addS a given content to the queue of content repository. + Basically adds a given content to the queue of content repository. """ self.content_repository.add_content(content_type, content) diff --git a/Tribler/community/popular/constants.py b/Tribler/community/popular/constants.py index f6163857129..70542090734 100644 --- a/Tribler/community/popular/constants.py +++ b/Tribler/community/popular/constants.py @@ -1,9 +1,3 @@ -# Master peer -MASTER_PUBLIC_KEY = "3081a7301006072a8648ce3d020106052b810400270381920004073beff7002b6a9fc2824a3b1bbb1c4fc32546261" \ - "e3ef7537874560346c5fdc0c17fe654f67d23b08cbb44141879f79a7a4c8deddf9beb4fbc7a0f02ee1586ccebedb6" \ - "23eeef51710108d702f9250361c071482e83c0a4a86c8f45a0b13a19ef83eacb6267b4bfccf220ae5f6d1db7125ea1" \ - "d10da3744b65679828f23376e28b76ab33132b7fa984a77f159dba7351a7" - # Message types for different requests & response MSG_SUBSCRIBE = 1 MSG_SUBSCRIPTION = 2 diff --git a/Tribler/community/popular/payload.py b/Tribler/community/popular/payload.py index bdddc691937..a0f3cdbf9fc 100644 --- a/Tribler/community/popular/payload.py +++ b/Tribler/community/popular/payload.py @@ -230,8 +230,7 @@ def to_pack_list(self): return data @classmethod - def from_unpack_list(cls, *args): - (dbid, dispersy_cid, name, description, nr_torrents, nr_favorite, nr_spam, modified) = args[:8] + def from_unpack_list(cls, dbid, dispersy_cid, name, description, nr_torrents, nr_favorite, nr_spam, modified): return ChannelItemPayload(dbid, dispersy_cid, name, description, nr_torrents, nr_favorite, nr_spam, modified) diff --git a/Tribler/community/popular/pubsub.py b/Tribler/community/popular/pubsub.py new file mode 100644 index 00000000000..4418569d437 --- /dev/null +++ b/Tribler/community/popular/pubsub.py @@ -0,0 +1,236 @@ +import logging +from Tribler.pyipv8.ipv8.deprecated.community import Community + +from Tribler.pyipv8.ipv8.requestcache import RequestCache +from copy import copy +from twisted.internet.defer import inlineCallbacks +from twisted.internet.task import LoopingCall + +from Tribler.community.popular.constants import MSG_SUBSCRIPTION, ERROR_UNKNOWN_PEER, MAX_SUBSCRIBERS, MSG_SUBSCRIBE, \ + MAX_PUBLISHERS, PUBLISH_INTERVAL +from Tribler.community.popular.payload import ContentSubscription +from Tribler.pyipv8.ipv8.deprecated.payload_headers import BinMemberAuthenticationPayload, GlobalTimeDistributionPayload + +from Tribler.pyipv8.ipv8.peer import Peer +from abc import abstractmethod + + +class PubSubCommunity(Community): + """ + This community is designed as a base community for all othe future communities that desires publish subscribe model + for content dissemination. It provides a few basic primitives like subscribe/unsubscribe to publisher peers and + publish/broadcast content to subscriber peers. + + All the derived community should implement publish_next_content() method which is responsible for publishing the + next available content to all the subscribers. + """ + + def __init__(self, *args, **kwargs): + super(PubSubCommunity, self).__init__(*args, **kwargs) + self.logger = logging.getLogger(self.__class__.__name__) + self.request_cache = RequestCache() + + # Register messages + self.decode_map.update({ + chr(MSG_SUBSCRIBE): self.on_subscribe, + chr(MSG_SUBSCRIPTION): self.on_subscription_status + }) + + # A set of publisher and subscriber. + # Sends data updates to subscribers, and receives updates from subscribers. + self.subscribers = set() + self.publishers = set() + + def start(self): + """ + Starts the community by subscribing to peers, and periodically publishing the content updates to + the subscribers. + """ + # Subscribe peers + self.subscribe_peers() + + def start_publishing(): + # Update the publisher and subscriber list + self.refresh_peer_list() + + # publish the new cotent from the content repository + self.publish_next_content() + + self.register_task("start_publishing", LoopingCall(start_publishing)).start(PUBLISH_INTERVAL, False) + + @inlineCallbacks + def unload(self): + self.request_cache.clear() + self.cancel_pending_task("start_publishing") + yield super(PubSubCommunity, self).unload() + + def subscribe_peers(self): + """ + Subscribes to the connected peers. First, the peers are sorted based on the trust score on descending order and + content subscribe request is sent to the top peers. + This method is called periodically through refresh_peer_list() in start_publishing() loop so it can fill up for + the disconnected peers by connecting to new peers. + Note that, existing publisher peers are not disconnected even if we find new peers with higher trust score but + only fill up the remaining publisher slots with new top peers. + """ + num_publishers = len(self.publishers) + num_peers = len(self.get_peers()) + # If we have some free publisher slots and there are peers available + if num_publishers < MAX_PUBLISHERS and num_publishers < num_peers: + available_publishers = [peer for peer in self.get_peers() if peer not in self.publishers] + sorted_peers = sorted(available_publishers, + key=lambda _peer: self.trustchain.get_trust(_peer) if self.trustchain else 1, + reverse=True) + for peer in sorted_peers[: MAX_PUBLISHERS - num_publishers]: + self.subscribe(peer, subscribe=True) + + def refresh_peer_list(self): + """ + Updates the publishers and subscribers list by filtering out the disconnected peers. It also calls subscribe + peers to replenish the available publisher slots if necessary. + """ + peers = self.get_peers() + self.publishers = set([peer for peer in self.publishers if peer in peers]) + self.subscribers = set([peer for peer in self.subscribers if peer in peers]) + + # subscribe peers if necessary + self.subscribe_peers() + + def unsubscribe_peers(self): + """ + Unsubscribes from the existing publishers by sending content subscribe request with subscribe=False. It then + clears up its publishers list. + - Called at community unload. + """ + for peer in copy(self.publishers): + self.subscribe(peer, subscribe=False) + self.publishers.clear() + + def subscribe(self, peer, subscribe=True): + """ + Method to send content subscribe/unsubscribe message. This message is sent to each individual publisher peer we + want to subscribe/unsubscribe. + """ + # Add or remove the publisher peer + if subscribe: + self.publishers.add(peer) + else: + self.publishers.remove(peer) + + # Create subscription packet and send it + subscription = ContentSubscription(subscribe) + packet = self.create_message_packet(MSG_SUBSCRIBE, subscription) + self.broadcast_message(packet, peer=peer) + + def on_subscribe(self, source_address, data): + """ + Message handler for content subscribe message. It handles both subscribe and unsubscribe requests. + Upon successful subscription or unsubscription, it send the confirmation subscription message with status. + In case of subscription, it also publishes a list of recently checked torrents to the subscriber. + """ + auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) + peer = self.get_peer_from_auth(auth, source_address) + + # Subscribe or unsubscribe peer + subscribed = peer in self.subscribers + + if payload.subscribe and not subscribed: + if len(self.subscribers) < MAX_SUBSCRIBERS: + self.subscribers.add(peer) + subscribed = True + + elif not payload.subscribe and subscribed: + self.subscribers.remove(peer) + subscribed = False + + # Send subscription response + self.send_subscription_status(peer, subscribed=subscribed) + + return subscribed + + def send_subscription_status(self, peer, subscribed=True): + """ + Method to send content subscription message. Content subscription message is send in response to content + subscribe or unsubscribe message. + """ + if peer not in self.get_peers(): + self.logger.error(ERROR_UNKNOWN_PEER) + return + + subscription = ContentSubscription(subscribed) + packet = self.create_message_packet(MSG_SUBSCRIPTION, subscription) + self.broadcast_message(packet, peer=peer) + + def on_subscription_status(self, source_address, data): + """ + Message handler for content subscription message. Content subscription message is sent by the publisher stating + the status of the subscription in response to subscribe or unsubscribe request. + + If the subscription message has subscribe=True, it means the subscription was successful, so the peer is added + to the subscriber. In other case, publisher is removed if it is still present in the publishers list. + """ + auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) + peer = self.get_peer_from_auth(auth, source_address) + + if payload.subscribe: + self.publishers.add(peer) + elif peer in self.publishers: + self.publishers.remove(peer) + + def create_message_packet(self, message_type, payload): + """ + Helper method to creates a message packet of given type with provided payload. + """ + auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list() + dist = GlobalTimeDistributionPayload(self.claim_global_time()).to_pack_list() + payload = payload if isinstance(payload, list) else payload.to_pack_list() + return self._ez_pack(self._prefix, message_type, [auth, dist, payload]) + + def broadcast_message(self, packet, peer=None): + """ + Helper method to broadcast the message packet to a single peer or all the subscribers. + """ + if peer is not None: + self.endpoint.send(peer.address, packet) + return + + for _peer in self.subscribers: + self.endpoint.send(_peer.address, packet) + + def get_peer_from_auth(self, auth, source_address): + """ + Get Peer object from the message and auth and source_address. + It is used for mocking the peer in test. + """ + return Peer(auth.public_key_bin, source_address) + + def pack_sized(self, payload_list, fit_size, start_index=0): + """ + Packs a list of Payload objects to fit into given size limit. + :param payload_list: List list of payload objects + :param fit_size: The maximum allowed size for payload field to fit into UDP packet. + :param start_index: Index of list to start packing + :return: packed string + """ + assert isinstance(payload_list, list) + serialized_results = '' + size = 0 + current_index = start_index + num_payloads = len(payload_list) + while current_index < num_payloads: + item = payload_list[current_index] + packed_item = self.serializer.pack_multiple(item.to_pack_list()) + packed_item_length = len(packed_item) + if size + packed_item_length > fit_size: + break + else: + size += packed_item_length + serialized_results += packed_item + current_index += 1 + return serialized_results, current_index, current_index - start_index + + @abstractmethod + def publish_next_content(self): + """ Method responsible for publishing content during periodic push """ + pass + diff --git a/Tribler/community/popular/repository.py b/Tribler/community/popular/repository.py index 5d3e0925b87..a85866b100e 100644 --- a/Tribler/community/popular/repository.py +++ b/Tribler/community/popular/repository.py @@ -14,6 +14,12 @@ class ContentRepository(object): + """ + This class handles all the stuffs related to the content for PopularityCommunity. Currently, it handles all the + interactions with torrent and channel database. + + It also maintains a content queue which stores the content for publishing in the next publishing cycle. + """ def __init__(self, torrent_db, channel_db): super(ContentRepository, self).__init__() @@ -30,7 +36,7 @@ def add_content(self, content_type, content): if self.queue is not None: self.queue.append((content_type, content)) - def num_content(self): + def count_content(self): return len(self.queue) if self.queue else 0 def pop_content(self): @@ -40,7 +46,6 @@ def get_top_torrents(self, limit=DEFAULT_TORRENT_LIMIT): return self.torrent_db.getRecentlyCheckedTorrents(limit) def update_torrent_health(self, torrent_health_payload, peer_trust=0): - assert torrent_health_payload and isinstance(torrent_health_payload, TorrentHealthPayload) def update_torrent(db_handler, health_payload): db_handler.updateTorrent(infohash, notify=False, num_seeders=health_payload.num_seeders, @@ -49,7 +54,7 @@ def update_torrent(db_handler, health_payload): status=u"good" if health_payload.num_seeders > 1 else u"unknown") if not self.torrent_db: - self.logger.error("Torrent DB is None") + self.logger.error("Torrent DB is not available. Skipping torrent health update.") return infohash = torrent_health_payload.infohash @@ -88,43 +93,62 @@ def has_torrent(self, infohash): return self.get_torrent(infohash) is not None def search_torrent(self, query): - results = [] + """ + Searches for best torrents for the given query and packs them into a list of SearchResponseItemPayload. + :param query: Search query + :return: List + """ + db_results = self.torrent_db.searchNames(query, local=True, keys=['infohash', 'T.name', 'T.length', 'T.num_files', 'T.category', 'T.creation_date', 'T.num_seeders', 'T.num_leechers']) - if db_results: - for dbresult in db_results: - channel_details = dbresult[-10:] + if not db_results: + return [] - dbresult = list(dbresult[:8]) - dbresult[2] = long(dbresult[2]) # length - dbresult[3] = int(dbresult[3]) # num_files - dbresult[4] = [dbresult[4]] # category - dbresult[5] = long(dbresult[5]) # creation_date - dbresult[6] = int(dbresult[6] or 0) # num_seeders - dbresult[7] = int(dbresult[7] or 0) # num_leechers + results = [] + for dbresult in db_results: + channel_details = dbresult[-10:] - # cid - if channel_details[1]: - channel_details[1] = str(channel_details[1]) - dbresult.append(channel_details[1]) + dbresult = list(dbresult[:8]) + dbresult[2] = long(dbresult[2]) # length + dbresult[3] = int(dbresult[3]) # num_files + dbresult[4] = [dbresult[4]] # category + dbresult[5] = long(dbresult[5]) # creation_date + dbresult[6] = int(dbresult[6] or 0) # num_seeders + dbresult[7] = int(dbresult[7] or 0) # num_leechers - results.append(SearchResponseItemPayload(*tuple(dbresult))) + # cid + if channel_details[1]: + channel_details[1] = str(channel_details[1]) + dbresult.append(channel_details[1]) + + results.append(SearchResponseItemPayload(*tuple(dbresult))) return results def search_channels(self, query): - results = [] + """ + Search best channels for the given query. + :param query: Search query + :return: List + """ db_channels = self.channel_db.search_in_local_channels_db(query) + if not db_channels: + return [] + results = [] if db_channels: for channel in db_channels: - channel_payload = channel[:7] + channel_payload = channel[:8] channel_payload[7] = channel[8] # modified results.append(ChannelItemPayload(*channel_payload)) return results def update_from_torrent_search_results(self, search_results): + """ + Updates the torrent database with the provided search results. It also checks for conflicting torrents, meaning + if torrent already exists in the database, we simply ignore the search result. + """ for result in search_results: (infohash, name, length, num_files, category_list, creation_date, seeders, leechers, cid) = result torrent_item = SearchResponseItemPayload(infohash, name, length, num_files, category_list, @@ -143,4 +167,8 @@ def update_from_torrent_search_results(self, search_results): comment='') def update_from_channel_search_results(self, all_items): + """ + TODO: updates the channel database with the search results. + Waiting for all channel 2.0 + """ pass diff --git a/Tribler/community/popular/request.py b/Tribler/community/popular/request.py index 3fefb85f996..b4baaae3b20 100644 --- a/Tribler/community/popular/request.py +++ b/Tribler/community/popular/request.py @@ -6,7 +6,7 @@ class SearchRequest(RandomNumberCache): """ - This request cache keeps track of all outstanding search requests + This request cache keeps track of all outstanding search requests. """ def __init__(self, request_cache, search_type, query): @@ -27,4 +27,4 @@ def finish(self): self.deferred.callback(self.response) def on_timeout(self): - self.deferred.errback(Failure(RuntimeError("Search timeout for query: %s" % self.query))) + self.deferred.errback(self.response)