diff --git a/Tribler/Core/APIImplementation/LaunchManyCore.py b/Tribler/Core/APIImplementation/LaunchManyCore.py index 916ab7a9d59..9e98f6d2eb7 100644 --- a/Tribler/Core/APIImplementation/LaunchManyCore.py +++ b/Tribler/Core/APIImplementation/LaunchManyCore.py @@ -103,6 +103,7 @@ def __init__(self): self.torrent_checker = None self.tunnel_community = None self.triblerchain_community = None + self.popular_community = None self.startup_deferred = Deferred() @@ -303,6 +304,21 @@ def load_ipv8_overlays(self): self.ipv8.strategies.append((RandomWalk(self.market_community), 20)) + # Popular Community + if self.session.config.get_popular_community_enabled(): + from Tribler.community.popular.community import PopularCommunity + + local_peer = Peer(self.session.trustchain_keypair) + + self.popular_community = PopularCommunity(local_peer, self.ipv8.endpoint, self.ipv8.network, + torrent_db=self.session.lm.torrent_db) + + self.ipv8.overlays.append(self.popular_community) + + self.ipv8.strategies.append((RandomWalk(self.popular_community), 20)) + + self.popular_community.start() + @blocking_call_on_reactor_thread def load_dispersy_communities(self): self._logger.info("tribler: Preparing Dispersy communities...") diff --git a/Tribler/Core/CacheDB/SqliteCacheDBHandler.py b/Tribler/Core/CacheDB/SqliteCacheDBHandler.py index c8d11be8418..a7f35725989 100644 --- a/Tribler/Core/CacheDB/SqliteCacheDBHandler.py +++ b/Tribler/Core/CacheDB/SqliteCacheDBHandler.py @@ -799,6 +799,16 @@ def getRecentlyCollectedTorrents(self, limit): results = self._db.fetchall(sql, (limit,)) return [[str2bin(result[0]), result[1], result[2], result[3] or 0, result[4]] for result in results] + def getRecentlyCheckedTorrents(self, limit): + sql = u""" + SELECT T.infohash, T.num_seeders, T.num_leechers, T.last_tracker_check + FROM Torrent T + WHERE T.is_collected = 0 AND T.num_seeders > 1 + AND T.secret is not 1 ORDER BY T.last_tracker_check, T.num_seeders DESC LIMIT ? + """ + results = self._db.fetchall(sql, (limit,)) + return [[str2bin(result[0]), result[1], result[2], result[3] or 0] for result in results] + def getRandomlyCollectedTorrents(self, insert_time, limit): sql = u""" SELECT CT.infohash, CT.num_seeders, CT.num_leechers, T.last_tracker_check diff --git a/Tribler/Core/Config/config.spec b/Tribler/Core/Config/config.spec index a3cf5bfe7ea..7f1244ff0d6 100644 --- a/Tribler/Core/Config/config.spec +++ b/Tribler/Core/Config/config.spec @@ -114,3 +114,7 @@ history_size = integer(min=1, default=20) enabled = boolean(default=True) sources = string_list(default=list()) max_disk_space = integer(min=0, default=53687091200) + +[popular_community] +enabled = boolean(default=True) +cache_dir = string(default=health_cache) \ No newline at end of file diff --git a/Tribler/Core/Config/tribler_config.py b/Tribler/Core/Config/tribler_config.py index e7149a06925..21ce9bdbf1f 100644 --- a/Tribler/Core/Config/tribler_config.py +++ b/Tribler/Core/Config/tribler_config.py @@ -531,6 +531,22 @@ def set_dummy_wallets_enabled(self, value): def get_dummy_wallets_enabled(self): return self.config['wallets']['dummy_wallets_enabled'] + # Popular Community + + def get_popular_community_enabled(self): + return self.config['popular_community']['enabled'] or True + + def set_popular_community_enabled(self, value): + self.config['popular_community']['enabled'] = value + + def get_health_cache_dir(self): + cache_dir = self.config['popular_community']['cache_dir'] \ + if 'cache_dir' in self.config['popular_community'] else 'health_cache' + return os.path.join(self.get_state_dir(), cache_dir) + + def set_health_cache_dir(self, cache_dir): + self.config['popular_community']['cache_dir'] = cache_dir + # Torrent store def get_torrent_store_enabled(self): diff --git a/Tribler/Core/TorrentChecker/torrent_checker.py b/Tribler/Core/TorrentChecker/torrent_checker.py index 32b04e06971..3faaf3ca95f 100644 --- a/Tribler/Core/TorrentChecker/torrent_checker.py +++ b/Tribler/Core/TorrentChecker/torrent_checker.py @@ -13,6 +13,7 @@ from Tribler.Core.TorrentChecker.session import create_tracker_session, FakeDHTSession, UdpSocketManager from Tribler.Core.Utilities.tracker_utils import MalformedTrackerURLException from Tribler.Core.simpledefs import NTFY_TORRENTS +from Tribler.community.popular.repository import TYPE_TORRENT_HEALTH from Tribler.dispersy.util import blocking_call_on_reactor_thread, call_on_reactor_thread from Tribler.pyipv8.ipv8.taskmanager import TaskManager @@ -198,6 +199,9 @@ def on_gui_request_completed(self, infohash, result): self._update_torrent_result(torrent_update_dict) + # Add this result to popular community to publish to subscribers + self._publish_torrent_result(torrent_update_dict) + return final_response @call_on_reactor_thread @@ -326,3 +330,13 @@ def _update_torrent_result(self, response): self._torrent_db.updateTorrentCheckResult(torrent_id, infohash, seeders, leechers, last_check, next_check, status, retries) + + def _publish_torrent_result(self, response): + if response['seeders'] == 0: + self._logger.info("Not publishing zero seeded torrents") + return + content = (response['infohash'], response['seeders'], response['leechers'], response['last_check']) + if self.tribler_session.lm.popular_community: + self.tribler_session.lm.popular_community.queue_content(TYPE_TORRENT_HEALTH, content) + else: + self._logger.info("Popular community not available to publish torrent checker result") diff --git a/Tribler/Test/Community/popular/__init__.py b/Tribler/Test/Community/popular/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/Tribler/Test/Community/popular/test_community.py b/Tribler/Test/Community/popular/test_community.py new file mode 100644 index 00000000000..7ed4578092f --- /dev/null +++ b/Tribler/Test/Community/popular/test_community.py @@ -0,0 +1,81 @@ +import random + +from Tribler.Test.Core.base_test import MockObject +from Tribler.community.popular.community import PopularCommunity, MSG_TORRENT_HEALTH_RESPONSE +from Tribler.community.popular.repository import TYPE_TORRENT_HEALTH +from Tribler.pyipv8.ipv8.test.base import TestBase +from Tribler.pyipv8.ipv8.test.mocking.ipv8 import MockIPv8 +from Tribler.pyipv8.ipv8.test.util import twisted_wrapper + + +class TestPopularCommunityBase(TestBase): + NUM_NODES = 2 + + def setUp(self): + super(TestPopularCommunityBase, self).setUp() + self.initialize(PopularCommunity, self.NUM_NODES) + + def create_node(self): + def load_random_torrents(limit): + return [ + ['\xfdC\xf9+V\x11A\xe7QG\xfb\xb1*6\xef\xa5\xaeu\xc2\xe0', + random.randint(200, 250), random.randint(1, 10), 1525704192.166107] for _ in range(limit) + ] + + torrent_db = MockObject() + torrent_db.getRecentlyCheckedTorrents = lambda limit: load_random_torrents(limit) + + return MockIPv8(u"curve25519", PopularCommunity, torrent_db=torrent_db) + + +class TestPopularCommunity(TestPopularCommunityBase): + __testing__ = False + NUM_NODES = 2 + + def setUp(self): + super(TestPopularCommunity, self).setUp() + + @twisted_wrapper + def test_subscribe_peers(self): + yield self.introduce_nodes() + self.nodes[0].overlay.subscribe_peers() + yield self.deliver_messages() + + # Node 0 should have a publisher added + self.assertEqual(len(self.nodes[0].overlay.publishers), 1, "Expected 1 publisher") + # Node 1 should have a subscriber added + self.assertEqual(len(self.nodes[1].overlay.subscribers), 1, "Expected 1 subscriber") + + @twisted_wrapper + def test_start(self): + yield self.introduce_nodes() + self.nodes[0].overlay.start() + yield self.deliver_messages() + + # Node 0 should have a publisher added + self.assertEqual(len(self.nodes[0].overlay.publishers), 1, "Expected 1 publisher") + # Node 1 should have a subscriber added + self.assertEqual(len(self.nodes[1].overlay.subscribers), 1, "Expected 1 subscriber") + + @twisted_wrapper + def test_content_publishing(self): + def on_torrent_health_response(peer, source_address, data): + peer.torrent_health_response_received = True + + self.nodes[0].torrent_health_response_received = False + self.nodes[0].overlay.decode_map[chr(MSG_TORRENT_HEALTH_RESPONSE)] = lambda source_address, data: \ + on_torrent_health_response(self.nodes[0], source_address, data) + + yield self.introduce_nodes() + self.nodes[0].overlay.subscribe_peers() + yield self.deliver_messages() + + # Add something to queue + health_info = ('a' * 20, random.randint(1, 100), random.randint(1, 10), random.randint(1, 111111)) + self.nodes[1].overlay.queue_content(TYPE_TORRENT_HEALTH, health_info) + + self.nodes[1].overlay.publish_next_content() + + yield self.deliver_messages() + + self.assertTrue(self.nodes[0].torrent_health_response_received, "Expected to receive torrent response") diff --git a/Tribler/Test/Community/popular/test_repository.py b/Tribler/Test/Community/popular/test_repository.py new file mode 100644 index 00000000000..599183f9450 --- /dev/null +++ b/Tribler/Test/Community/popular/test_repository.py @@ -0,0 +1,102 @@ +import time +import unittest + +from Tribler.Test.Core.base_test import MockObject +from Tribler.community.popular.payload import TorrentHealthPayload +from Tribler.community.popular.repository import ContentRepository, DEFAULT_FRESHNESS_LIMIT + + +class TestContentRepository(unittest.TestCase): + + def setUp(self): + torrent_db = MockObject() + self.content_repository = ContentRepository(torrent_db) + + def test_add_content(self): + """ + Test adding and removing content works as expected. + """ + # Initial content queue is zero + self.assertEqual(self.content_repository.num_content(), 0, "No item expected in queue initially") + + # Add a sample content and check the size + sample_content = ('a' * 20, 6, 3, 123456789) + sample_content_type = 1 + self.content_repository.add_content(sample_content_type, sample_content) + self.assertEqual(self.content_repository.num_content(), 1, "One item expected in queue") + + # Pop an item + (content_type, content) = self.content_repository.pop_content() + self.assertEqual(content_type, sample_content_type, "Content type should be equal") + self.assertEqual(content, sample_content, "Content should be equal") + + # Check size again + self.assertEqual(self.content_repository.num_content(), 0, "No item expected in queue") + + def test_get_top_torrents(self): + """ + Test if content repository returns expected top torrents. + """ + + def get_fake_torrents(limit): + return [[chr(x) * 20, x, 0, 1525704192] for x in range(limit)] + + self.content_repository.torrent_db.getRecentlyCheckedTorrents = lambda limit: get_fake_torrents(limit) + + limit = 10 + self.assertEqual(self.content_repository.get_top_torrents(limit=limit), get_fake_torrents(limit)) + + def test_update_torrent_with_higher_trust(self): + """ + Scenario: The database torrent has still fresh last_check_time and you receive a new response from + peer with trust > 1. + Expect: Torrent in database is updated. + """ + # last_check_time for existing torrent in database + db_last_time_check = time.time() - 10 + # Peer trust, higher than 1 in this scenario + peer_trust = 10 + + # Database record is expected to be updated + self.assertTrue(self.try_torrent_update_with_options(db_last_time_check, peer_trust)) + + def test_update_torrent_with_stale_check_time(self): + """ + Scenario: The database torrent has stale last_check_time and you receive a new response from + peer with no previous trust. + Expect: Torrent in database is still updated. + """ + # last_check_time for existing torrent in database + db_last_time_check = time.time() - DEFAULT_FRESHNESS_LIMIT + # Peer trust, higher than 1 in this scenario + peer_trust = 0 + + # Database record is expected to be updated + self.assertTrue(self.try_torrent_update_with_options(db_last_time_check, peer_trust)) + + def try_torrent_update_with_options(self, db_last_check_time, peer_trust): + """ + Tries updating torrent considering the given last check time of existing torrent and a new response + obtained from a peer with given peer_trust value. + """ + sample_infohash, seeders, leechers, timestamp = 'a' * 20, 10, 5, db_last_check_time + sample_payload = TorrentHealthPayload(sample_infohash, seeders, leechers, timestamp) + + def update_torrent(content_repo, infohash, *args, **kw): + content_repo.update_torrent_called = True + + def get_torrent(infohash, keys=None, include_mypref=False): + return {'infohash': infohash, 'num_seeders': seeders, + 'num_leechers': leechers, 'last_tracker_check': timestamp} + + self.content_repository.torrent_db.getTorrent = lambda infohash, **kw: get_torrent(infohash, **kw) + self.content_repository.torrent_db.hasTorrent = lambda infohash: infohash == sample_infohash + self.content_repository.torrent_db.updateTorrent = \ + lambda infohash, *args, **kw: update_torrent(self.content_repository, infohash, *args, **kw) + + self.content_repository.update_torrent_called = False + self.content_repository.update_torrent(sample_payload, peer_trust=peer_trust) + + return self.content_repository.update_torrent_called + + self.assertTrue(self.content_repository.update_torrent_called) diff --git a/Tribler/community/popular/__init__.py b/Tribler/community/popular/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/Tribler/community/popular/community.py b/Tribler/community/popular/community.py new file mode 100644 index 00000000000..30fa93f7ce7 --- /dev/null +++ b/Tribler/community/popular/community.py @@ -0,0 +1,220 @@ +from twisted.internet.defer import inlineCallbacks +from twisted.internet.task import LoopingCall + +from Tribler.community.popular.payload import TorrentHealthPayload, ContentSubscription +from Tribler.community.popular.repository import ContentRepository, TYPE_TORRENT_HEALTH +from Tribler.pyipv8.ipv8.deprecated.community import Community +from Tribler.pyipv8.ipv8.deprecated.payload_headers import BinMemberAuthenticationPayload, GlobalTimeDistributionPayload +from Tribler.pyipv8.ipv8.keyvault.crypto import ECCrypto +from Tribler.pyipv8.ipv8.peer import Peer + +MSG_POPULAR_CONTENT_SUBSCRIBE = 1 +MSG_POPULAR_CONTENT_SUBSCRIPTION = 2 +MSG_TORRENT_HEALTH_RESPONSE = 3 +MSG_CHANNEL_HEALTH_RESPONSE = 4 +MSG_TORRENT_HEALTH_UPDATE = 5 + +MAX_SUBSCRIBERS = 25 +MAX_PUBLISHERS = 25 +PUBLISH_INTERVAL = 5 + + +class PopularCommunity(Community): + master_peer = Peer(ECCrypto().generate_key(u"medium")) + + def __init__(self, *args, **kwargs): + self.torrent_db = kwargs.pop('torrent_db', None) + self.trustchain = kwargs.pop('trustchain_community', None) + super(PopularCommunity, self).__init__(*args, **kwargs) + + self.content_repository = ContentRepository(self.torrent_db) + + # Register messages + self.decode_map.update({ + chr(MSG_POPULAR_CONTENT_SUBSCRIBE): self.on_popular_content_subscribe, + chr(MSG_POPULAR_CONTENT_SUBSCRIPTION): self.on_popular_content_subscription, + chr(MSG_TORRENT_HEALTH_RESPONSE): self.on_torrent_health_response, + chr(MSG_CHANNEL_HEALTH_RESPONSE): self.on_channel_health_response + }) + + self.subscribers = set() + self.publishers = set() + + def start(self): + + # Subscribe peers + self.subscribe_peers() + + def start_publishing(): + # Update the publisher and subscriber list + peer_list = self.get_peers() + for peer in self.publishers: + if peer not in peer_list: + self.publishers.remove(peer) + for peer in self.subscribers: + if peer not in peer_list: + self.subscribers.remove(peer) + + # publish from the queue + self.publish_next_content() + + self.register_task("start_publishing", LoopingCall(start_publishing)).start(PUBLISH_INTERVAL, False) + + @inlineCallbacks + def unload(self): + self.content_repository.cleanup() + self.content_repository = None + + yield super(PopularCommunity, self).unload() + + def subscribe_peers(self): + """ + Subscribe the peers. Prefer peers with higher trust score. + """ + sorted_peers = sorted(self.get_peers(), + key=lambda _peer: self.trustchain.get_trust(_peer) if self.trustchain else 1, + reverse=True) + for peer in sorted_peers[: MAX_PUBLISHERS - len(self.publishers)]: + self.send_popular_content_subscribe(peer, subscribe=True) + + def unsubscribe_peers(self): + for peer in self.publishers: + self.send_popular_content_subscribe(peer, subscribe=False) + self.publishers.clear() + + def refresh_peer_list(self): + for publisher in self.publishers: + if publisher not in self.get_peers(): + self.publishers.remove(publisher) + for subscriber in self.subscribers: + if subscriber not in self.get_peers(): + self.subscribers.remove(subscriber) + + def on_popular_content_subscribe(self, source_address, data): + auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) + peer = Peer(auth.public_key_bin, source_address) + + # Subscribe or unsubscribe peer + subscribed = peer in self.subscribers + if payload.subscribe and not subscribed: + if len(self.subscribers) < MAX_SUBSCRIBERS: + self.subscribers.add(peer) + subscribed = True + elif not payload.subscribed and subscribed: + self.subscribers.remove(peer) + subscribed = False + + self.send_popular_content_subscription(peer, subscribed=subscribed) + if subscribed: + self.publish_latest_torrents(peer=peer) + + def on_popular_content_subscription(self, source_address, data): + auth, _, payload = self._ez_unpack_auth(ContentSubscription, data) + peer = Peer(auth.public_key_bin, source_address) + + if payload.subscribe: + self.publishers.add(peer) + + def on_torrent_health_response(self, source_address, data): + self._logger.debug("Got torrent health response from %s", source_address) + auth, _, payload = self._ez_unpack_auth(TorrentHealthPayload, data) + peer = Peer(auth.public_key_bin, source_address) + + if peer not in self.publishers: + self._logger.error("Got torrent health response from non-subscribed peer. Dropping the response.") + return + + peer_trust = self.trustchain.get_trust(peer) if self.trustchain else 0 + self.content_repository.update_torrent(payload, peer_trust) + + def on_channel_health_response(self, source_address, data): + pass + + def send_popular_content_subscribe(self, peer, subscribe=True): + if peer not in self.get_peers(): + self._logger.error("Cannot send to unknown peer") + return + + # Add or remove the publisher peer + if subscribe: + self.publishers.add(peer) + else: + self.publishers.remove(peer) + + # Create subscription packet and send it + subscription = ContentSubscription(subscribe) + packet = self._create_message_packet(MSG_POPULAR_CONTENT_SUBSCRIBE, subscription) + self._broadcast_message(packet, peer=peer) + + def send_popular_content_subscription(self, peer, subscribed=True): + if peer not in self.get_peers(): + self._logger.error("Cannot send to unknown peer") + return + subscription = ContentSubscription(subscribed) + packet = self._create_message_packet(MSG_POPULAR_CONTENT_SUBSCRIPTION, subscription) + self._broadcast_message(packet, peer=peer) + + def send_torrent_health_response(self, payload, peer=None): + packet = self._create_message_packet(MSG_TORRENT_HEALTH_RESPONSE, payload) + self._broadcast_message(packet, peer=peer) + + def send_channel_health_response(self, payload, peer=None): + packet = self._create_message_packet(MSG_CHANNEL_HEALTH_RESPONSE, payload) + self._broadcast_message(packet, peer=peer) + + def _create_message_packet(self, message_type, payload): + auth = BinMemberAuthenticationPayload(self.my_peer.public_key.key_to_bin()).to_pack_list() + dist = GlobalTimeDistributionPayload(self.claim_global_time()).to_pack_list() + payload = payload if isinstance(payload, list) else payload.to_pack_list() + return self._ez_pack(self._prefix, message_type, [auth, dist, payload]) + + def _broadcast_message(self, packet, peer=None): + if peer is not None: + self.endpoint.send(peer.address, packet) + return + for _peer in self.subscribers: + self.endpoint.send(_peer.address, packet) + + def _add_or_ignore_subscriber(self, peer): + if len(self.subscribers) < MAX_SUBSCRIBERS: + self.subscribers.add(peer) + return True + return False + + # Content Repository operations + + def queue_content(self, content_type, content): + """ + Basically add the given content to the queue of content repository. + """ + self.content_repository.add_content(content_type, content) + + def publish_next_content(self): + """ + Publishes the next content of the queue to subscribers. + Does nothing if there are none subscribers. + """ + if not self.subscribers: + self._logger.info("No subscribers found. Not publishing anything") + return + + content_type, content = self.content_repository.pop_content() + if content_type is None: + self._logger.info("Nothing to publish") + return + self._logger.info("Publishing content[type:%d]", content_type) + if content_type == TYPE_TORRENT_HEALTH: + infohash, seeders, leechers, timestamp = content + payload = TorrentHealthPayload(infohash, seeders, leechers, timestamp) + self.send_torrent_health_response(payload) + + def publish_latest_torrents(self, peer): + """ + Publishes the latest torrents in local database to the given peer. + """ + torrents = self.content_repository.get_top_torrents() + self._logger.info("Publishing %d torrents to peer %s", len(torrents), peer) + for torrent in torrents: + infohash, seeders, leechers, timestamp = torrent[:4] + payload = TorrentHealthPayload(infohash, seeders, leechers, timestamp) + self.send_torrent_health_response(payload, peer=peer) diff --git a/Tribler/community/popular/payload.py b/Tribler/community/popular/payload.py new file mode 100644 index 00000000000..9a80039ff26 --- /dev/null +++ b/Tribler/community/popular/payload.py @@ -0,0 +1,108 @@ +from Tribler.pyipv8.ipv8.deprecated.payload import Payload + + +class ContentSubscription(Payload): + + format_list = ['I'] + + def __init__(self, subscribe): + super(ContentSubscription, self).__init__() + self.subscribe = subscribe + + def to_pack_list(self): + data = [('I', self.subscribe)] + + return data + + @classmethod + def from_unpack_list(cls, subscribe): + return ContentSubscription(subscribe) + + +class TorrentHealthPayload(Payload): + + format_list = ['20s', 'I', 'I', 'f'] + + def __init__(self, infohash, num_seeders, num_leechers, timestamp): + super(TorrentHealthPayload, self).__init__() + self._infohash = infohash + self._num_seeders = num_seeders + self._num_leechers = num_leechers + self._timestamp = timestamp + + def to_pack_list(self): + data = [('20s', str(self.infohash)), + ('I', self.num_seeders), + ('I', self.num_leechers), + ('f', self.timestamp)] + + return data + + @classmethod + def from_unpack_list(cls, infohash, num_seeders, num_leechers, timestamp): + return TorrentHealthPayload(infohash, num_seeders, num_leechers, timestamp) + + @property + def infohash(self): + return self._infohash + + @property + def num_seeders(self): + return self._num_seeders + + @property + def num_leechers(self): + return self._num_leechers + + @property + def timestamp(self): + return self._timestamp + + +class ChannelHealthPayload(Payload): + """ + Payload for a channel popularity message in the popular community. + """ + + format_list = ['varlenI', 'I', 'I', 'I', 'f'] + + def __init__(self, channel_id, num_votes, num_torrents, swarm_size_sum, timestamp): + super(ChannelHealthPayload, self).__init__() + self._channel_id = channel_id + self._num_votes = num_votes + self._num_torrents = num_torrents + self._swarm_size_sum = swarm_size_sum + self._timestamp = timestamp + + def to_pack_list(self): + data = [('varlenI', str(self._channel_id)), + ('I', self._num_votes), + ('I', self._num_torrents), + ('I', self._swarm_size_sum), + ('f', self._timestamp)] + + return data + + @classmethod + def from_unpack_list(cls, channel_id, num_votes, num_torrents, swarm_size_sum, timestamp): + return ChannelHealthPayload(channel_id, num_votes, num_torrents, swarm_size_sum, timestamp) + + @property + def channel_id(self): + return self._infohash + + @property + def num_votes(self): + return self._num_seeders + + @property + def num_torrents(self): + return self._num_leechers + + @property + def swarm_size_sum(self): + return self._timestamp + + @property + def timestamp(self): + return self._timestamp diff --git a/Tribler/community/popular/repository.py b/Tribler/community/popular/repository.py new file mode 100644 index 00000000000..997590ff284 --- /dev/null +++ b/Tribler/community/popular/repository.py @@ -0,0 +1,60 @@ +import time +from collections import deque + +from Tribler.community.popular.payload import TorrentHealthPayload + +MAX_CACHE = 200 + +DEFAULT_TORRENT_LIMIT = 25 +DEFAULT_FRESHNESS_LIMIT = 60 + +TYPE_TORRENT_HEALTH = 1 +TYPE_CHANNEL_HEALTH = 2 + + +class ContentRepository(object): + + def __init__(self, torrent_db): + super(ContentRepository, self).__init__() + self.torrent_db = torrent_db + self.queue = deque(maxlen=MAX_CACHE) + + def cleanup(self): + self.torrent_db = None + self.queue = None + + def add_content(self, content_type, content): + if self.queue is not None: + self.queue.append((content_type, content)) + + def num_content(self): + return len(self.queue) if self.queue else 0 + + def pop_content(self): + return self.queue.pop() if self.queue else (None, None) + + def get_top_torrents(self, limit=DEFAULT_TORRENT_LIMIT): + return self.torrent_db.getRecentlyCheckedTorrents(limit) + + def update_torrent(self, torrent_health_payload, peer_trust=0): + if not self.torrent_db: + self._logger.error("Torrent DB is None") + return + assert torrent_health_payload and isinstance(torrent_health_payload, TorrentHealthPayload) + + infohash = torrent_health_payload.infohash + + if self.torrent_db.hasTorrent(infohash): + db_torrent = self.get_torrent(infohash) + is_stale = time.time() - db_torrent['last_tracker_check'] > DEFAULT_FRESHNESS_LIMIT + if is_stale or peer_trust > 1: + self.torrent_db.updateTorrent(self, infohash, notify=False, + seeder=torrent_health_payload.num_seeders, + leecher=torrent_health_payload.num_leechers, + last_torrent_check=int(torrent_health_payload.timestamp)) + else: + self._logger.info("Torrent[%s] is unknown", infohash.decode('hex')) + + def get_torrent(self, infohash): + keys = ('num_seeders', 'num_leechers', 'last_tracker_check') + return self.torrent_db.getTorrent(infohash, keys=keys, include_mypref=False)