diff --git a/Tribler/Test/Community/popular/test_community.py b/Tribler/Test/Community/popular/test_community.py index 78839b499cd..0b529f47e22 100644 --- a/Tribler/Test/Community/popular/test_community.py +++ b/Tribler/Test/Community/popular/test_community.py @@ -191,7 +191,7 @@ def on_torrent_health_response(peer, source_address, data): health_info = ('a' * 20, random.randint(1, 100), random.randint(1, 10), random.randint(1, 111111)) self.nodes[1].overlay._queue_content(TYPE_TORRENT_HEALTH, health_info) - self.nodes[1].overlay._publish_next_content() + self.nodes[1].overlay.publish_next_content() yield self.deliver_messages() @@ -212,7 +212,7 @@ def test_publish_no_content(self): # Try publishing the next available content self.nodes[0].no_content = False - self.nodes[0].overlay._publish_next_content() + self.nodes[0].overlay.publish_next_content() yield self.deliver_messages() # Expect no content found to be logged @@ -416,7 +416,7 @@ def fake_get_peer_from_auth(peer): return peer self.nodes[0].overlay._ez_unpack_auth = lambda payload_class, data: fake_unpack_auth() - self.nodes[0].overlay._get_peer_from_auth = lambda auth, address: fake_get_peer_from_auth(self.nodes[1]) + self.nodes[0].overlay.get_peer_from_auth = lambda auth, address: fake_get_peer_from_auth(self.nodes[1]) source_address = MockObject() data = MockObject() @@ -452,7 +452,7 @@ def fake_publish_latest_torrents(my_peer, _peer): self.nodes[0].overlay._publish_latest_torrents = lambda peer: fake_publish_latest_torrents(self.nodes[1], peer) self.nodes[0].overlay._ez_unpack_auth = lambda payload_class, data: fake_unpack_auth() - self.nodes[0].overlay._get_peer_from_auth = lambda auth, address: fake_get_peer_from_auth(self.nodes[1]) + self.nodes[0].overlay.get_peer_from_auth = lambda auth, address: fake_get_peer_from_auth(self.nodes[1]) source_address = MockObject() data = MockObject() @@ -491,7 +491,7 @@ def fake_send_popular_content_subscription(my_peer): self.nodes[0].overlay._publish_latest_torrents = lambda peer: fake_publish_latest_torrents(self.nodes[0], peer) self.nodes[0].overlay._ez_unpack_auth = lambda payload_class, data: fake_unpack_auth() - self.nodes[0].overlay._get_peer_from_auth = lambda auth, address: fake_get_peer_from_auth(self.nodes[1]) + self.nodes[0].overlay.get_peer_from_auth = lambda auth, address: fake_get_peer_from_auth(self.nodes[1]) self.nodes[0].overlay.send_popular_content_subscription = lambda peer, subscribed: \ fake_send_popular_content_subscription(self.nodes[1]) diff --git a/Tribler/community/popular/community.py b/Tribler/community/popular/community.py index 8e23f4f1650..635f4ec431f 100644 --- a/Tribler/community/popular/community.py +++ b/Tribler/community/popular/community.py @@ -21,37 +21,17 @@ from Tribler.pyipv8.ipv8.requestcache import RequestCache -class PopularCommunity(Community): - """ - Community for disseminating the content across the network. Follows publish-subscribe model. - """ - - master_peer = Peer(MASTER_PUBLIC_KEY.decode('hex')) +class PubSubCommunity(Community): def __init__(self, *args, **kwargs): - self.torrent_db = kwargs.pop('torrent_db', None) - self.channel_db = kwargs.pop('channel_db', None) - self.trustchain = kwargs.pop('trustchain_community', None) - self.tribler_session = kwargs.pop('session', None) - super(PopularCommunity, self).__init__(*args, **kwargs) + super(PubSubCommunity, self).__init__(*args, **kwargs) self.logger = logging.getLogger(self.__class__.__name__) - - # Handles database stuffs - self.content_repository = ContentRepository(self.torrent_db, self.channel_db) self.request_cache = RequestCache() # Register messages self.decode_map.update({ chr(MSG_POPULAR_CONTENT_SUBSCRIBE): self.on_popular_content_subscribe, - chr(MSG_POPULAR_CONTENT_SUBSCRIPTION): self.on_popular_content_subscription, - chr(MSG_TORRENT_HEALTH_RESPONSE): self.on_torrent_health_response, - chr(MSG_CHANNEL_HEALTH_RESPONSE): self.on_channel_health_response, - chr(MSG_TORRENT_INFO_REQUEST): self.on_torrent_info_request, - chr(MSG_TORRENT_INFO_RESPONSE): self.on_torrent_info_response, - chr(MSG_SEARCH_REQUEST): self.on_search_request, - chr(MSG_SEARCH_RESPONSE): self.on_search_response, - chr(MSG_CONTENT_INFO_REQUEST): self.on_content_info_request, - chr(MSG_CONTENT_INFO_RESPONSE): self.on_content_info_response + chr(MSG_POPULAR_CONTENT_SUBSCRIPTION): self.on_popular_content_subscription }) # A set of publisher and subscriber. @@ -59,8 +39,6 @@ def __init__(self, *args, **kwargs): self.subscribers = set() self.publishers = set() - self.logger.info('Popular Community initialized (peer mid %s)', self.my_peer.mid.encode('HEX')) - def start(self): """ Starts the community by subscribing to peers, and periodically publishing the content updates to @@ -74,17 +52,14 @@ def start_publishing(): self.refresh_peer_list() # publish the new cotent from the content repository - self._publish_next_content() + self.publish_next_content() self.register_task("start_publishing", LoopingCall(start_publishing)).start(PUBLISH_INTERVAL, False) @inlineCallbacks def unload(self): self.request_cache.clear() - self.content_repository.cleanup() - self.content_repository = None - - yield super(PopularCommunity, self).unload() + yield super(PubSubCommunity, self).unload() def subscribe_peers(self): """ @@ -105,6 +80,22 @@ def subscribe_peers(self): for peer in sorted_peers[: MAX_PUBLISHERS - num_publishers]: self.send_popular_content_subscribe(peer, subscribe=True) + def refresh_peer_list(self): + """ + Updates the publishers and subscribers list by filtering out the disconnected peers. It also calls subscribe + peers to replenish the available publisher slots if necessary. + """ + peers = self.get_peers() + self.logger.info("Num peers: %d", len(peers)) + self.publishers = set([peer for peer in self.publishers if peer in peers]) + self.subscribers = set([peer for peer in self.subscribers if peer in peers]) + + # Log the number of subscribers and publishers + self.logger.info("Publishers: %d, Subscribers: %d", len(self.publishers), len(self.subscribers)) + + # subscribe peers if necessary + self.subscribe_peers() + def unsubscribe_peers(self): """ Unsubscribes from the existing publishers by sending content subscribe request with subscribe=False. It then @@ -115,7 +106,25 @@ def unsubscribe_peers(self): self.send_popular_content_subscribe(peer, subscribe=False) self.publishers.clear() - # MESSAGE HANDLERS + def send_popular_content_subscribe(self, peer, subscribe=True): + """ + Method to send content subscribe/unsubscribe message. This message is sent to each individual publisher peer we + want to subscribe/unsubscribe. + """ + if peer not in self.get_peers(): + self.logger.error(ERROR_UNKNOWN_PEER) + return + + # Add or remove the publisher peer + if subscribe: + self.publishers.add(peer) + else: + self.publishers.remove(peer) + + # Create subscription packet and send it + subscription = ContentSubscription(subscribe) + packet = self.create_message_packet(MSG_POPULAR_CONTENT_SUBSCRIBE, subscription) + self.broadcast_message(packet, peer=peer) def on_popular_content_subscribe(self, source_address, data): """ @@ -124,7 +133,7 @@ def on_popular_content_subscribe(self, source_address, data): In case of subscription, it also publishes a list of recently checked torrents to the subscriber. """ auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) - peer = self._get_peer_from_auth(auth, source_address) + peer = self.get_peer_from_auth(auth, source_address) # Subscribe or unsubscribe peer subscribed = peer in self.subscribers @@ -141,9 +150,20 @@ def on_popular_content_subscribe(self, source_address, data): # Send subscription response self.send_popular_content_subscription(peer, subscribed=subscribed) - # Also publish latest torrents in case the peer is subscribing - if subscribed: - self._publish_latest_torrents(peer=peer) + return subscribed + + def send_popular_content_subscription(self, peer, subscribed=True): + """ + Method to send content subscription message. Content subscription message is send in response to content + subscribe or unsubscribe message. + """ + if peer not in self.get_peers(): + self.logger.error(ERROR_UNKNOWN_PEER) + return + + subscription = ContentSubscription(subscribed) + packet = self.create_message_packet(MSG_POPULAR_CONTENT_SUBSCRIPTION, subscription) + self.broadcast_message(packet, peer=peer) def on_popular_content_subscription(self, source_address, data): """ @@ -154,13 +174,120 @@ def on_popular_content_subscription(self, source_address, data): to the subscriber. In other case, publisher is removed if it is still present in the publishers list. """ auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) - peer = self._get_peer_from_auth(auth, source_address) + peer = self.get_peer_from_auth(auth, source_address) if payload.subscribe: self.publishers.add(peer) elif peer in self.publishers: self.publishers.remove(peer) + def create_message_packet(self, message_type, payload): + """ + Helper method to creates a message packet of given type with provided payload. + """ + auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list() + dist = GlobalTimeDistributionPayload(self.claim_global_time()).to_pack_list() + payload = payload if isinstance(payload, list) else payload.to_pack_list() + return self._ez_pack(self._prefix, message_type, [auth, dist, payload]) + + def broadcast_message(self, packet, peer=None): + """ + Helper method to broadcast the message packet to a single peer or all the subscribers. + """ + if peer is not None: + self.endpoint.send(peer.address, packet) + return + + for _peer in self.subscribers: + self.endpoint.send(_peer.address, packet) + + def add_or_ignore_subscriber(self, peer): + """ + Helper method to add or ignore new subscriber peer. If we already have max subscriber, the peer is not able to + subscribe. + """ + if len(self.subscribers) < MAX_SUBSCRIBERS: + self.subscribers.add(peer) + return True + return False + + def get_peer_from_auth(self, auth, source_address): + """ + Get Peer object from the message and auth and source_address. + It is used for mocking the peer in test. + """ + return Peer(auth.public_key_bin, source_address) + + def pack_sized(self, payload_list, fit_size, start_index=0): + assert isinstance(payload_list, list) + serialized_results = '' + size = 0 + current_index = start_index + num_payloads = len(payload_list) + while current_index < num_payloads: + item = payload_list[current_index] + packed_item = self.serializer.pack_multiple(item.to_pack_list()) + packed_item_length = len(packed_item) + if size + packed_item_length > fit_size: + break + else: + size += packed_item_length + serialized_results += packed_item + current_index += 1 + return serialized_results, current_index, current_index - start_index + + # Abstract methods + def publish_next_content(self): + pass + + +class PopularCommunity(PubSubCommunity): + """ + Community for disseminating the content across the network. Follows publish-subscribe model. + """ + + master_peer = Peer(MASTER_PUBLIC_KEY.decode('hex')) + + def __init__(self, *args, **kwargs): + self.torrent_db = kwargs.pop('torrent_db', None) + self.channel_db = kwargs.pop('channel_db', None) + self.trustchain = kwargs.pop('trustchain_community', None) + self.tribler_session = kwargs.pop('session', None) + + super(PopularCommunity, self).__init__(*args, **kwargs) + + # Handles database stuffs + self.content_repository = ContentRepository(self.torrent_db, self.channel_db) + + # Register messages + self.decode_map.update({ + chr(MSG_TORRENT_HEALTH_RESPONSE): self.on_torrent_health_response, + chr(MSG_CHANNEL_HEALTH_RESPONSE): self.on_channel_health_response, + chr(MSG_TORRENT_INFO_REQUEST): self.on_torrent_info_request, + chr(MSG_TORRENT_INFO_RESPONSE): self.on_torrent_info_response, + chr(MSG_SEARCH_REQUEST): self.on_search_request, + chr(MSG_SEARCH_RESPONSE): self.on_search_response, + chr(MSG_CONTENT_INFO_REQUEST): self.on_content_info_request, + chr(MSG_CONTENT_INFO_RESPONSE): self.on_content_info_response + }) + + self.logger.info('Popular Community initialized (peer mid %s)', self.my_peer.mid.encode('HEX')) + + @inlineCallbacks + def unload(self): + self.content_repository.cleanup() + self.content_repository = None + yield super(PopularCommunity, self).unload() + + def on_popular_content_subscribe(self, source_address, data): + auth, _, _ = self._ez_unpack_auth(ContentSubscription, data) + peer = self.get_peer_from_auth(auth, source_address) + + subscribed = super(PopularCommunity, self).on_popular_content_subscribe(source_address, data) + # Publish the latest torrents to the subscriber + if subscribed: + self._publish_latest_torrents(peer=peer) + def on_torrent_health_response(self, source_address, data): """ Message handler for torrent health response. Torrent health response is part of periodic update message from @@ -170,7 +297,7 @@ def on_torrent_health_response(self, source_address, data): """ self.logger.info("Got torrent health response from %s", source_address) auth, _, payload = self._ez_unpack_auth(TorrentHealthPayload, data) - peer = self._get_peer_from_auth(auth, source_address) + peer = self.get_peer_from_auth(auth, source_address) if peer not in self.publishers: self.logger.error(ERROR_UNKNOWN_RESPONSE) @@ -194,7 +321,7 @@ def on_torrent_info_request(self, source_address, data): """ self.logger.info("Got torrent info request from %s", source_address) auth, _, payload = self._ez_unpack_auth(TorrentInfoRequestPayload, data) - peer = self._get_peer_from_auth(auth, source_address) + peer = self.get_peer_from_auth(auth, source_address) if peer not in self.publishers: self.logger.error(ERROR_UNKNOWN_RESPONSE) @@ -208,7 +335,7 @@ def on_torrent_info_response(self, source_address, data): """ self.logger.info("Got torrent info response from %s", source_address) auth, _, payload = self._ez_unpack_auth(TorrentHealthPayload, data) - peer = self._get_peer_from_auth(auth, source_address) + peer = self.get_peer_from_auth(auth, source_address) if peer not in self.publishers: self.logger.error(ERROR_UNKNOWN_RESPONSE) @@ -220,7 +347,7 @@ def on_search_request(self, source_address, data): """ Message handler for search request """ self.logger.info("Got search request from %s", source_address) auth, _, payload = self._ez_unpack_auth(SearchRequestPayload, data) - peer = self._get_peer_from_auth(auth, source_address) + peer = self.get_peer_from_auth(auth, source_address) self.logger.info("Search query:%s", payload.query) self.logger.info("Search type:%s", "torrent" if payload.search_type == 0 else "channel") @@ -260,7 +387,7 @@ def on_search_response(self, source_address, data): def on_content_info_request(self, source_address, data): auth, _, payload = self._ez_unpack_auth(ContentInfoRequest, data) - peer = self._get_peer_from_auth(auth, source_address) + peer = self.get_peer_from_auth(auth, source_address) if payload.content_type == SEARCH_TORRENT_REQUEST: db_results = self.content_repository.search_torrent(payload.query_list) @@ -298,39 +425,6 @@ def process_torrent_search_response(self, query, payload): # MESSAGE SENDING FUNCTIONS - def send_popular_content_subscribe(self, peer, subscribe=True): - """ - Method to send content subscribe/unsubscribe message. This message is sent to each individual publisher peer we - want to subscribe/unsubscribe. - """ - if peer not in self.get_peers(): - self.logger.error(ERROR_UNKNOWN_PEER) - return - - # Add or remove the publisher peer - if subscribe: - self.publishers.add(peer) - else: - self.publishers.remove(peer) - - # Create subscription packet and send it - subscription = ContentSubscription(subscribe) - packet = self.create_message_packet(MSG_POPULAR_CONTENT_SUBSCRIBE, subscription) - self.broadcast_message(packet, peer=peer) - - def send_popular_content_subscription(self, peer, subscribed=True): - """ - Method to send content subscription message. Content subscription message is send in response to content - subscribe or unsubscribe message. - """ - if peer not in self.get_peers(): - self.logger.error(ERROR_UNKNOWN_PEER) - return - - subscription = ContentSubscription(subscribed) - packet = self.create_message_packet(MSG_POPULAR_CONTENT_SUBSCRIPTION, subscription) - self.broadcast_message(packet, peer=peer) - def send_torrent_health_response(self, payload, peer=None): """ Method to send torrent health response. This message is sent to all the subscribers by default but if a @@ -392,7 +486,7 @@ def send_content_info_request(self, content_type, request_list, limit=25, peer=N if peer: self.broadcast_message(packet, peer=peer) else: - for connected_peer in self._get_search_peers(): + for connected_peer in self.get_peers(): self.broadcast_message(packet, peer=connected_peer) def send_content_info_response(self, peer, identifier, content_type, response_list): @@ -428,7 +522,7 @@ def _send_search_request(self, search_type, query): packet = self.create_message_packet(MSG_SEARCH_REQUEST, search_request_payload) # Send the request to search peers - for peer in self._get_search_peers(): + for peer in self.get_peers(): self.broadcast_message(packet, peer=peer) def send_search_response(self, peer, identifier, response_type, results): @@ -449,91 +543,9 @@ def send_search_response(self, peer, identifier, response_type, results): packet = self.create_message_packet(MSG_SEARCH_RESPONSE, search_response_payload) self.broadcast_message(packet, peer=peer) - def pack_sized(self, payload_list, fit_size, start_index=0): - assert isinstance(payload_list, list) - serialized_results = '' - size = 0 - current_index = start_index - num_payloads = len(payload_list) - while current_index < num_payloads: - item = payload_list[current_index] - packed_item = self.serializer.pack_multiple(item.to_pack_list()) - packed_item_length = len(packed_item) - if size + packed_item_length > fit_size: - break - else: - size += packed_item_length - serialized_results += packed_item - current_index += 1 - return serialized_results, current_index, current_index - start_index - - # HELPER METHODS - - def refresh_peer_list(self): - """ - Updates the publishers and subscribers list by filtering out the disconnected peers. It also calls subscribe - peers to replenish the available publisher slots if necessary. - """ - peers = self.get_peers() - self.logger.info("Num peers: %d", len(peers)) - self.publishers = set([peer for peer in self.publishers if peer in peers]) - self.subscribers = set([peer for peer in self.subscribers if peer in peers]) - - # Log the number of subscribers and publishers - self.logger.info("Publishers: %d, Subscribers: %d", len(self.publishers), len(self.subscribers)) - - # subscribe peers if necessary - self.subscribe_peers() - - def create_message_packet(self, message_type, payload): - """ - Helper method to creates a message packet of given type with provided payload. - """ - auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list() - dist = GlobalTimeDistributionPayload(self.claim_global_time()).to_pack_list() - payload = payload if isinstance(payload, list) else payload.to_pack_list() - return self._ez_pack(self._prefix, message_type, [auth, dist, payload]) - - def broadcast_message(self, packet, peer=None): - """ - Helper method to broadcast the message packet to a single peer or all the subscribers. - """ - if peer is not None: - self.endpoint.send(peer.address, packet) - return - - for _peer in self.subscribers: - self.endpoint.send(_peer.address, packet) - - def add_or_ignore_subscriber(self, peer): - """ - Helper method to add or ignore new subscriber peer. If we already have max subscriber, the peer is not able to - subscribe. - """ - if len(self.subscribers) < MAX_SUBSCRIBERS: - self.subscribers.add(peer) - return True - return False - - def _get_peer_from_auth(self, auth, source_address): - """ - Get Peer object from the message and auth and source_address. - It is used for mocking the peer in test. - """ - return Peer(auth.public_key_bin, source_address) - - def _get_search_peers(self): - return self.get_peers() - # CONTENT REPOSITORY STUFFS - def _queue_content(self, content_type, content): - """ - Basically addS a given content to the queue of content repository. - """ - self.content_repository.add_content(content_type, content) - - def _publish_next_content(self): + def publish_next_content(self): """ Publishes the next content from the queue to the subscribers. Does nothing if there are none subscribers. @@ -565,3 +577,9 @@ def _publish_latest_torrents(self, peer): infohash, seeders, leechers, timestamp = torrent[:4] payload = TorrentHealthPayload(infohash, seeders, leechers, timestamp) self.send_torrent_health_response(payload, peer=peer) + + def _queue_content(self, content_type, content): + """ + Basically addS a given content to the queue of content repository. + """ + self.content_repository.add_content(content_type, content)