From e271c849c725e95409ddd71cfc3bd74551980488 Mon Sep 17 00:00:00 2001 From: Martijn de Vos Date: Tue, 26 Mar 2019 12:48:32 +0100 Subject: [PATCH] Refactored PopularityCommunity --- .../Core/APIImplementation/LaunchManyCore.py | 3 +- .../Core/TorrentChecker/torrent_checker.py | 41 ++- .../Community/popularity/test_community.py | 79 +++--- .../Test/Community/popularity/test_payload.py | 53 ---- .../popularity/test_pubsub_community.py | 140 ---------- .../Community/popularity/test_repository.py | 84 ------ .../TorrentChecker/test_torrentchecker.py | 60 ++--- Tribler/community/popularity/community.py | 144 ++++------ Tribler/community/popularity/constants.py | 16 -- Tribler/community/popularity/payload.py | 73 +++--- Tribler/community/popularity/pubsub.py | 246 ------------------ Tribler/community/popularity/repository.py | 94 ------- Tribler/community/popularity/request.py | 32 --- 13 files changed, 176 insertions(+), 889 deletions(-) delete mode 100644 Tribler/Test/Community/popularity/test_payload.py delete mode 100644 Tribler/Test/Community/popularity/test_pubsub_community.py delete mode 100644 Tribler/Test/Community/popularity/test_repository.py delete mode 100644 Tribler/community/popularity/constants.py delete mode 100644 Tribler/community/popularity/pubsub.py delete mode 100644 Tribler/community/popularity/repository.py delete mode 100644 Tribler/community/popularity/request.py diff --git a/Tribler/Core/APIImplementation/LaunchManyCore.py b/Tribler/Core/APIImplementation/LaunchManyCore.py index bb2a0674a00..c77dc6af92d 100644 --- a/Tribler/Core/APIImplementation/LaunchManyCore.py +++ b/Tribler/Core/APIImplementation/LaunchManyCore.py @@ -241,7 +241,8 @@ def load_ipv8_overlays(self): from Tribler.community.popularity.community import PopularityCommunity self.popularity_community = PopularityCommunity(peer, self.ipv8.endpoint, self.ipv8.network, - metadata_store=self.session.lm.mds, session=self.session) + metadata_store=self.session.lm.mds, + torrent_checker=self.torrent_checker) self.ipv8.overlays.append(self.popularity_community) diff --git a/Tribler/Core/TorrentChecker/torrent_checker.py b/Tribler/Core/TorrentChecker/torrent_checker.py index f0a3b93a28c..848ea9563ce 100644 --- a/Tribler/Core/TorrentChecker/torrent_checker.py +++ b/Tribler/Core/TorrentChecker/torrent_checker.py @@ -3,6 +3,7 @@ import logging import random import socket +import sys import time from binascii import hexlify from random import choice @@ -52,6 +53,10 @@ def __init__(self, session): self.socket_mgr = self.udp_port = None self.connection_pool = None + # We keep track of the results of popular torrents checked by you. + # The popularity community gossips this information around. + self.popular_torrents_checked = set() + def initialize(self): self.tracker_check_lc.start(TRACKER_SELECTION_INTERVAL, now=False) self.torrent_check_lc.start(TORRENT_SELECTION_INTERVAL, now=False) @@ -197,6 +202,28 @@ def get_valid_trackers_of_torrent(self, torrent_id): return set([str(tracker.url) for tracker in db_tracker_list if is_valid_url(str(tracker.url)) and not self.is_blacklisted_tracker(str(tracker.url))]) + def update_torrents_checked(self, new_result): + """ + Update the set with torrents that we have checked ourselves. + """ + new_result_tuple = (new_result['infohash'], new_result['seeders'], + new_result['leechers'], new_result['last_check']) + + if len(self.popular_torrents_checked) < 5: + self.popular_torrents_checked.add(new_result_tuple) + return + + min_seeders = sys.maxsize + most_unpopular_torrent = None + for torrent_health_tuple in self.popular_torrents_checked: + if torrent_health_tuple[1] < min_seeders: + min_seeders = torrent_health_tuple[1] + most_unpopular_torrent = torrent_health_tuple + + if new_result['seeders'] > min_seeders: + self.popular_torrents_checked.remove(most_unpopular_torrent) + self.popular_torrents_checked.add(new_result_tuple) + def on_torrent_health_check_completed(self, infohash, result): final_response = {} if not result or not isinstance(result, list): @@ -220,9 +247,7 @@ def on_torrent_health_check_completed(self, infohash, result): torrent_update_dict['leechers'] = l self._update_torrent_result(torrent_update_dict) - - # Add this result to popularity community to publish to subscribers - self.publish_torrent_result(torrent_update_dict) + self.update_torrents_checked(torrent_update_dict) # TODO: DRY! Stop doing lots of formats, just make REST endpoint automatically encode binary data to hex! self.tribler_session.notifier.notify(NTFY_TORRENT, NTFY_UPDATE, infohash, @@ -345,13 +370,3 @@ def _update_torrent_result(self, response): torrent.seeders = seeders torrent.leechers = leechers torrent.last_check = last_check - - def publish_torrent_result(self, response): - if response['seeders'] == 0: - self._logger.info("Not publishing zero seeded torrents") - return - content = (response['infohash'], response['seeders'], response['leechers'], response['last_check']) - if self.tribler_session.lm.popularity_community: - self.tribler_session.lm.popularity_community.queue_content(content) - else: - self._logger.info("Popular community not available to publish torrent checker result") diff --git a/Tribler/Test/Community/popularity/test_community.py b/Tribler/Test/Community/popularity/test_community.py index 8634b2cd4e0..d1f79e3786f 100644 --- a/Tribler/Test/Community/popularity/test_community.py +++ b/Tribler/Test/Community/popularity/test_community.py @@ -1,7 +1,7 @@ from __future__ import absolute_import import os -import random +import time from pony.orm import db_session @@ -10,7 +10,8 @@ from twisted.internet.defer import inlineCallbacks from Tribler.Core.Modules.MetadataStore.store import MetadataStore -from Tribler.community.popularity.community import MSG_TORRENT_HEALTH_RESPONSE, PopularityCommunity +from Tribler.Test.Core.base_test import MockObject +from Tribler.community.popularity.community import PopularityCommunity from Tribler.pyipv8.ipv8.keyvault.crypto import default_eccrypto from Tribler.pyipv8.ipv8.test.base import TestBase from Tribler.pyipv8.ipv8.test.mocking.ipv8 import MockIPv8 @@ -21,67 +22,61 @@ class TestPopularityCommunity(TestBase): def setUp(self): super(TestPopularityCommunity, self).setUp() - self.shared_key = default_eccrypto.generate_key(u"curve25519") + self.count = 0 self.initialize(PopularityCommunity, self.NUM_NODES) def create_node(self, *args, **kwargs): - mds = MetadataStore(os.path.join(self.temporary_directory(), 'test.db'), self.temporary_directory(), - self.shared_key) + mds = MetadataStore(os.path.join(self.temporary_directory(), "%d.db" % self.count), self.temporary_directory(), + default_eccrypto.generate_key(u"curve25519")) - # Add some content to the metadata database - with db_session: - mds.ChannelMetadata.create_channel('test', 'test') - for torrent_ind in xrange(5): - torrent = mds.TorrentMetadata(title='torrent%d' % torrent_ind, infohash=('%d' % torrent_ind) * 20) - torrent.health.seeders = torrent_ind + 1 + torrent_checker = MockObject() + torrent_checker.popular_torrents_checked = set() + + return MockIPv8(u"curve25519", PopularityCommunity, metadata_store=mds, torrent_checker=torrent_checker) - return MockIPv8(u"curve25519", PopularityCommunity, metadata_store=mds) + @db_session + def fill_database(self, metadata_store, last_check_now=False): + for torrent_ind in xrange(5): + last_check = int(time.time()) if last_check_now else 0 + metadata_store.TorrentState( + infohash=('%d' % torrent_ind) * 20, seeders=torrent_ind + 1, last_check=last_check) @inlineCallbacks - def test_content_publishing(self): + def test_torrents_health_gossip(self): """ - Tests publishing next available content. - :return: + Test whether torrent health information is correctly gossiped around """ + self.fill_database(self.nodes[0].overlay.metadata_store) - def on_torrent_health_response(peer, source_address, data): - peer.torrent_health_response_received = True - - self.nodes[0].torrent_health_response_received = False - self.nodes[0].overlay.decode_map[chr(MSG_TORRENT_HEALTH_RESPONSE)] = lambda source_address, data: \ - on_torrent_health_response(self.nodes[0], source_address, data) - + checked_torrent_info = ('a' * 20, 200, 0, int(time.time())) + self.nodes[0].overlay.torrent_checker.popular_torrents_checked.add(checked_torrent_info) yield self.introduce_nodes() - self.nodes[0].overlay.subscribe_peers() - yield self.deliver_messages() - - # Add something to queue - health_info = ('a' * 20, random.randint(1, 100), random.randint(1, 10), random.randint(1, 111111)) - self.nodes[1].overlay.queue_content(health_info) - self.nodes[1].overlay.publish_next_content() + self.nodes[0].overlay.gossip_torrents_health() yield self.deliver_messages() - self.assertTrue(self.nodes[0].torrent_health_response_received, "Expected to receive torrent response") + # Check whether node 1 has new torrent health information + with db_session: + self.assertEqual(len(self.nodes[1].overlay.metadata_store.TorrentState.select()), 6) @inlineCallbacks - def test_publish_latest_torrents(self): + def test_torrents_health_override(self): """ - Test publishing all latest torrents + Test whether torrent health information is overridden when it's more fresh """ + self.fill_database(self.nodes[0].overlay.metadata_store, last_check_now=True) + self.fill_database(self.nodes[1].overlay.metadata_store) + yield self.introduce_nodes() - self.nodes[1].overlay.subscribe_peers() - yield self.deliver_messages() - # Update the health of some torrents - with db_session: - torrents = self.nodes[0].overlay.content_repository.get_top_torrents() - torrents[0].health.seeders = 500 + self.nodes[0].overlay.gossip_torrents_health() - self.nodes[0].overlay.publish_latest_torrents(self.nodes[1].overlay.my_peer) - yield self.deliver_messages() + yield self.deliver_messages(timeout=0.5) + # Check whether node 1 has new torrent health information with db_session: - torrents = self.nodes[1].overlay.content_repository.get_top_torrents() - self.assertEqual(torrents[0].health.seeders, 500) + states = self.nodes[1].overlay.metadata_store.TorrentState.select()[:] + self.assertEqual(len(states), 5) + for state in states: + self.assertIsNot(state.last_check, 0) diff --git a/Tribler/Test/Community/popularity/test_payload.py b/Tribler/Test/Community/popularity/test_payload.py deleted file mode 100644 index c3919f438de..00000000000 --- a/Tribler/Test/Community/popularity/test_payload.py +++ /dev/null @@ -1,53 +0,0 @@ -from __future__ import absolute_import - -import random -import string -from unittest import TestCase - -from Tribler.community.popularity.payload import ContentSubscription, TorrentHealthPayload -from Tribler.pyipv8.ipv8.messaging.serialization import Serializer - - -class TestSerializer(TestCase): - - def setUp(self): - self.serializer = Serializer() - - def random_string(self, size=6, chars=string.ascii_uppercase + string.digits): - return ''.join(random.choice(chars) for _ in range(size)) - - def random_infohash(self): - return ''.join(random.choice('0123456789abcdef') for _ in range(20)) - - def test_content_subscription(self): - """ Test serialization/deserialization of Content subscription """ - subscribe = True - identifier = 123123 - subscription = ContentSubscription(identifier, subscribe) - serialized = self.serializer.pack_multiple(subscription.to_pack_list())[0] - - # Deserialize and test it - (deserialized, _) = self.serializer.unpack_multiple(ContentSubscription.format_list, serialized) - deserialized_subscription = ContentSubscription.from_unpack_list(*deserialized) - - self.assertEqual(deserialized_subscription.identifier, identifier) - self.assertTrue(deserialized_subscription.subscribe) - - def test_torrent_health_payload(self): - """ Test serialization/deserialization of Torrent health payload """ - infohash = b'a' * 20 - num_seeders = 10 - num_leechers = 5 - timestamp = 123123123 - - health_payload = TorrentHealthPayload(infohash, num_seeders, num_leechers, timestamp) - serialized = self.serializer.pack_multiple(health_payload.to_pack_list())[0] - - # Deserialize and test it - (deserialized, _) = self.serializer.unpack_multiple(TorrentHealthPayload.format_list, serialized) - deserialized_payload = TorrentHealthPayload.from_unpack_list(*deserialized) - - self.assertEqual(infohash, deserialized_payload.infohash) - self.assertEqual(num_seeders, deserialized_payload.num_seeders) - self.assertEqual(num_leechers, deserialized_payload.num_leechers) - self.assertEqual(timestamp, deserialized_payload.timestamp) diff --git a/Tribler/Test/Community/popularity/test_pubsub_community.py b/Tribler/Test/Community/popularity/test_pubsub_community.py deleted file mode 100644 index d3f9f281d1a..00000000000 --- a/Tribler/Test/Community/popularity/test_pubsub_community.py +++ /dev/null @@ -1,140 +0,0 @@ -from __future__ import absolute_import - -from twisted.internet.defer import inlineCallbacks - -from Tribler.Test.tools import trial_timeout -from Tribler.community.popularity.constants import PUBLISH_INTERVAL -from Tribler.community.popularity.pubsub import PubSubCommunity -from Tribler.pyipv8.ipv8.test.base import TestBase -from Tribler.pyipv8.ipv8.test.mocking.ipv8 import MockIPv8 - - -class TestPubSubCommunity(TestBase): - NUM_NODES = 2 - - def setUp(self): - super(TestPubSubCommunity, self).setUp() - self.initialize(PubSubCommunity, self.NUM_NODES) - - def create_node(self, *args, **kwargs): - return MockIPv8(u"curve25519", PubSubCommunity) - - @inlineCallbacks - def test_subscribe_peers(self): - """ - Tests subscribing to peers populate publishers and subscribers list. - """ - self.nodes[1].overlay.send_torrent_info_response = lambda infohash, peer: None - yield self.introduce_nodes() - self.nodes[0].overlay.subscribe_peers() - yield self.deliver_messages() - - # Node 0 should have a publisher added - self.assertGreater(len(self.nodes[0].overlay.publishers), 0, "Publisher expected") - # Node 1 should have a subscriber added - self.assertGreater(len(self.nodes[1].overlay.subscribers), 0, "Subscriber expected") - - @inlineCallbacks - def test_subscribe_unsubscribe_individual_peers(self): - """ - Tests subscribing/subscribing an individual peer. - """ - self.nodes[1].overlay.send_torrent_info_response = lambda infohash, peer: None - self.nodes[1].overlay.publish_latest_torrents = lambda *args, **kwargs: None - - yield self.introduce_nodes() - self.nodes[0].overlay.subscribe(self.nodes[1].my_peer, subscribe=True) - yield self.deliver_messages() - - self.assertEqual(len(self.nodes[0].overlay.publishers), 1, "Expected one publisher") - self.assertEqual(len(self.nodes[1].overlay.subscribers), 1, "Expected one subscriber") - - self.nodes[0].overlay.subscribe(self.nodes[1].my_peer, subscribe=False) - yield self.deliver_messages() - - self.assertEqual(len(self.nodes[0].overlay.publishers), 0, "Expected no publisher") - self.assertEqual(len(self.nodes[1].overlay.subscribers), 0, "Expected no subscriber") - - def test_unsubscribe_multiple_peers(self): - """ - Tests unsubscribing multiple peers works as expected. - """ - - def send_popular_content_subscribe(my_peer, _, subscribe): - if not subscribe: - my_peer.unsubsribe_called += 1 - - self.nodes[0].overlay.subscribe = lambda peer, subscribe: \ - send_popular_content_subscribe(self.nodes[0], peer, subscribe) - - # Add some peers - num_peers = 10 - default_peers = [self.create_node() for _ in range(num_peers)] - self.nodes[0].overlay.get_peers = lambda: default_peers - self.assertEqual(len(self.nodes[0].overlay.get_peers()), num_peers) - - # Add some publishers - for peer in default_peers: - self.nodes[0].overlay.publishers.add(peer) - self.assertEqual(len(self.nodes[0].overlay.publishers), num_peers) - - # Unsubscribe all the peers - self.nodes[0].unsubsribe_called = 0 - self.nodes[0].overlay.unsubscribe_peers() - - # Check if unsubscription was successful - self.assertEqual(self.nodes[0].unsubsribe_called, num_peers) - self.assertEqual(len(self.nodes[0].overlay.publishers), 0) - - def test_refresh_peers(self): - """ - Tests if refresh_peer_list() updates the publishers and subscribers list - """ - default_peers = [self.create_node() for _ in range(10)] - - for peer in default_peers: - self.nodes[0].overlay.publishers.add(peer) - self.nodes[0].overlay.subscribers.add(peer) - - self.nodes[0].overlay.get_peers = lambda: default_peers - self.assertEqual(len(self.nodes[0].overlay.get_peers()), 10) - - # Remove half of the peers and refresh peer list - default_peers = default_peers[:5] - self.nodes[0].overlay.refresh_peer_list() - - # List of publishers and subscribers should be updated - self.assertEqual(len(self.nodes[0].overlay.get_peers()), 5) - self.assertEqual(len(self.nodes[0].overlay.subscribers), 5) - self.assertEqual(len(self.nodes[0].overlay.publishers), 5) - - @trial_timeout(6) - @inlineCallbacks - def test_start(self): - """ - Tests starting of the community. Peer should start subscribing to other connected peers. - """ - self.nodes[1].overlay.send_torrent_info_response = lambda infohash, peer: None - - def fake_refresh_peer_list(peer): - peer.called_refresh_peer_list = True - - def fake_publish_next_content(peer): - peer.called_publish_next_content = True - - self.nodes[0].called_refresh_peer_list = False - self.nodes[0].called_publish_next_content = False - self.nodes[0].overlay.refresh_peer_list = lambda: fake_refresh_peer_list(self.nodes[0]) - self.nodes[0].overlay.publish_next_content = lambda: fake_publish_next_content(self.nodes[0]) - - yield self.introduce_nodes() - self.nodes[0].overlay.start() - yield self.sleep(PUBLISH_INTERVAL) - - # Node 0 should have a publisher added - self.assertEqual(len(self.nodes[0].overlay.publishers), 1, "Expected one publisher") - # Node 1 should have a subscriber added - self.assertEqual(len(self.nodes[1].overlay.subscribers), 1, "Expected one subscriber") - - self.assertTrue(self.nodes[0].called_refresh_peer_list) - self.assertTrue(self.nodes[0].called_publish_next_content) diff --git a/Tribler/Test/Community/popularity/test_repository.py b/Tribler/Test/Community/popularity/test_repository.py deleted file mode 100644 index 5a85f8fc15f..00000000000 --- a/Tribler/Test/Community/popularity/test_repository.py +++ /dev/null @@ -1,84 +0,0 @@ -from __future__ import absolute_import - -import time - -from pony.orm import db_session - -from six.moves import xrange - -from twisted.internet.defer import inlineCallbacks - -from Tribler.Core.Modules.MetadataStore.store import MetadataStore -from Tribler.Test.Core.base_test import TriblerCoreTest -from Tribler.community.popularity.payload import TorrentHealthPayload -from Tribler.community.popularity.repository import ContentRepository -from Tribler.pyipv8.ipv8.keyvault.crypto import default_eccrypto - - -class TestContentRepository(TriblerCoreTest): - - @inlineCallbacks - def setUp(self): - yield super(TestContentRepository, self).setUp() - self.my_key = default_eccrypto.generate_key(u"curve25519") - mds = MetadataStore(':memory:', self.session_base_dir, self.my_key) - self.content_repository = ContentRepository(mds) - - # Add some content to the metadata database - with db_session: - mds.ChannelMetadata.create_channel('test', 'test') - for torrent_ind in xrange(5): - torrent = mds.TorrentMetadata(title='torrent%d' % torrent_ind, infohash=('%d' % torrent_ind) * 20) - torrent.health.seeders = torrent_ind + 1 - - def test_has_get_torrent(self): - """ - Test fetching a torrent from the metadata store - """ - self.assertFalse(self.content_repository.get_torrent('9' * 20)) - self.assertTrue(self.content_repository.get_torrent('0' * 20)) - self.assertFalse(self.content_repository.has_torrent('9' * 20)) - self.assertTrue(self.content_repository.has_torrent('0' * 20)) - self.assertFalse(self.content_repository.get_torrent('\x89' * 20)) - - @db_session - def test_get_top_torrents(self): - """ - Test fetching the top torrents from the metadata store - """ - torrents = self.content_repository.get_top_torrents() - self.assertEqual(len(torrents), 5) - self.assertEqual(torrents[0].health.seeders, 5) - - self.assertEqual(len(self.content_repository.get_top_torrents(limit=1)), 1) - - def test_add_content(self): - """ - Test adding and removing content works as expected. - """ - # Initial content queue is zero - self.assertEqual(self.content_repository.queue_length(), 0, "No item expected in queue initially") - - # Add a sample content and check the size - torrent = self.content_repository.get_torrent('0' * 20) - self.content_repository.add_content_to_queue(torrent) - self.assertEqual(self.content_repository.queue_length(), 1, "One item expected in queue") - - # Pop an item - content = self.content_repository.pop_content() - self.assertEqual(content, torrent, "Content should be equal") - - # Check size again - self.assertEqual(self.content_repository.queue_length(), 0, "No item expected in queue") - - def test_update_torrent_health(self): - """ - Tests update torrent health. - """ - fake_torrent_health_payload = TorrentHealthPayload('0' * 20, 10, 4, time.time()) - self.content_repository.update_torrent_health(fake_torrent_health_payload, peer_trust=0) - - with db_session: - torrent = self.content_repository.get_torrent('0' * 20) - self.assertEqual(torrent.health.seeders, 10) - self.assertEqual(torrent.health.leechers, 4) diff --git a/Tribler/Test/Core/TorrentChecker/test_torrentchecker.py b/Tribler/Test/Core/TorrentChecker/test_torrentchecker.py index 3033e2ccb5e..f239faa7859 100644 --- a/Tribler/Test/Core/TorrentChecker/test_torrentchecker.py +++ b/Tribler/Test/Core/TorrentChecker/test_torrentchecker.py @@ -36,7 +36,6 @@ def setUp(self): self.session.lm.torrent_checker = TorrentChecker(self.session) self.session.lm.tracker_manager = TrackerManager(self.session) - self.session.lm.popularity_community = MockObject() self.torrent_checker = self.session.lm.torrent_checker self.torrent_checker.listen_on_udp = lambda: None @@ -169,51 +168,11 @@ def remove_tracker(tracker_url): self.assertEqual(len(test_tracker_list), 1) self.assertEqual(next_tracker_url, "http://announce.torrentsmd.com:8080/announce") - def test_publish_torrent_result(self): - MSG_ZERO_SEED_TORRENT = "Not publishing zero seeded torrents" - MSG_NO_popularity_community = "Popular community not available to publish torrent checker result" - - def _fake_logger_info(torrent_checker, msg): - if msg == MSG_ZERO_SEED_TORRENT: - torrent_checker.zero_seed_torrent = True - if msg == MSG_NO_popularity_community: - torrent_checker.popularity_community_not_found = True - - original_logger_info = self.torrent_checker._logger.info - self.torrent_checker._logger.info = lambda msg: _fake_logger_info(self.torrent_checker, msg) - - def popularity_community_queue_content(torrent_checker, _): - torrent_checker.popularity_community_queue_content_called = True - - self.torrent_checker.tribler_session.lm.popularity_community.queue_content = lambda _content: \ - popularity_community_queue_content(self.torrent_checker, _content) - - # Case1: Fake torrent checker response, seeders:0 - fake_response = {'infohash': 'a' * 20, 'seeders': 0, 'leechers': 0, 'last_check': time.time()} - self.torrent_checker.publish_torrent_result(fake_response) - self.assertTrue(self.torrent_checker.zero_seed_torrent) - - # Case2: Positive seeders - fake_response['seeders'] = 5 - self.torrent_checker.popularity_community_queue_content_called = False - self.torrent_checker.popularity_community_queue_content_called_type = None - - self.torrent_checker.publish_torrent_result(fake_response) - self.assertTrue(self.torrent_checker.popularity_community_queue_content_called) - - # Case3: Popular community is None - self.torrent_checker.tribler_session.lm.popularity_community = None - self.torrent_checker.publish_torrent_result(fake_response) - self.assertTrue(self.torrent_checker.popularity_community_not_found) - - self.torrent_checker._logger.info = original_logger_info - def test_on_health_check_completed(self): tracker1 = 'udp://localhost:2801' tracker2 = "http://badtracker.org/announce" infohash_bin = '\xee'*20 infohash_hex = hexlify(infohash_bin) - self.session.lm.popularity_community.queue_content = lambda _: None failure = Failure() failure.tracker_url = tracker2 @@ -250,6 +209,25 @@ def test_on_health_check_completed(self): self.assertEqual(result[2][1]['DHT'][0]['seeders'], ts.seeders) self.assertLess(previous_check, ts.last_check) + def test_update_checked_torrents(self): + """ + Test updating the list of torrents that you have checked yourself + """ + self.torrent_checker.update_torrents_checked( + {"seeders": 5, "leechers": 3, "last_check": 0, "infohash": 'a' * 20}) + self.assertEqual(len(self.torrent_checker.popular_torrents_checked), 1) + + self.torrent_checker.popular_torrents_checked = set() + for ind in xrange(10): + self.torrent_checker.update_torrents_checked( + {"seeders": ind, "leechers": 3, "last_check": 0, "infohash": 'a' * 20}) + self.assertEqual(len(self.torrent_checker.popular_torrents_checked), 5) + + # We should kick out a torrent now + torrent_dict = {"seeders": 500, "leechers": 3, "last_check": 0, "infohash": 'a' * 20} + self.torrent_checker.update_torrents_checked(torrent_dict) + self.assertIn(('a' * 20, 500, 3, 0), self.torrent_checker.popular_torrents_checked) + @db_session def test_check_random_torrent_legacy(self): """ diff --git a/Tribler/community/popularity/community.py b/Tribler/community/popularity/community.py index 26487a85c0d..666ac6c7130 100644 --- a/Tribler/community/popularity/community.py +++ b/Tribler/community/popularity/community.py @@ -1,130 +1,84 @@ from __future__ import absolute_import +import random from binascii import hexlify, unhexlify from pony.orm import db_session -from twisted.internet.defer import inlineCallbacks +from twisted.internet.task import LoopingCall -from Tribler.community.popularity.constants import (ERROR_UNKNOWN_PEER, ERROR_UNKNOWN_RESPONSE, - MSG_TORRENT_HEALTH_RESPONSE) -from Tribler.community.popularity.payload import ContentSubscription, TorrentHealthPayload -from Tribler.community.popularity.pubsub import PubSubCommunity -from Tribler.community.popularity.repository import ContentRepository +from Tribler.community.popularity.payload import TorrentsHealthPayload +from Tribler.pyipv8.ipv8.community import Community +from Tribler.pyipv8.ipv8.lazy_community import lazy_wrapper +from Tribler.pyipv8.ipv8.messaging.payload_headers import BinMemberAuthenticationPayload from Tribler.pyipv8.ipv8.peer import Peer -class PopularityCommunity(PubSubCommunity): +PUBLISH_INTERVAL = 10 + +MSG_TORRENTS_HEALTH = 1 + + +class PopularityCommunity(Community): """ Community for disseminating the content across the network. Follows publish-subscribe model. """ - MASTER_PUBLIC_KEY = ("3081a7301006072a8648ce3d020106052b8104002703819200040504278d20d6776ce7081ad57d99fe066bb2a93" - "ce7cc92405a534ef7175bab702be557d8c7d3b725ea0eb09c686e798f6c7ad85e8781a4c3b20e54c15ede38077c" - "8f5c801b71d13105f261da7ddcaa94ae14bd177bf1a05a66f595b9bb99117d11f73b4c8d3dcdcdc2b3f838b8ba3" - "5a9f600d2c543e8b3ba646083307b917bbbccfc53fc5ab6ded90b711d7eeda46f5f") + MASTER_PUBLIC_KEY = ("4c69624e61434c504b3a4fcd9aa5256e8859d38509dd53ab93e70b351ac770817acfdccd836cf766ee345ea" + "5c7f6659cc410f3447bafaec8472c40032984d197ffd565903c6e799570bc") master_peer = Peer(unhexlify(MASTER_PUBLIC_KEY)) def __init__(self, *args, **kwargs): - self.metadata_store = kwargs.pop('metadata_store', None) - self.tribler_session = kwargs.pop('session', None) + self.metadata_store = kwargs.pop('metadata_store') + self.torrent_checker = kwargs.pop('torrent_checker', None) super(PopularityCommunity, self).__init__(*args, **kwargs) - self.content_repository = ContentRepository(self.metadata_store) - self.decode_map.update({ - chr(MSG_TORRENT_HEALTH_RESPONSE): self.on_torrent_health_response + chr(MSG_TORRENTS_HEALTH): self.on_torrents_health }) - self.logger.info('Popular Community initialized (peer mid %s)', hexlify(self.my_peer.mid)) - - @inlineCallbacks - def unload(self): - self.content_repository.cleanup() - self.content_repository = None - yield super(PopularityCommunity, self).unload() - - def on_subscribe(self, source_address, data): - auth, _, _ = self._ez_unpack_auth(ContentSubscription, data) - peer = self.get_peer_from_auth(auth, source_address) + self.logger.info('Popularity Community initialized (peer mid %s)', hexlify(self.my_peer.mid)) + self.publish_lc = self.register_task("publish", LoopingCall(self.gossip_torrents_health)) + self.publish_lc.start(PUBLISH_INTERVAL, now=False) - subscribed = super(PopularityCommunity, self).on_subscribe(source_address, data) - # Publish the latest torrents to the subscriber - if subscribed: - self.publish_latest_torrents(peer=peer) - - def on_torrent_health_response(self, source_address, data): + @db_session + def gossip_torrents_health(self): """ - Message handler for torrent health response. Torrent health response is part of periodic update message from - the publisher. If the message was from an unknown publisher then we are not interested in it and it is simply - dropped. In other case, a decision to accept or reject the message is made based on freshness of the message - and the trustscore (check update_torrent in ContentRepository for the implementation). + Gossip torrent health information to another peer. """ - self.logger.debug("Got torrent health response from %s", source_address) - auth, _, payload = self._ez_unpack_auth(TorrentHealthPayload, data) - peer = self.get_peer_from_auth(auth, source_address) - - if peer not in self.publishers: - self.logger.error(ERROR_UNKNOWN_RESPONSE) + if not self.get_peers(): return - infohash = payload.infohash - if not self.content_repository.has_torrent(infohash): - # TODO(Martijn): we should probably try to fetch the torrent info from the other peer - return + random_torrents = self.metadata_store.TorrentState.select( + lambda g: g.last_check is not None and g.seeders > 0).random(5) + random_torrents_list = [(str(torrent.infohash), torrent.seeders, torrent.leechers, torrent.last_check) + for torrent in random_torrents] - peer_trust = self.trustchain.get_trust(peer) if self.trustchain else 0 - self.content_repository.update_torrent_health(payload, peer_trust) + torrents_checked = list(self.torrent_checker.popular_torrents_checked) if self.torrent_checker else [] - # MESSAGE SENDING FUNCTIONS + random_peer = random.choice(self.get_peers()) - def send_torrent_health_response(self, payload, peer=None): - """ - Method to send torrent health response. This message is sent to all the subscribers by default but if a - peer is specified then only that peer receives this message. - """ - if peer and peer not in self.get_peers(): - self.logger.debug(ERROR_UNKNOWN_PEER) - return + auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list() + payload = TorrentsHealthPayload(random_torrents_list, torrents_checked).to_pack_list() - packet = self.create_message_packet(MSG_TORRENT_HEALTH_RESPONSE, payload) - self.broadcast_message(packet, peer=peer) + packet = self._ez_pack(self._prefix, MSG_TORRENTS_HEALTH, [auth, payload]) + self.endpoint.send(random_peer.address, packet) - # CONTENT REPOSITORY STUFFS + @lazy_wrapper(TorrentsHealthPayload) + def on_torrents_health(self, _, payload): + self.logger.info("Received torrent health information for %d random torrents and %d checked torrents", + len(payload.random_torrents), len(payload.torrents_checked)) - def publish_next_content(self): - """ - Publishes the next content from the queue to the subscribers. - Does nothing if there are no subscribers. - Only Torrent health response is published at the moment. - """ - self.logger.info("Content to publish: %d", self.content_repository.queue_length()) - if not self.subscribers: - self.logger.info("No subscribers found. Not publishing anything") - return - - content = self.content_repository.pop_content() - if content: - infohash, seeders, leechers, timestamp = content - payload = TorrentHealthPayload(bytes(infohash), seeders, leechers, timestamp) - self.send_torrent_health_response(payload) - - def publish_latest_torrents(self, peer): - """ - Publishes the latest torrents in local database to the given peer. - """ + all_torrents = payload.random_torrents + payload.torrents_checked with db_session: - torrents = self.content_repository.get_top_torrents() - self.logger.info("Publishing %d torrents to peer %s", len(torrents), peer) - - to_send = [TorrentHealthPayload(bytes(torrent.infohash), torrent.health.seeders, torrent.health.leechers, - torrent.health.last_check) for torrent in torrents] - for payload in to_send: - self.send_torrent_health_response(payload, peer=peer) - - def queue_content(self, content): - """ - Basically adds a given content to the queue of content repository. - """ - self.content_repository.add_content_to_queue(content) + for infohash, seeders, leechers, last_check in all_torrents: + torrent_state = self.metadata_store.TorrentState.get(infohash=infohash) + if torrent_state and last_check > torrent_state.last_check: + # Replace current information + torrent_state.seeders = seeders + torrent_state.leechers = leechers + torrent_state.last_check = last_check + elif not torrent_state: + _ = self.metadata_store.TorrentState(infohash=infohash, seeders=seeders, + leechers=leechers, last_check=last_check) diff --git a/Tribler/community/popularity/constants.py b/Tribler/community/popularity/constants.py deleted file mode 100644 index cff151fd64f..00000000000 --- a/Tribler/community/popularity/constants.py +++ /dev/null @@ -1,16 +0,0 @@ -# Message types for different requests & response -MSG_SUBSCRIBE = 1 -MSG_SUBSCRIPTION = 2 -MSG_TORRENT_HEALTH_RESPONSE = 3 - -MAX_SUBSCRIBERS = 10 -MAX_PUBLISHERS = 10 -PUBLISH_INTERVAL = 5 - -# Maximum packet payload size in bytes -MAX_PACKET_PAYLOAD_SIZE = 500 - -# Error definitions -ERROR_UNKNOWN_PEER = "Unknown peer! No response sent" -ERROR_UNKNOWN_RESPONSE = "Received response from non-subscribed peer. Dropping it." -ERROR_NO_CONTENT = "Nothing to publish" diff --git a/Tribler/community/popularity/payload.py b/Tribler/community/popularity/payload.py index f0794e33064..00e7dd89b9e 100644 --- a/Tribler/community/popularity/payload.py +++ b/Tribler/community/popularity/payload.py @@ -1,48 +1,57 @@ from __future__ import absolute_import -from Tribler.pyipv8.ipv8.messaging.payload import Payload - - -class ContentSubscription(Payload): +import struct - format_list = ['I', '?'] +from Tribler.pyipv8.ipv8.messaging.payload import Payload - def __init__(self, identifier, subscribe): - super(ContentSubscription, self).__init__() - self.identifier = identifier - self.subscribe = subscribe - def to_pack_list(self): - data = [('I', self.identifier), - ('?', self.subscribe)] - return data - - @classmethod - def from_unpack_list(cls, *args): - (identifier, subscribe) = args - return ContentSubscription(identifier, subscribe) +TORRENT_INFO_FORMAT = '20sIIQ' # Infohash, seeders, leechers and a timestamp -class TorrentHealthPayload(Payload): +class TorrentsHealthPayload(Payload): - format_list = ['20s', 'I', 'I', 'Q'] + format_list = ['I', 'I', 'varlenI', 'raw'] # Number of random torrents, number of torrents checked by you - def __init__(self, infohash, num_seeders, num_leechers, timestamp): - super(TorrentHealthPayload, self).__init__() - self.infohash = infohash - self.num_seeders = num_seeders or 0 - self.num_leechers = num_leechers or 0 - self.timestamp = timestamp or 0 + def __init__(self, random_torrents, torrents_checked): + """ + Initialize a TorrentsHealthPayload, containing information on the health of both random torrents and popular + torrents that have been checked by you. + :param random_torrents: List of tuple of (infohash, seeders, leechers, checked_timestamp) + :param torrents_checked: List of tuple of (infohash, seeders, leechers, checked_timestamp) + """ + super(TorrentsHealthPayload, self).__init__() + self.random_torrents = random_torrents + self.torrents_checked = torrents_checked def to_pack_list(self): - data = [('20s', self.infohash), - ('I', self.num_seeders), - ('I', self.num_leechers), - ('Q', self.timestamp)] + random_torrents_items = [item for sublist in self.random_torrents for item in sublist] + checked_torrents_items = [item for sublist in self.torrents_checked for item in sublist] + data = [('I', len(self.random_torrents)), + ('I', len(self.torrents_checked)), + ('varlenI', struct.pack("!" + TORRENT_INFO_FORMAT * len(self.random_torrents), *random_torrents_items)), + ('raw', struct.pack("!" + TORRENT_INFO_FORMAT * len(self.torrents_checked), *checked_torrents_items))] return data @classmethod def from_unpack_list(cls, *args): - (infohash, num_seeders, num_leechers, timestamp) = args - return TorrentHealthPayload(infohash, num_seeders, num_leechers, timestamp) + num_random_torrents, num_checked_torrents, raw_random_torrents, raw_checked_torrents = args + + random_torrents_list = struct.unpack("!" + TORRENT_INFO_FORMAT * num_random_torrents, raw_random_torrents) + checked_torrents_list = struct.unpack("!" + TORRENT_INFO_FORMAT * num_checked_torrents, raw_checked_torrents) + + random_torrents = [] + checked_torrents = [] + for ind in range(num_random_torrents): + random_torrents.append((random_torrents_list[ind * 4], + random_torrents_list[ind * 4 + 1], + random_torrents_list[ind * 4 + 2], + random_torrents_list[ind * 4 + 3])) + + for ind in range(num_checked_torrents): + checked_torrents.append((checked_torrents_list[ind * 4], + checked_torrents_list[ind * 4 + 1], + checked_torrents_list[ind * 4 + 2], + checked_torrents_list[ind * 4 + 3])) + + return TorrentsHealthPayload(random_torrents, checked_torrents) diff --git a/Tribler/community/popularity/pubsub.py b/Tribler/community/popularity/pubsub.py deleted file mode 100644 index 5c1d8e9fe4c..00000000000 --- a/Tribler/community/popularity/pubsub.py +++ /dev/null @@ -1,246 +0,0 @@ -from __future__ import absolute_import - -import logging -from binascii import unhexlify -from copy import copy - -from twisted.internet.defer import inlineCallbacks -from twisted.internet.task import LoopingCall - -from Tribler.community.popularity.constants import ERROR_UNKNOWN_PEER, MAX_PUBLISHERS, MAX_SUBSCRIBERS, MSG_SUBSCRIBE, \ - MSG_SUBSCRIPTION, PUBLISH_INTERVAL -from Tribler.community.popularity.payload import ContentSubscription -from Tribler.community.popularity.request import ContentRequest -from Tribler.pyipv8.ipv8.community import Community -from Tribler.pyipv8.ipv8.messaging.payload_headers import BinMemberAuthenticationPayload, GlobalTimeDistributionPayload -from Tribler.pyipv8.ipv8.peer import Peer -from Tribler.pyipv8.ipv8.requestcache import RequestCache - - -class PubSubCommunity(Community): - """ - This community is designed as a base community for all othe future communities that desires publish subscribe model - for content dissemination. It provides a few basic primitives like subscribe/unsubscribe to publisher peers and - publish/broadcast content to subscriber peers. - - All the derived community should implement publish_next_content() method which is responsible for publishing the - next available content to all the subscribers. - """ - MASTER_PUBLIC_KEY = "3081a7301006072a8648ce3d020106052b8104002703819200040504278d20d6776ce7081ad57d99fe066bb2a93" \ - "ce7cc92405a534ef7175bab702be557d8c7d3b725ea0eb09c686e798f6c7ad85e8781a4c3b20e54c15ede38077c" \ - "8f5c801b71d13105f261da7ddcaa94ae14bd177bf1a05a66f595b9bb99117d11f73b4c8d3dcdcdc2b3f838b8ba3" \ - "5a9f600d2c543e8b3ba646083307b917bbbccfc53fc5ab6ded90b711d7eeda46f5f" - - master_peer = Peer(unhexlify(MASTER_PUBLIC_KEY)) - - def __init__(self, *args, **kwargs): - super(PubSubCommunity, self).__init__(*args, **kwargs) - self.trustchain = kwargs.pop('trustchain_community', None) - self.logger = logging.getLogger(self.__class__.__name__) - self.request_cache = RequestCache() - - # Register messages - self.decode_map.update({ - chr(MSG_SUBSCRIBE): self.on_subscribe, - chr(MSG_SUBSCRIPTION): self.on_subscription_status - }) - - # A set of publisher and subscriber. - # Sends data updates to subscribers, and receives updates from subscribers. - self.subscribers = set() - self.publishers = set() - - def start(self): - """ - Starts the community by subscribing to peers, and periodically publishing the content updates to - the subscribers. - """ - # Subscribe peers - self.subscribe_peers() - - def start_publishing(): - # Update the publisher and subscriber list - self.refresh_peer_list() - - # publish the new cotent from the content repository - self.publish_next_content() - - self.register_task("start_publishing", LoopingCall(start_publishing)).start(PUBLISH_INTERVAL, False) - - @inlineCallbacks - def unload(self): - self.request_cache.clear() - self.cancel_pending_task("start_publishing") - yield super(PubSubCommunity, self).unload() - - def subscribe_peers(self): - """ - Subscribes to the connected peers. First, the peers are sorted based on the trust score on descending order and - content subscribe request is sent to the top peers. - This method is called periodically through refresh_peer_list() in start_publishing() loop so it can fill up for - the disconnected peers by connecting to new peers. - Note that, existing publisher peers are not disconnected even if we find new peers with higher trust score but - only fill up the remaining publisher slots with new top peers. - """ - num_publishers = len(self.publishers) - num_peers = len(self.get_peers()) - # If we have some free publisher slots and there are peers available - if num_publishers < MAX_PUBLISHERS and num_publishers < num_peers: - available_publishers = [peer for peer in self.get_peers() if peer not in self.publishers] - sorted_peers = sorted(available_publishers, - key=lambda _peer: self.trustchain.get_trust(_peer) if self.trustchain else 1, - reverse=True) - for peer in sorted_peers[: MAX_PUBLISHERS - num_publishers]: - self.subscribe(peer, subscribe=True) - - def refresh_peer_list(self): - """ - Updates the publishers and subscribers list by filtering out the disconnected peers. It also calls subscribe - peers to replenish the available publisher slots if necessary. - """ - peers = self.get_peers() - self.publishers = set([peer for peer in self.publishers if peer in peers]) - self.subscribers = set([peer for peer in self.subscribers if peer in peers]) - - # subscribe peers if necessary - self.subscribe_peers() - - def unsubscribe_peers(self): - """ - Unsubscribes from the existing publishers by sending content subscribe request with subscribe=False. It then - clears up its publishers list. - - Called at community unload. - """ - for peer in copy(self.publishers): - self.subscribe(peer, subscribe=False) - self.publishers.clear() - - def subscribe(self, peer, subscribe=True): - """ - Method to send content subscribe/unsubscribe message. This message is sent to each individual publisher peer we - want to subscribe/unsubscribe. - """ - cache = self.request_cache.add(ContentRequest(self.request_cache, MSG_SUBSCRIBE, None)) - # Remove the publisher peer already if user is trying to unsubscribe - if not subscribe: - self.publishers.remove(peer) - - # Create subscription packet and send it - subscription = ContentSubscription(cache.number, subscribe) - packet = self.create_message_packet(MSG_SUBSCRIBE, subscription) - self.broadcast_message(packet, peer=peer) - - def on_subscribe(self, source_address, data): - """ - Message handler for content subscribe message. It handles both subscribe and unsubscribe requests. - Upon successful subscription or unsubscription, it send the confirmation subscription message with status. - In case of subscription, it also publishes a list of recently checked torrents to the subscriber. - """ - auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) - peer = self.get_peer_from_auth(auth, source_address) - - # Subscribe or unsubscribe peer - subscribed = peer in self.subscribers - - if payload.subscribe and not subscribed: - if len(self.subscribers) < MAX_SUBSCRIBERS: - self.subscribers.add(peer) - subscribed = True - - elif not payload.subscribe and subscribed: - self.subscribers.remove(peer) - subscribed = False - - # Send subscription response - self.send_subscription_status(peer, payload.identifier, subscribed=subscribed) - - return subscribed - - def send_subscription_status(self, peer, identifier, subscribed=True): - """ - Method to send content subscription message. Content subscription message is send in response to content - subscribe or unsubscribe message. - """ - if peer not in self.get_peers(): - self.logger.error(ERROR_UNKNOWN_PEER) - return - - subscription = ContentSubscription(identifier, subscribed) - packet = self.create_message_packet(MSG_SUBSCRIPTION, subscription) - self.broadcast_message(packet, peer=peer) - - def on_subscription_status(self, source_address, data): - """ - Message handler for content subscription message. Content subscription message is sent by the publisher stating - the status of the subscription in response to subscribe or unsubscribe request. - - If the subscription message has subscribe=True, it means the subscription was successful, so the peer is added - to the subscriber. In other case, publisher is removed if it is still present in the publishers list. - """ - auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) - peer = self.get_peer_from_auth(auth, source_address) - - if not self.request_cache.has(u'request', payload.identifier): - return - self.request_cache.pop(u'request', payload.identifier) - - if payload.subscribe: - self.publishers.add(peer) - elif peer in self.publishers: - self.publishers.remove(peer) - - def create_message_packet(self, message_type, payload): - """ - Helper method to creates a message packet of given type with provided payload. - """ - auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list() - dist = GlobalTimeDistributionPayload(self.claim_global_time()).to_pack_list() - payload = payload if isinstance(payload, list) else payload.to_pack_list() - return self._ez_pack(self._prefix, message_type, [auth, dist, payload]) - - def broadcast_message(self, packet, peer=None): - """ - Helper method to broadcast the message packet to a single peer or all the subscribers. - """ - if peer is not None: - self.endpoint.send(peer.address, packet) - return - - for _peer in self.subscribers: - self.endpoint.send(_peer.address, packet) - - def get_peer_from_auth(self, auth, source_address): - """ - Get Peer object from the message and auth and source_address. - It is used for mocking the peer in test. - """ - return Peer(auth.public_key_bin, source_address) - - def pack_sized(self, payload_list, fit_size, start_index=0): - """ - Packs a list of Payload objects to fit into given size limit. - :param payload_list: List list of payload objects - :param fit_size: The maximum allowed size for payload field to fit into UDP packet. - :param start_index: Index of list to start packing - :return: packed string - """ - assert isinstance(payload_list, list) - serialized_results = '' - size = 0 - current_index = start_index - num_payloads = len(payload_list) - while current_index < num_payloads: - item = payload_list[current_index] - packed_item = self.serializer.pack_multiple(item.to_pack_list())[0] - packed_item_length = len(packed_item) - if size + packed_item_length > fit_size: - break - else: - size += packed_item_length - serialized_results += packed_item - current_index += 1 - return serialized_results, current_index, current_index - start_index - - def publish_next_content(self): - """ Method responsible for publishing content during periodic push """ - pass diff --git a/Tribler/community/popularity/repository.py b/Tribler/community/popularity/repository.py deleted file mode 100644 index 65bc2fcb544..00000000000 --- a/Tribler/community/popularity/repository.py +++ /dev/null @@ -1,94 +0,0 @@ -from __future__ import absolute_import - -import logging -import time -from collections import deque - -from pony.orm import db_session, desc - -from Tribler.Core.Modules.MetadataStore.serialization import REGULAR_TORRENT -from Tribler.pyipv8.ipv8.database import database_blob - -try: - long # pylint: disable=long-builtin -except NameError: - long = int # pylint: disable=redefined-builtin - -MAX_CACHE = 200 - -DEFAULT_TORRENT_LIMIT = 25 -DEFAULT_FRESHNESS_LIMIT = 60 - -TYPE_TORRENT_HEALTH = 1 - - -class ContentRepository(object): - """ - This class handles all the stuffs related to the content for PopularityCommunity. Currently, it handles all the - interactions with torrent and channel database. - - It also maintains a content queue which stores the content for publishing in the next publishing cycle. - """ - - def __init__(self, metadata_store): - super(ContentRepository, self).__init__() - self.logger = logging.getLogger(self.__class__.__name__) - self.metadata_store = metadata_store - self.queue = deque(maxlen=MAX_CACHE) - - def cleanup(self): - self.queue = None - - def add_content_to_queue(self, content): - if self.queue is not None: - self.queue.append(content) - - def queue_length(self): - return len(self.queue) if self.queue else 0 - - def pop_content(self): - return self.queue.pop() if self.queue else None - - @db_session - def get_top_torrents(self, limit=DEFAULT_TORRENT_LIMIT): - return list(self.metadata_store.TorrentMetadata.select( - lambda g: g.metadata_type == REGULAR_TORRENT).sort_by(desc("g.health.seeders")).limit(limit)) - - @db_session - def update_torrent_health(self, torrent_health_payload, peer_trust=0): - """ - Update the health of a torrent in the database. - """ - if not self.metadata_store: - self.logger.error("Metadata store is not available. Skipping torrent health update.") - return - - infohash = torrent_health_payload.infohash - if not self.has_torrent(infohash): - return - - torrent = self.get_torrent(infohash) - is_fresh = time.time() - torrent.health.last_check < DEFAULT_FRESHNESS_LIMIT - if is_fresh and peer_trust < 2: - self.logger.info("Database record is already fresh and the sending peer trust " - "score is too low so we just ignore the response.") - else: - # Update the torrent health anyway. A torrent info request should be sent separately - # to request additional info. - torrent.health.seeders = torrent_health_payload.num_seeders - torrent.health.leechers = torrent_health_payload.num_leechers - torrent.health.last_check = int(torrent_health_payload.timestamp) - - @db_session - def get_torrent(self, infohash): - """ - Return a torrent with a specific infohash from the database. - """ - results = list(self.metadata_store.TorrentMetadata.select( - lambda g: g.infohash == database_blob(infohash) and g.metadata_type == REGULAR_TORRENT).limit(1)) - if results: - return results[0] - return None - - def has_torrent(self, infohash): - return self.get_torrent(infohash) is not None diff --git a/Tribler/community/popularity/request.py b/Tribler/community/popularity/request.py deleted file mode 100644 index 14d58ed497d..00000000000 --- a/Tribler/community/popularity/request.py +++ /dev/null @@ -1,32 +0,0 @@ -from __future__ import absolute_import - -from twisted.internet.defer import Deferred - -from Tribler.pyipv8.ipv8.requestcache import RandomNumberCache - - -class ContentRequest(RandomNumberCache): - """ - This request cache keeps track of all outstanding search requests. - """ - CONTENT_TIMEOUT = 30.0 - - def __init__(self, request_cache, search_type, query): - super(ContentRequest, self).__init__(request_cache, u"request") - self.query = query - self.search_type = search_type - self.response = [] - self.deferred = Deferred() - - @property - def timeout_delay(self): - return ContentRequest.CONTENT_TIMEOUT - - def append_response(self, response): - self.response.extend(response) - - def finish(self): - self.deferred.callback(self.response) - - def on_timeout(self): - self.deferred.callback(self.response)