Skip to content

Commit

Permalink
Refactored PopularityCommunity
Browse files Browse the repository at this point in the history
  • Loading branch information
devos50 committed Mar 26, 2019
1 parent dfd104f commit e271c84
Show file tree
Hide file tree
Showing 13 changed files with 176 additions and 889 deletions.
3 changes: 2 additions & 1 deletion Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ def load_ipv8_overlays(self):
from Tribler.community.popularity.community import PopularityCommunity

self.popularity_community = PopularityCommunity(peer, self.ipv8.endpoint, self.ipv8.network,
metadata_store=self.session.lm.mds, session=self.session)
metadata_store=self.session.lm.mds,
torrent_checker=self.torrent_checker)

self.ipv8.overlays.append(self.popularity_community)

Expand Down
41 changes: 28 additions & 13 deletions Tribler/Core/TorrentChecker/torrent_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import random
import socket
import sys
import time
from binascii import hexlify
from random import choice
Expand Down Expand Up @@ -52,6 +53,10 @@ def __init__(self, session):
self.socket_mgr = self.udp_port = None
self.connection_pool = None

# We keep track of the results of popular torrents checked by you.
# The popularity community gossips this information around.
self.popular_torrents_checked = set()

def initialize(self):
self.tracker_check_lc.start(TRACKER_SELECTION_INTERVAL, now=False)
self.torrent_check_lc.start(TORRENT_SELECTION_INTERVAL, now=False)
Expand Down Expand Up @@ -197,6 +202,28 @@ def get_valid_trackers_of_torrent(self, torrent_id):
return set([str(tracker.url) for tracker in db_tracker_list
if is_valid_url(str(tracker.url)) and not self.is_blacklisted_tracker(str(tracker.url))])

def update_torrents_checked(self, new_result):
"""
Update the set with torrents that we have checked ourselves.
"""
new_result_tuple = (new_result['infohash'], new_result['seeders'],
new_result['leechers'], new_result['last_check'])

if len(self.popular_torrents_checked) < 5:
self.popular_torrents_checked.add(new_result_tuple)
return

min_seeders = sys.maxsize
most_unpopular_torrent = None
for torrent_health_tuple in self.popular_torrents_checked:
if torrent_health_tuple[1] < min_seeders:
min_seeders = torrent_health_tuple[1]
most_unpopular_torrent = torrent_health_tuple

if new_result['seeders'] > min_seeders:
self.popular_torrents_checked.remove(most_unpopular_torrent)
self.popular_torrents_checked.add(new_result_tuple)

def on_torrent_health_check_completed(self, infohash, result):
final_response = {}
if not result or not isinstance(result, list):
Expand All @@ -220,9 +247,7 @@ def on_torrent_health_check_completed(self, infohash, result):
torrent_update_dict['leechers'] = l

self._update_torrent_result(torrent_update_dict)

# Add this result to popularity community to publish to subscribers
self.publish_torrent_result(torrent_update_dict)
self.update_torrents_checked(torrent_update_dict)

