Skip to content

Commit

Permalink
- Basic content popular community
Browse files Browse the repository at this point in the history
- Peers selected based on trust score
- New torrent check results are shared to subscribers
  • Loading branch information
xoriole committed Jun 1, 2018
1 parent 79c646b commit 6511102
Show file tree
Hide file tree
Showing 12 changed files with 602 additions and 0 deletions.
16 changes: 16 additions & 0 deletions Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def __init__(self):
self.torrent_checker = None
self.tunnel_community = None
self.triblerchain_community = None
self.popular_community = None

self.startup_deferred = Deferred()

Expand Down Expand Up @@ -303,6 +304,21 @@ def load_ipv8_overlays(self):

self.ipv8.strategies.append((RandomWalk(self.market_community), 20))

# Popular Community
if self.session.config.get_popular_community_enabled() or True:
from Tribler.community.popular.community import PopularCommunity

local_peer = Peer(self.session.trustchain_keypair)

self.popular_community = PopularCommunity(local_peer, self.ipv8.endpoint, self.ipv8.network,
torrent_db=self.session.lm.torrent_db)

self.ipv8.overlays.append(self.popular_community)

self.ipv8.strategies.append((RandomWalk(self.popular_community), 20))

self.popular_community.start()

@blocking_call_on_reactor_thread
def load_dispersy_communities(self):
self._logger.info("tribler: Preparing Dispersy communities...")
Expand Down
10 changes: 10 additions & 0 deletions Tribler/Core/CacheDB/SqliteCacheDBHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,16 @@ def getRecentlyCollectedTorrents(self, limit):
results = self._db.fetchall(sql, (limit,))
return [[str2bin(result[0]), result[1], result[2], result[3] or 0, result[4]] for result in results]

def getRecentlyCheckedTorrents(self, limit):
sql = u"""
SELECT T.infohash, T.num_seeders, T.num_leechers, T.last_tracker_check
FROM Torrent T
WHERE T.is_collected = 0 AND T.num_seeders > 1
AND T.secret is not 1 ORDER BY T.last_tracker_check, T.num_seeders DESC LIMIT ?
"""
results = self._db.fetchall(sql, (limit,))
return [[str2bin(result[0]), result[1], result[2], result[3] or 0] for result in results]

