From 7a1970e3b35021b17b0e76a2fe2f29497e993955 Mon Sep 17 00:00:00 2001 From: Ardhi Putra Pratama H Date: Sat, 2 Jul 2016 06:21:47 +0200 Subject: [PATCH 1/4] Remove GUIDBTuples from Credit mining Closes #2356 Squashed : - Refactor to use deferred --- Tribler/Policies/BoostingSource.py | 85 ++++++++----- Tribler/Policies/credit_mining_util.py | 120 ------------------ .../CreditMining/test_creditmining_sys.py | 63 +++------ 3 files changed, 68 insertions(+), 200 deletions(-) diff --git a/Tribler/Policies/BoostingSource.py b/Tribler/Policies/BoostingSource.py index 7fe6dc39ec4..a0d95db9bb1 100644 --- a/Tribler/Policies/BoostingSource.py +++ b/Tribler/Policies/BoostingSource.py @@ -8,11 +8,12 @@ import os import re import urllib -from binascii import hexlify +from binascii import hexlify, unhexlify from hashlib import sha1 -import feedparser +import feedparser import libtorrent as lt + from twisted.internet import defer from twisted.internet import reactor from twisted.internet.defer import CancelledError @@ -24,8 +25,7 @@ from Tribler.Core.TorrentDef import TorrentDef from Tribler.Core.simpledefs import NTFY_INSERT, NTFY_TORRENTS, NTFY_UPDATE from Tribler.Core.version import version_id -from Tribler.Main.Utility.GuiDBTuples import Torrent, Channel -from Tribler.Policies.credit_mining_util import TorrentManagerCM, ent2chr +from Tribler.Policies.credit_mining_util import ent2chr from Tribler.community.allchannel.community import AllChannelCommunity from Tribler.community.channel.community import ChannelCommunity from Tribler.dispersy.exception import CommunityNotFoundException @@ -59,8 +59,6 @@ def __init__(self, session, source, boost_settings, torrent_insert_cb): self.min_connection = boost_settings.min_connection_start self.min_channels = boost_settings.min_channels_start - self.torrent_mgr = TorrentManagerCM(session) - self._logger = logging.getLogger(BoostingSource.__name__) self.boosting_manager = self.session.lm.boosting_manager @@ -150,16 +148,18 @@ def __init__(self, session, dispersy_cid, boost_settings, torrent_insert_cb): self.channel_id = None - self.channel = None + self.channel_dict = None self.community = None self.database_updated = True self.check_torrent_interval = 10 self.dispersy_cid = dispersy_cid + self.torrent_db = self.session.open_dbhandler(NTFY_TORRENTS) self.session.add_observer(self._on_database_updated, NTFY_TORRENTS, [NTFY_INSERT, NTFY_UPDATE]) self.unavail_torrent = {} + self.loaded_torrent = {} def kill_tasks(self): BoostingSource.kill_tasks(self) @@ -200,9 +200,7 @@ def get_channel_id(): if self.community and self.community._channel_id: self.channel_id = self.community._channel_id - channel_dict = self.channelcast_db.getChannel(self.channel_id) - self.channel = Channel(*channel_dict) - + self.channel_dict = self.channelcast_db.getChannel(self.channel_id) task_call = self.check_and_register_task(str(self.source) + "_update", LoopingCall(self._update)).start(self.interval, now=True) if task_call: @@ -225,22 +223,23 @@ def showtorrent(torrent): """ assembly torrent data, call the callback """ - if torrent.files: + infohash = torrent.infohash + if torrent.get_files() and infohash in self.unavail_torrent: if len(self.torrents) >= self.max_torrents: self._logger.debug("Max torrents in source reached. Not adding %s", torrent.infohash) del self.unavail_torrent[torrent.infohash] return - infohash = torrent.infohash self._logger.debug("[ChannelSource] Got torrent %s", hexlify(infohash)) self.torrents[infohash] = {} - self.torrents[infohash]['name'] = torrent.name - self.torrents[infohash]['metainfo'] = torrent.tdef - self.torrents[infohash]['creation_date'] = torrent.creation_date - self.torrents[infohash]['length'] = torrent.tdef.get_length() - self.torrents[infohash]['num_files'] = len(torrent.files) - self.torrents[infohash]['num_seeders'] = torrent.swarminfo[0] or 0 - self.torrents[infohash]['num_leechers'] = torrent.swarminfo[1] or 0 + self.torrents[infohash]['name'] = torrent.get_name() + self.torrents[infohash]['metainfo'] = torrent + self.torrents[infohash]['creation_date'] = torrent.get_creation_date() + self.torrents[infohash]['length'] = torrent.get_length() + self.torrents[infohash]['num_files'] = len(torrent.get_files()) + #TODO(ardhi) get seeder/leecher from db + self.torrents[infohash]['num_seeders'] = 0 + self.torrents[infohash]['num_leechers'] = 0 self.torrents[infohash]['enabled'] = self.enabled # seeding stats from DownloadState @@ -252,11 +251,10 @@ def showtorrent(torrent): self.torrent_insert_callback(self.source, infohash, self.torrents[infohash]) self.database_updated = False - self._logger.debug("Unavailable #torrents : %d from %s", len(self.unavail_torrent), hexlify(self.source)) - if len(self.unavail_torrent) and self.enabled: + self._logger.debug("Unavailable #torrents : %d from %s", len(self.unavail_torrent), hexlify(self.source)) for torrent in self.unavail_torrent.values(): - self.torrent_mgr.load_torrent(torrent, showtorrent) + self._load_torrent(torrent[2]).addCallback(showtorrent) def _update(self): if len(self.torrents) < self.max_torrents and self.database_updated: @@ -268,12 +266,8 @@ def _update(self): torrent_values = self.channelcast_db.getTorrentsFromChannelId(self.channel_id, True, CHANTOR_DB, self.max_torrents) - listtor = self.torrent_mgr.create_torrents(torrent_values, True, - {self.channel_id: self.channelcast_db.getChannel( - self.channel_id)}) - - # dict {key_infohash(binary):Torrent(object-GUIDBTuple)} - self.unavail_torrent.update({t.infohash: t for t in listtor if t.infohash not in self.torrents}) + # dict {key_infohash(binary):Torrent(tuples)} + self.unavail_torrent.update({t[2]: t for t in torrent_values if t[2] not in self.torrents}) # it's highly probable the checktor function is running at this time (if it's already running) # if not running, start the checker @@ -287,7 +281,37 @@ def _on_database_updated(self, dummy_subject, dummy_change_type, dummy_infohash) self.database_updated = True def get_source_text(self): - return self.channel.name if self.channel else None + return str(self.channel_dict[2]) if self.channel_dict else None + + def _load_torrent(self, infohash): + """ + function to download a torrent by infohash and call a callback afterwards + with TorrentDef object as parameter. + """ + + # session is quitting + if self.session is None or self.session.lm.torrent_store is None or self.session.get_torrent_store() is None: + return + + def add_to_loaded(infohash_str): + """ + function to add loaded infohash to memory + """ + self.loaded_torrent[unhexlify(infohash_str)].callback( + TorrentDef.load_from_memory(self.session.get_collected_torrent(unhexlify(infohash_str)))) + self.loaded_torrent[unhexlify(infohash_str)] = None + + if infohash not in self.loaded_torrent: + self.loaded_torrent[infohash] = defer.Deferred() + + if not self.session.has_collected_torrent(infohash): + if self.session.has_download(infohash): + return + self.session.download_torrentfile(infohash, add_to_loaded, 0) + + deferred_load = self.loaded_torrent[infohash] + + return deferred_load class RSSFeedSource(BoostingSource): @@ -386,9 +410,6 @@ def __cb_body(body_bin, item_torrent_entry): self.torrent_db.addOrGetTorrentID(real_infohash) self.torrent_db.addExternalTorrent(tdef) - # create Torrent object and store it - self.torrent_mgr.load_torrent(Torrent.fromTorrentDef(tdef)) - # Notify the BoostingManager and provide the real infohash. if self.torrent_insert_callback: self.torrent_insert_callback(self.source, real_infohash, self.torrents[real_infohash]) diff --git a/Tribler/Policies/credit_mining_util.py b/Tribler/Policies/credit_mining_util.py index fa6f3433670..52de6ba6cb9 100644 --- a/Tribler/Policies/credit_mining_util.py +++ b/Tribler/Policies/credit_mining_util.py @@ -2,18 +2,10 @@ File containing function used in credit mining module. """ - import os from binascii import hexlify, unhexlify -from Tribler.Core.TorrentDef import TorrentDef -from Tribler.Core.simpledefs import NTFY_CHANNELCAST -from Tribler.Core.simpledefs import NTFY_TORRENTS -from Tribler.Core.simpledefs import NTFY_VOTECAST -from Tribler.Main.Utility.GuiDBTuples import CollectedTorrent, RemoteTorrent, NotCollectedTorrent, Channel, \ - ChannelTorrent from Tribler.Policies.defs import SIMILARITY_TRESHOLD -from Tribler.dispersy.taskmanager import TaskManager def validate_source_string(source): @@ -85,115 +77,3 @@ def ent2chr(input_str): code = input_str.group(1) code_int = int(code) if code.isdigit() else int(code[1:], 16) return chr(code_int) if code_int < 256 else '?' - -# TODO(ardhi) : temporary function until GUI and core code are separated -class TorrentManagerCM(TaskManager): - """ - *Temporary* class to handle load torrent. - - Adapted from TorrentManager in SearchGridManager - """ - def __init__(self, session): - super(TorrentManagerCM, self).__init__() - - self.session = session - self.torrent_db = self.session.open_dbhandler(NTFY_TORRENTS) - self.channelcast_db = self.session.open_dbhandler(NTFY_CHANNELCAST) - self.votecastdb = self.session.open_dbhandler(NTFY_VOTECAST) - - self.dslist = [] - - def load_torrent(self, torrent, callback=None): - """ - function to load torrent dictionary to torrent object. - - From TorrentManager.loadTorrent in SearchGridManager - """ - - # session is quitting - if not (self.session and self.session.get_torrent_store() and self.session.lm.torrent_store): - return - - if not isinstance(torrent, CollectedTorrent): - if torrent.torrent_id <= 0: - torrent_id = self.torrent_db.getTorrentID(torrent.infohash) - if torrent_id: - torrent.update_torrent_id(torrent_id) - - if not self.session.has_collected_torrent(torrent.infohash): - files = [] - trackers = [] - - # see if we have most info in our tables - if isinstance(torrent, RemoteTorrent): - torrent_id = self.torrent_db.getTorrentID(torrent.infohash) - else: - torrent_id = torrent.torrent_id - - trackers.extend(self.torrent_db.getTrackerListByTorrentID(torrent_id)) - - if 'DHT' in trackers: - trackers.remove('DHT') - if 'no-DHT' in trackers: - trackers.remove('no-DHT') - - # replacement # self.downloadTorrentfileFromPeers(torrent, None) - if self.session.has_download(torrent.infohash): - return False - - if torrent.query_candidates is None or len(torrent.query_candidates) == 0: - self.session.download_torrentfile(torrent.infohash, None, 0) - else: - for candidate in torrent.query_candidates: - self.session.download_torrentfile_from_peer(candidate, torrent.infohash, None, 0) - - torrent = NotCollectedTorrent(torrent, files, trackers) - - else: - tdef = TorrentDef.load_from_memory(self.session.get_collected_torrent(torrent.infohash)) - - if torrent.torrent_id <= 0: - del torrent.torrent_id - - torrent = CollectedTorrent(torrent, tdef) - - # replacement # self.library_manager.addDownloadState(torrent) - for dl_state in self.dslist: - torrent.addDs(dl_state) - - # return - if callback is not None: - callback(torrent) - else: - return torrent - - def create_torrents(self, tor_values, _, channel_dict): - """ - function to create torrents from channel. Adapted from - ChannelManager in SearchGridManager - """ - - #adding new channel from the one that can't be detected from torrent values - fetch_channels = set(hit[0] for hit in tor_values if hit[0] not in channel_dict) - if len(fetch_channels) > 0: - channels_new_dict = self.channelcast_db.getChannels(fetch_channels) - channels = [] - for hit in channels_new_dict: - channel = Channel(*hit) - channels.append(channel) - - for channel in channels: - channel_dict[channel.id] = channel - - # creating torrents - torrents = [] - for hit in tor_values: - if hit: - chan_torrent = ChannelTorrent(*hit[1:] + [channel_dict.get(hit[0], None), None]) - chan_torrent.torrent_db = self.torrent_db - chan_torrent.channelcast_db = self.channelcast_db - - if chan_torrent.name: - torrents.append(chan_torrent) - - return torrents diff --git a/Tribler/Test/Core/CreditMining/test_creditmining_sys.py b/Tribler/Test/Core/CreditMining/test_creditmining_sys.py index 2affdb9c124..edf878cc3ff 100644 --- a/Tribler/Test/Core/CreditMining/test_creditmining_sys.py +++ b/Tribler/Test/Core/CreditMining/test_creditmining_sys.py @@ -16,7 +16,6 @@ from Tribler.Core.TorrentDef import TorrentDef from Tribler.Core.Utilities.twisted_thread import deferred, reactor from Tribler.Core.simpledefs import NTFY_TORRENTS, NTFY_UPDATE, NTFY_CHANNELCAST -from Tribler.Main.Utility.GuiDBTuples import CollectedTorrent from Tribler.Policies.BoostingManager import BoostingManager, BoostingSettings from Tribler.Test.Core.CreditMining.mock_creditmining import MockLtTorrent, ResourceFailClass from Tribler.Test.test_as_server import TestAsServer, TESTS_DATA_DIR @@ -323,6 +322,12 @@ def setUpPreSession(self): # we use dummy dispersy here self.config.set_dispersy(False) + def _load(self, _): + """ + Dummy method to download the torrent + """ + return defer.succeed(self.tdef) + @blocking_call_on_reactor_thread def create_torrents_in_channel(self, dispersy_cid_hex): """ @@ -358,20 +363,6 @@ def test_chn_lookup(self): self.boosting_manager.add_source(dispersy_cid) chn_obj = self.boosting_manager.get_source_object(dispersy_cid) - def _load(torrent, callback=None): - if not isinstance(torrent, CollectedTorrent): - torrent_id = 0 - if torrent.torrent_id <= 0: - torrent_id = self.session.lm.torrent_db.getTorrentID(torrent.infohash) - if torrent_id: - torrent.update_torrent_id(torrent_id) - - torrent = CollectedTorrent(torrent, self.tdef) - if callback is not None: - callback(torrent) - else: - return torrent - def check_torrents_channel(src, defer_param=None, target=1): """ check if a torrent already in channel and ready to download @@ -400,7 +391,7 @@ def check_torrents_channel(src, defer_param=None, target=1): return defer_param - chn_obj.torrent_mgr.load_torrent = _load + chn_obj._load_torrent = self._load d = self.check_source(dispersy_cid) d.addCallback(check_torrents_channel, target=1) @@ -444,21 +435,7 @@ def _set_id_channel(channel_id): self.boosting_manager.add_source(dispersy_cid) chn_obj = self.boosting_manager.get_source_object(dispersy_cid) - def _load(torrent, callback=None): - if not isinstance(torrent, CollectedTorrent): - torrent_id = 0 - if torrent.torrent_id <= 0: - torrent_id = self.session.lm.torrent_db.getTorrentID(torrent.infohash) - if torrent_id: - torrent.update_torrent_id(torrent_id) - - torrent = CollectedTorrent(torrent, self.tdef) - if callback is not None: - callback(torrent) - else: - return torrent - - chn_obj.torrent_mgr.load_torrent = _load + chn_obj._load_torrent = self._load def clean_community(_): """ @@ -498,29 +475,19 @@ def test_chn_max_torrents(self): self.boosting_manager.add_source(dispersy_cid) chn_obj = self.boosting_manager.get_source_object(dispersy_cid) chn_obj.max_torrents = 2 - chn_obj.torrent_mgr.load_torrent = lambda dummy_1, dummy_2: None - - def _load(torrent, callback=None): - if not isinstance(torrent, CollectedTorrent): - torrent_id = 0 - if torrent.torrent_id <= 0: - torrent_id = self.session.lm.torrent_db.getTorrentID(torrent.infohash) - if torrent_id: - torrent.update_torrent_id(torrent_id) - - infohash_str = binascii.hexlify(torrent.infohash) - torrent = CollectedTorrent(torrent, self.tdef if infohash_str.startswith("fc") else pioneer_tdef) - if callback is not None: - callback(torrent) - else: - return torrent + chn_obj._load_torrent = lambda _: defer.Deferred() + + def _load(infohash): + defer_ret = defer.Deferred() + defer_ret.callback(self.tdef if binascii.hexlify(infohash).startswith("fc") else pioneer_tdef) + return defer_ret def activate_mgr(): """ activate ltmgr and adjust max torrents to emulate overflow torrents """ chn_obj.max_torrents = 1 - chn_obj.torrent_mgr.load_torrent = _load + chn_obj._load_torrent = _load reactor.callLater(5, activate_mgr) From 1e20d3747f02de8c5099e817931b97da1459fe67 Mon Sep 17 00:00:00 2001 From: Ardhi Putra Pratama H Date: Sat, 2 Jul 2016 06:22:26 +0200 Subject: [PATCH 2/4] Refactor GUI to cope with GUIDBTuples removal --- Tribler/Main/vwxGUI/CreditMiningPanel.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Tribler/Main/vwxGUI/CreditMiningPanel.py b/Tribler/Main/vwxGUI/CreditMiningPanel.py index b6b5e64afb8..e4477bf65a9 100644 --- a/Tribler/Main/vwxGUI/CreditMiningPanel.py +++ b/Tribler/Main/vwxGUI/CreditMiningPanel.py @@ -382,9 +382,9 @@ def show_source_info(self, data): self.source_label.SetLabel("Source : Channel (stored)") self.source_name.SetLabel("Name : " + data.get_source_text()) - self.torrent_num.SetLabel("# Torrents : " + str(data.channel.nr_torrents)) - self.last_updt.SetLabel("Latest update : " + format_time(data.channel.modified)) - self.votes_num.SetLabel('Favorite votes : ' + str(data.channel.nr_favorites)) + self.torrent_num.SetLabel("# Torrents : " + str(data.channel_dict[4])) + self.last_updt.SetLabel("Latest update : " + format_time(data.channel_dict[8])) + self.votes_num.SetLabel('Favorite votes : ' + str(data.channel_dict[5])) self.status_cm.SetLabel("Active" if data.enabled else "Inactive") debug_str = hexlify(data.source) From 7c7c99b6cb0c1d078a33dd72209f26857ca26187 Mon Sep 17 00:00:00 2001 From: Ardhi Putra Pratama H Date: Tue, 19 Jul 2016 14:41:37 +0200 Subject: [PATCH 3/4] Removed session and task check --- Tribler/Policies/BoostingSource.py | 22 +++------------------- 1 file changed, 3 insertions(+), 19 deletions(-) diff --git a/Tribler/Policies/BoostingSource.py b/Tribler/Policies/BoostingSource.py index a0d95db9bb1..a2916138e7f 100644 --- a/Tribler/Policies/BoostingSource.py +++ b/Tribler/Policies/BoostingSource.py @@ -127,17 +127,6 @@ def get_source_text(self): def _on_err(self, err_msg): self._logger.error(err_msg) - def check_and_register_task(self, name, task, delay=None, value=None, interval=None): - """ - Helper function to avoid assertion in register task. - - It will register task if it has not already registered - """ - task_ret = None - if not self.is_pending_task_active(name): - task_ret = self.register_task(name, task, delay, value, interval) - - return task_ret class ChannelSource(BoostingSource): """ @@ -201,8 +190,8 @@ def get_channel_id(): self.channel_id = self.community._channel_id self.channel_dict = self.channelcast_db.getChannel(self.channel_id) - task_call = self.check_and_register_task(str(self.source) + "_update", - LoopingCall(self._update)).start(self.interval, now=True) + task_call = self.register_task(str(self.source) + "_update", + LoopingCall(self._update)).start(self.interval, now=True) if task_call: self._logger.debug("Registering update call") @@ -272,7 +261,7 @@ def _update(self): # it's highly probable the checktor function is running at this time (if it's already running) # if not running, start the checker - task_call = self.check_and_register_task(hexlify(self.source) + "_checktor", LoopingCall(self._check_tor)) + task_call = self.register_task(hexlify(self.source) + "_checktor", LoopingCall(self._check_tor)) if task_call: self._logger.debug("Registering check torrent function") task_call.start(self.check_torrent_interval, now=True) @@ -289,17 +278,12 @@ def _load_torrent(self, infohash): with TorrentDef object as parameter. """ - # session is quitting - if self.session is None or self.session.lm.torrent_store is None or self.session.get_torrent_store() is None: - return - def add_to_loaded(infohash_str): """ function to add loaded infohash to memory """ self.loaded_torrent[unhexlify(infohash_str)].callback( TorrentDef.load_from_memory(self.session.get_collected_torrent(unhexlify(infohash_str)))) - self.loaded_torrent[unhexlify(infohash_str)] = None if infohash not in self.loaded_torrent: self.loaded_torrent[infohash] = defer.Deferred() From 5b6a6f0afc07b0c161e4732ed51519cbc3703a36 Mon Sep 17 00:00:00 2001 From: Ardhi Putra Pratama H Date: Mon, 26 Sep 2016 12:49:26 +0200 Subject: [PATCH 4/4] Add test for native loading Squashed commits with : - Use looping call in checking loaded torrent --- .../CreditMining/test_creditmining_sys.py | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/Tribler/Test/Core/CreditMining/test_creditmining_sys.py b/Tribler/Test/Core/CreditMining/test_creditmining_sys.py index edf878cc3ff..1dce79614c6 100644 --- a/Tribler/Test/Core/CreditMining/test_creditmining_sys.py +++ b/Tribler/Test/Core/CreditMining/test_creditmining_sys.py @@ -7,6 +7,7 @@ import binascii import os import shutil +from twisted.internet.task import LoopingCall from twisted.internet import defer from twisted.web.server import Site @@ -387,6 +388,8 @@ def check_torrents_channel(src, defer_param=None, target=1): if src_obj.community: src_obj.community.cancel_all_pending_tasks() + self.assertEqual(src_obj.get_source_text(), 'Simple Channel') + defer_param.callback(src) return defer_param @@ -517,6 +520,58 @@ def check_torrents_channel(src, defer_param=None): d.addCallback(check_torrents_channel) return d + @deferred(timeout=40) + def test_chn_native_load(self): + self.session.get_dispersy = lambda: True + self.session.lm.dispersy = Dispersy(ManualEnpoint(0), self.getStateDir()) + dispersy_cid_hex = "abcd" * 9 + "0012" + dispersy_cid = binascii.unhexlify(dispersy_cid_hex) + + # create channel and insert torrent + self.create_fake_allchannel_community() + self.create_torrents_in_channel(dispersy_cid_hex) + + self.session.download_torrentfile = \ + lambda dummy_ihash, function, _: function(binascii.hexlify(TORRENT_FILE_INFOHASH)) + + def get_bin_torrent(_): + """ + get binary data of a torrent + """ + f = open(TORRENT_FILE, "rb") + bdata = f.read() + f.close() + return bdata + + self.session.get_collected_torrent = get_bin_torrent + + self.boosting_manager.add_source(dispersy_cid) + + def _loop_check(_): + defer_param = defer.Deferred() + + def check_loaded(src): + """ + check if a torrent has been loaded + """ + src_obj = self.boosting_manager.get_source_object(src) + if src_obj.loaded_torrent[TORRENT_FILE_INFOHASH] is not None: + src_obj.community.cancel_all_pending_tasks() + src_obj.kill_tasks() + self.check_loaded_lc.stop() + self.check_loaded_lc = None + defer_param.callback(src) + + self.check_loaded_lc = LoopingCall(check_loaded, dispersy_cid) + self.check_loaded_lc.start(1, now=True) + + return defer_param + + defer_ret = self.check_source(dispersy_cid) + defer_ret.addCallback(_loop_check) + + return defer_ret + def tearDown(self): self.session.lm.dispersy._communities['allchannel'].cancel_all_pending_tasks() self.session.lm.dispersy.cancel_all_pending_tasks()