# TODO: DRY! Stop doing lots of formats, just make REST endpoint automatically encode binary data to hex!
self.tribler_session.notifier.notify(NTFY_TORRENT, NTFY_UPDATE, infohash,
Expand Down Expand Up @@ -345,13 +370,3 @@ def _update_torrent_result(self, response):
torrent.seeders = seeders
torrent.leechers = leechers
torrent.last_check = last_check

def publish_torrent_result(self, response):
if response['seeders'] == 0:
self._logger.info("Not publishing zero seeded torrents")
return
content = (response['infohash'], response['seeders'], response['leechers'], response['last_check'])
if self.tribler_session.lm.popularity_community:
self.tribler_session.lm.popularity_community.queue_content(content)
else:
self._logger.info("Popular community not available to publish torrent checker result")
79 changes: 37 additions & 42 deletions Tribler/Test/Community/popularity/test_community.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import absolute_import

import os
import random
import time

from pony.orm import db_session

Expand All @@ -10,7 +10,8 @@
from twisted.internet.defer import inlineCallbacks

from Tribler.Core.Modules.MetadataStore.store import MetadataStore
from Tribler.community.popularity.community import MSG_TORRENT_HEALTH_RESPONSE, PopularityCommunity
from Tribler.Test.Core.base_test import MockObject
from Tribler.community.popularity.community import PopularityCommunity
from Tribler.pyipv8.ipv8.keyvault.crypto import default_eccrypto
from Tribler.pyipv8.ipv8.test.base import TestBase
from Tribler.pyipv8.ipv8.test.mocking.ipv8 import MockIPv8
Expand All @@ -21,67 +22,61 @@ class TestPopularityCommunity(TestBase):

def setUp(self):
super(TestPopularityCommunity, self).setUp()
self.shared_key = default_eccrypto.generate_key(u"curve25519")
self.count = 0
self.initialize(PopularityCommunity, self.NUM_NODES)

def create_node(self, *args, **kwargs):
mds = MetadataStore(os.path.join(self.temporary_directory(), 'test.db'), self.temporary_directory(),
self.shared_key)
mds = MetadataStore(os.path.join(self.temporary_directory(), "%d.db" % self.count), self.temporary_directory(),
default_eccrypto.generate_key(u"curve25519"))

# Add some content to the metadata database
with db_session:
mds.ChannelMetadata.create_channel('test', 'test')
for torrent_ind in xrange(5):
torrent = mds.TorrentMetadata(title='torrent%d' % torrent_ind, infohash=('%d' % torrent_ind) * 20)
torrent.health.seeders = torrent_ind + 1
torrent_checker = MockObject()
torrent_checker.popular_torrents_checked = set()

return MockIPv8(u"curve25519", PopularityCommunity, metadata_store=mds, torrent_checker=torrent_checker)

return MockIPv8(u"curve25519", PopularityCommunity, metadata_store=mds)
@db_session
def fill_database(self, metadata_store, last_check_now=False):
for torrent_ind in xrange(5):
last_check = int(time.time()) if last_check_now else 0
metadata_store.TorrentState(
infohash=('%d' % torrent_ind) * 20, seeders=torrent_ind + 1, last_check=last_check)

@inlineCallbacks
def test_content_publishing(self):
def test_torrents_health_gossip(self):
"""
Tests publishing next available content.
:return:
Test whether torrent health information is correctly gossiped around
"""
self.fill_database(self.nodes[0].overlay.metadata_store)

def on_torrent_health_response(peer, source_address, data):
peer.torrent_health_response_received = True

self.nodes[0].torrent_health_response_received = False
self.nodes[0].overlay.decode_map[chr(MSG_TORRENT_HEALTH_RESPONSE)] = lambda source_address, data: \
on_torrent_health_response(self.nodes[0], source_address, data)

checked_torrent_info = ('a' * 20, 200, 0, int(time.time()))
self.nodes[0].overlay.torrent_checker.popular_torrents_checked.add(checked_torrent_info)
yield self.introduce_nodes()
self.nodes[0].overlay.subscribe_peers()
yield self.deliver_messages()

# Add something to queue
health_info = ('a' * 20, random.randint(1, 100), random.randint(1, 10), random.randint(1, 111111))
self.nodes[1].overlay.queue_content(health_info)

self.nodes[1].overlay.publish_next_content()
self.nodes[0].overlay.gossip_torrents_health()

yield self.deliver_messages()

self.assertTrue(self.nodes[0].torrent_health_response_received, "Expected to receive torrent response")
# Check whether node 1 has new torrent health information
with db_session:
self.assertEqual(len(self.nodes[1].overlay.metadata_store.TorrentState.select()), 6)

@inlineCallbacks
def test_publish_latest_torrents(self):
def test_torrents_health_override(self):
"""
Test publishing all latest torrents
Test whether torrent health information is overridden when it's more fresh
"""
self.fill_database(self.nodes[0].overlay.metadata_store, last_check_now=True)
self.fill_database(self.nodes[1].overlay.metadata_store)

yield self.introduce_nodes()
self.nodes[1].overlay.subscribe_peers()
yield self.deliver_messages()

# Update the health of some torrents
with db_session:
torrents = self.nodes[0].overlay.content_repository.get_top_torrents()
torrents[0].health.seeders = 500
self.nodes[0].overlay.gossip_torrents_health()

self.nodes[0].overlay.publish_latest_torrents(self.nodes[1].overlay.my_peer)
yield self.deliver_messages()
yield self.deliver_messages(timeout=0.5)

# Check whether node 1 has new torrent health information
with db_session:
torrents = self.nodes[1].overlay.content_repository.get_top_torrents()
self.assertEqual(torrents[0].health.seeders, 500)
states = self.nodes[1].overlay.metadata_store.TorrentState.select()[:]
self.assertEqual(len(states), 5)
for state in states:
self.assertIsNot(state.last_check, 0)
53 changes: 0 additions & 53 deletions Tribler/Test/Community/popularity/test_payload.py

This file was deleted.

140 changes: 0 additions & 140 deletions Tribler/Test/Community/popularity/test_pubsub_community.py

This file was deleted.

Loading

0 comments on commit e271c84

Please sign in to comment.