def getRandomlyCollectedTorrents(self, insert_time, limit):
sql = u"""
SELECT CT.infohash, CT.num_seeders, CT.num_leechers, T.last_tracker_check
Expand Down
4 changes: 4 additions & 0 deletions Tribler/Core/Config/config.spec
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,7 @@ history_size = integer(min=1, default=20)
enabled = boolean(default=True)
sources = string_list(default=list())
max_disk_space = integer(min=0, default=53687091200)

[popular_community]
enabled = boolean(default=True)
cache_dir = string(default=health_cache)
16 changes: 16 additions & 0 deletions Tribler/Core/Config/tribler_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,22 @@ def set_dummy_wallets_enabled(self, value):
def get_dummy_wallets_enabled(self):
return self.config['wallets']['dummy_wallets_enabled']

# Popular Community

def get_popular_community_enabled(self):
return self.config['popular_community']['enabled'] or True

def set_popular_community_enabled(self, value):
self.config['popular_community']['enabled'] = value

def get_health_cache_dir(self):
cache_dir = self.config['popular_community']['cache_dir'] \
if 'cache_dir' in self.config['popular_community'] else 'health_cache'
return os.path.join(self.get_state_dir(), cache_dir)

def set_health_cache_dir(self, cache_dir):
self.config['popular_community']['cache_dir'] = cache_dir

# Torrent store

def get_torrent_store_enabled(self):
Expand Down
14 changes: 14 additions & 0 deletions Tribler/Core/TorrentChecker/torrent_checker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from Tribler.Core.TorrentChecker.session import create_tracker_session, FakeDHTSession, UdpSocketManager
from Tribler.Core.Utilities.tracker_utils import MalformedTrackerURLException
from Tribler.Core.simpledefs import NTFY_TORRENTS
from Tribler.community.popular.repository import TYPE_TORRENT_HEALTH
from Tribler.dispersy.util import blocking_call_on_reactor_thread, call_on_reactor_thread
from Tribler.pyipv8.ipv8.taskmanager import TaskManager

Expand Down Expand Up @@ -198,6 +199,9 @@ def on_gui_request_completed(self, infohash, result):

self._update_torrent_result(torrent_update_dict)

# Add this result to popular community to publish to subscribers
self._publish_torrent_result(torrent_update_dict)

return final_response

@call_on_reactor_thread
Expand Down Expand Up @@ -326,3 +330,13 @@ def _update_torrent_result(self, response):
self._torrent_db.updateTorrentCheckResult(torrent_id,
infohash, seeders, leechers, last_check, next_check,
status, retries)

def _publish_torrent_result(self, response):
if response['seeders'] == 0:
self._logger.info("Not publishing zero seeded torrents")
return
content = (response['infohash'], response['seeders'], response['leechers'], response['last_check'])
if self.tribler_session.lm.popular_community:
self.tribler_session.lm.popular_community.queue_content(TYPE_TORRENT_HEALTH, content)
else:
self._logger.info("Popular community not available to publish torrent checker result")
Empty file.
81 changes: 81 additions & 0 deletions Tribler/Test/Community/popular/test_community.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import random

from Tribler.Test.Core.base_test import MockObject
from Tribler.community.popular.community import PopularCommunity, MSG_TORRENT_HEALTH_RESPONSE
from Tribler.community.popular.repository import TYPE_TORRENT_HEALTH
from Tribler.pyipv8.ipv8.test.base import TestBase
from Tribler.pyipv8.ipv8.test.mocking.ipv8 import MockIPv8
from Tribler.pyipv8.ipv8.test.util import twisted_wrapper


class TestPopularCommunityBase(TestBase):
NUM_NODES = 2

def setUp(self):
super(TestPopularCommunityBase, self).setUp()
self.initialize(PopularCommunity, self.NUM_NODES)

def create_node(self):
def load_random_torrents(limit):
return [
['\xfdC\xf9+V\x11A\xe7QG\xfb\xb1*6\xef\xa5\xaeu\xc2\xe0',
random.randint(200, 250), random.randint(1, 10), 1525704192.166107] for _ in range(limit)
]

torrent_db = MockObject()
torrent_db.getRecentlyCheckedTorrents = lambda limit: load_random_torrents(limit)

return MockIPv8(u"curve25519", PopularCommunity, torrent_db=torrent_db)


class TestPopularCommunity(TestPopularCommunityBase):
__testing__ = False
NUM_NODES = 2

def setUp(self):
super(TestPopularCommunity, self).setUp()

@twisted_wrapper
def test_subscribe_peers(self):
yield self.introduce_nodes()
self.nodes[0].overlay.subscribe_peers()
yield self.deliver_messages()

# Node 0 should have a publisher added
self.assertEqual(len(self.nodes[0].overlay.publishers), 1, "Expected 1 publisher")
# Node 1 should have a subscriber added
self.assertEqual(len(self.nodes[1].overlay.subscribers), 1, "Expected 1 subscriber")

@twisted_wrapper
def test_start(self):
yield self.introduce_nodes()
self.nodes[0].overlay.start()
yield self.deliver_messages()

# Node 0 should have a publisher added
self.assertEqual(len(self.nodes[0].overlay.publishers), 1, "Expected 1 publisher")
# Node 1 should have a subscriber added
self.assertEqual(len(self.nodes[1].overlay.subscribers), 1, "Expected 1 subscriber")

@twisted_wrapper
def test_content_publishing(self):
def on_torrent_health_response(peer, source_address, data):
peer.torrent_health_response_received = True

self.nodes[0].torrent_health_response_received = False
self.nodes[0].overlay.decode_map[chr(MSG_TORRENT_HEALTH_RESPONSE)] = lambda source_address, data: \
on_torrent_health_response(self.nodes[0], source_address, data)

yield self.introduce_nodes()
self.nodes[0].overlay.subscribe_peers()
yield self.deliver_messages()

# Add something to queue
health_info = ('a' * 20, random.randint(1, 100), random.randint(1, 10), random.randint(1, 111111))
self.nodes[1].overlay.queue_content(TYPE_TORRENT_HEALTH, health_info)

self.nodes[1].overlay.publish_next_content()

yield self.deliver_messages()

self.assertTrue(self.nodes[0].torrent_health_response_received, "Expected to receive torrent response")
102 changes: 102 additions & 0 deletions Tribler/Test/Community/popular/test_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import time
import unittest

from Tribler.Test.Core.base_test import MockObject
from Tribler.community.popular.payload import TorrentHealthPayload
from Tribler.community.popular.repository import ContentRepository, DEFAULT_FRESHNESS_LIMIT


class TestContentRepository(unittest.TestCase):

def setUp(self):
torrent_db = MockObject()
self.content_repository = ContentRepository(torrent_db)

def test_add_content(self):
"""
Test adding and removing content works as expected.
"""
# Initial content queue is zero
self.assertEqual(self.content_repository.num_content(), 0, "No item expected in queue initially")

# Add a sample content and check the size
sample_content = ('a' * 20, 6, 3, 123456789)
sample_content_type = 1
self.content_repository.add_content(sample_content_type, sample_content)
self.assertEqual(self.content_repository.num_content(), 1, "One item expected in queue")

# Pop an item
(content_type, content) = self.content_repository.pop_content()
self.assertEqual(content_type, sample_content_type, "Content type should be equal")
self.assertEqual(content, sample_content, "Content should be equal")

# Check size again
self.assertEqual(self.content_repository.num_content(), 0, "No item expected in queue")

def test_get_top_torrents(self):
"""
Test if content repository returns expected top torrents.
"""

def get_fake_torrents(limit):
return [[chr(x) * 20, x, 0, 1525704192] for x in range(limit)]

self.content_repository.torrent_db.getRecentlyCheckedTorrents = lambda limit: get_fake_torrents(limit)

limit = 10
self.assertEqual(self.content_repository.get_top_torrents(limit=limit), get_fake_torrents(limit))

def test_update_torrent_with_higher_trust(self):
"""
Scenario: The database torrent has still fresh last_check_time and you receive a new response from
peer with trust > 1.
Expect: Torrent in database is updated.
"""
# last_check_time for existing torrent in database
db_last_time_check = time.time() - 10
# Peer trust, higher than 1 in this scenario
peer_trust = 10

# Database record is expected to be updated
self.assertTrue(self.try_torrent_update_with_options(db_last_time_check, peer_trust))

def test_update_torrent_with_stale_check_time(self):
"""
Scenario: The database torrent has stale last_check_time and you receive a new response from
peer with no previous trust.
Expect: Torrent in database is still updated.
"""
# last_check_time for existing torrent in database
db_last_time_check = time.time() - DEFAULT_FRESHNESS_LIMIT
# Peer trust, higher than 1 in this scenario
peer_trust = 0

# Database record is expected to be updated
self.assertTrue(self.try_torrent_update_with_options(db_last_time_check, peer_trust))

def try_torrent_update_with_options(self, db_last_check_time, peer_trust):
"""
Tries updating torrent considering the given last check time of existing torrent and a new response
obtained from a peer with given peer_trust value.
"""
sample_infohash, seeders, leechers, timestamp = 'a' * 20, 10, 5, db_last_check_time
sample_payload = TorrentHealthPayload(sample_infohash, seeders, leechers, timestamp)

def update_torrent(content_repo, infohash, *args, **kw):
content_repo.update_torrent_called = True

def get_torrent(infohash, keys=None, include_mypref=False):
return {'infohash': infohash, 'num_seeders': seeders,
'num_leechers': leechers, 'last_tracker_check': timestamp}

self.content_repository.torrent_db.getTorrent = lambda infohash, **kw: get_torrent(infohash, **kw)
self.content_repository.torrent_db.hasTorrent = lambda infohash: infohash == sample_infohash
self.content_repository.torrent_db.updateTorrent = \
lambda infohash, *args, **kw: update_torrent(self.content_repository, infohash, *args, **kw)

self.content_repository.update_torrent_called = False
self.content_repository.update_torrent(sample_payload, peer_trust=peer_trust)

return self.content_repository.update_torrent_called

self.assertTrue(self.content_repository.update_torrent_called)
Empty file.
Loading

0 comments on commit 6511102

Please sign in to comment.