diff --git a/Tribler/Core/APIImplementation/LaunchManyCore.py b/Tribler/Core/APIImplementation/LaunchManyCore.py index 30e1f454512..514dc7e79e4 100644 --- a/Tribler/Core/APIImplementation/LaunchManyCore.py +++ b/Tribler/Core/APIImplementation/LaunchManyCore.py @@ -15,7 +15,6 @@ from threading import Event, enumerate as enumerate_threads from traceback import print_exc -from pony.orm import db_session from six import text_type from twisted.internet import reactor from twisted.internet.defer import Deferred, inlineCallbacks, DeferredList, succeed @@ -26,6 +25,7 @@ from Tribler.Core.CacheDB.sqlitecachedb import forceDBThread from Tribler.Core.DownloadConfig import DownloadStartupConfig, DefaultDownloadStartupConfig from Tribler.Core.Modules.MetadataStore.store import MetadataStore +from Tribler.Core.Modules.gigachannel_manager import GigaChannelManager from Tribler.Core.Modules.payout_manager import PayoutManager from Tribler.Core.Modules.resource_monitor import ResourceMonitor from Tribler.Core.Modules.search_manager import SearchManager @@ -101,6 +101,7 @@ def __init__(self): self.search_manager = None self.channel_manager = None + self.gigachannel_manager = None self.video_server = None @@ -122,7 +123,6 @@ def __init__(self): self.payout_manager = None self.mds = None - def register(self, session, session_lock): assert isInIOThread() if not self.registered: @@ -511,12 +511,8 @@ def init(self): channels_dir = os.path.join(self.session.config.get_chant_channels_dir()) database_path = os.path.join(self.session.config.get_state_dir(), 'sqlite', 'metadata.db') self.mds = MetadataStore(database_path, channels_dir, self.session.trustchain_keypair) - # Metadata Store checks the database at regular intervals to see if new channels are available for preview - # or subscribed channels require updating. - queue_check_interval = 2.0 # seconds - #TODO: add errback here - self.register_task("Process channels download queue", - LoopingCall(self.check_channels_updates)).start(queue_check_interval) + self.gigachannel_manager = GigaChannelManager(self.session) + self.gigachannel_manager.start() self.session.set_download_states_callback(self.sesscb_states_callback) @@ -525,88 +521,6 @@ def init(self): self.initComplete = True - def check_channels_updates(self): - with db_session: - channels_queue = list(self.mds.ChannelMetadata.get_updated_channels()) - - for channel in channels_queue: - if not self.session.has_download(binascii.hexlify(str(channel.infohash))): - self._logger.info("Downloading new channel version %s ver %i->%i", - str(channel.public_key).encode("hex"), - channel.local_version, channel.version) - self.download_channel(channel) - - def check_obsolete_channel_torrents(self): - # Now check for obsolete channel torrents - #channel.votes = download.get_num_connected_seeds_peers()[0] - - for dl in [d for d in self.session.get_downloads() if - d.get_channel_download() and (time.time() - dl.tdef.get_creation_date()) > 604800]: - infohash = dl.tdef.infohash() - with db_session: - if self.mds.ChannelMetadata.get_channel_with_infohash(infohash): - continue - # This is more than a week old previous version of some channel: delete it - self._logger.debug("Removing old channel version %s", infohash.encode('hex')) - self.session.remove_download(dl) - - def on_channel_download_finished(self, download, channel_id, finished_deferred=None): - if download.get_channel_download(): - channel_dirname = os.path.join(self.session.lm.mds.channels_dir, download.get_def().get_name()) - self.mds.process_channel_dir(channel_dirname, channel_id) - if finished_deferred: - finished_deferred.callback(download) - - @db_session - def remove_channel(self, channel): - channel.subscribed = False - channel.remove_contents() - channel.local_version = 0 - - # Remove all stuff matching the channel dir name / public key / torrent title - remove_list = [d for d in self.get_channel_downloads() if d.tdef.get_name_utf8() == channel.dir_name] - - def _on_remove_failure(failure): - self._logger.exception(failure) - - for i, d in enumerate(remove_list): - deferred = self.session.remove_download(d, remove_content=True) - deferred.addErrback(_on_remove_failure) - self.register_task(u'Remove_channel' + d.tdef.get_name_utf8() + u'-' + binascii.hexlify( - d.tdef.get_infohash()) + u'-' + str(i), deferred) - - def download_channel(self, channel): - """ - Download a channel with a given infohash and title. - :param channel: The channel metadata ORM object. - """ - finished_deferred = Deferred() - - dcfg = DownloadStartupConfig() - dcfg.set_dest_dir(self.mds.channels_dir) - dcfg.set_channel_download(True) - tdef = TorrentDefNoMetainfo(infohash=str(channel.infohash), name=channel.dir_name) - download = self.session.start_download_from_tdef(tdef, dcfg) - channel_id = channel.public_key - #TODO: add errbacks here! - download.finished_callback = lambda dl: self.on_channel_download_finished(dl, channel_id, finished_deferred) - if download.get_state().get_status() == DLSTATUS_SEEDING and not download.finished_callback_already_called: - download.finished_callback_already_called = True - download.finished_callback(download) - return download, finished_deferred - - def updated_my_channel(self, new_torrent_path): - """ - Notify the core that we updated our channel. - :param new_torrent_path: path to the new torrent file - """ - # Start the new download - tdef = TorrentDef.load(new_torrent_path) - dcfg = DownloadStartupConfig() - dcfg.set_dest_dir(self.mds.channels_dir) - dcfg.set_channel_download(True) - self.add(tdef, dcfg) - def add(self, tdef, dscfg, pstate=None, setupDelay=0, hidden=False, share_mode=False, checkpoint_disabled=False): """ Called by any thread """ @@ -707,7 +621,7 @@ def get_downloads(self): def get_channel_downloads(self): with self.session_lock: - return [d for d in self.downloads.values() if d.get_channel_download()] + return [download for download in self.downloads.values() if download.get_channel_download()] def get_download(self, infohash): """ Called by any thread """ @@ -1037,6 +951,11 @@ def early_shutdown(self): yield self.channel_manager.shutdown() self.channel_manager = None + if self.gigachannel_manager: + self.session.notify_shutdown_state("Shutting down Gigachannel Manager...") + yield self.gigachannel_manager.shutdown() + self.gigachannel_manager = None + if self.search_manager: self.session.notify_shutdown_state("Shutting down Search Manager...") yield self.search_manager.shutdown() diff --git a/Tribler/Core/Modules/gigachannel_manager.py b/Tribler/Core/Modules/gigachannel_manager.py new file mode 100644 index 00000000000..4a2f09d9732 --- /dev/null +++ b/Tribler/Core/Modules/gigachannel_manager.py @@ -0,0 +1,116 @@ +import os +from binascii import hexlify + +from pony.orm import db_session +from twisted.internet.defer import Deferred +from twisted.internet.task import LoopingCall + +from Tribler.Core.DownloadConfig import DownloadStartupConfig +from Tribler.Core.TorrentDef import TorrentDefNoMetainfo, TorrentDef +from Tribler.pyipv8.ipv8.taskmanager import TaskManager + + +class GigaChannelManager(TaskManager): + """ + This class represents the main manager for gigachannels. + It provides methods to manage channels, download new channels or remove existing ones. + """ + + def __init__(self, session): + super(GigaChannelManager, self).__init__() + self.session = session + + def start(self): + """ + The Metadata Store checks the database at regular intervals to see if new channels are available for preview + or subscribed channels require updating. + """ + queue_check_interval = 2.0 # seconds + self.register_task("Process channels download queue", + LoopingCall(self.check_channels_updates)).start(queue_check_interval) + + def shutdown(self): + """ + Stop the gigachannel manager. + """ + self.shutdown_task_manager() + + def check_channels_updates(self): + """ + Check whether there are channels that are updated. If so, download the new version of the channel. + """ + with db_session: + channels_queue = list(self.session.lm.mds.ChannelMetadata.get_updated_channels()) + + for channel in channels_queue: + if not self.session.has_download(hexlify(str(channel.infohash))): + self._logger.info("Downloading new channel version %s ver %i->%i", + str(channel.public_key).encode("hex"), + channel.local_version, channel.version) + self.download_channel(channel) + + def on_channel_download_finished(self, download, channel_id, finished_deferred=None): + """ + We have finished with downloading a channel. + :param download: The channel download itself. + :param channel_id: The ID of the channel. + :param finished_deferred: An optional deferred that should fire if the channel download has finished. + """ + if download.get_channel_download(): + channel_dirname = os.path.join(self.session.lm.mds.channels_dir, download.get_def().get_name()) + self.session.lm.mds.process_channel_dir(channel_dirname, channel_id) + if finished_deferred: + finished_deferred.callback(download) + + @db_session + def remove_channel(self, channel): + """ + Remove a channel from your local database/download list. + :param channel: The channel to remove. + """ + channel.subscribed = False + channel.remove_contents() + channel.local_version = 0 + + # Remove all stuff matching the channel dir name / public key / torrent title + remove_list = [d for d in self.session.lm.get_channel_downloads() if d.tdef.get_name_utf8() == channel.dir_name] + + def _on_remove_failure(failure): + self._logger.error("Error when removing the channel download: %s", failure) + + for i, d in enumerate(remove_list): + deferred = self.session.remove_download(d, remove_content=True) + deferred.addErrback(_on_remove_failure) + self.register_task(u'remove_channel' + d.tdef.get_name_utf8() + u'-' + hexlify(d.tdef.get_infohash()) + + u'-' + str(i), deferred) + + def download_channel(self, channel): + """ + Download a channel with a given infohash and title. + :param channel: The channel metadata ORM object. + """ + finished_deferred = Deferred() + + dcfg = DownloadStartupConfig() + dcfg.set_dest_dir(self.session.lm.mds.channels_dir) + dcfg.set_channel_download(True) + tdef = TorrentDefNoMetainfo(infohash=str(channel.infohash), name=channel.dir_name) + download = self.session.start_download_from_tdef(tdef, dcfg) + channel_id = channel.public_key + #TODO: add errbacks here! + download.finished_callback = lambda dl: self.on_channel_download_finished(dl, channel_id, finished_deferred) + if download.get_state().get_status() == DLSTATUS_SEEDING and not download.finished_callback_already_called: + download.finished_callback_already_called = True + download.finished_callback(download) + return download, finished_deferred + + def updated_my_channel(self, new_torrent_path): + """ + Notify the core that we updated our channel. + :param new_torrent_path: path to the new torrent file + """ + tdef = TorrentDef.load(new_torrent_path) + dcfg = DownloadStartupConfig() + dcfg.set_dest_dir(self.session.lm.mds.channels_dir) + dcfg.set_channel_download(True) + self.session.lm.add(tdef, dcfg) diff --git a/Tribler/Core/Modules/restapi/channels/channels_subscription_endpoint.py b/Tribler/Core/Modules/restapi/channels/channels_subscription_endpoint.py index b88348d87b6..72b20a0c630 100644 --- a/Tribler/Core/Modules/restapi/channels/channels_subscription_endpoint.py +++ b/Tribler/Core/Modules/restapi/channels/channels_subscription_endpoint.py @@ -171,7 +171,7 @@ def render_DELETE(self, request): return ChannelsModifySubscriptionEndpoint.return_404(request) elif not channel.subscribed: return ChannelsModifySubscriptionEndpoint.return_404(request, message=NOT_SUBSCRIBED_RESPONSE_MSG) - self.session.lm.remove_channel(channel) + self.session.lm.gigachannel_manager.remove_channel(channel) return json.dumps({"unsubscribed": True}) channel_info = self.get_channel_from_db(self.cid) diff --git a/Tribler/Core/Modules/restapi/downloads_endpoint.py b/Tribler/Core/Modules/restapi/downloads_endpoint.py index 59093851be2..962071c222b 100644 --- a/Tribler/Core/Modules/restapi/downloads_endpoint.py +++ b/Tribler/Core/Modules/restapi/downloads_endpoint.py @@ -350,7 +350,7 @@ def on_error(error): channel = self.session.lm.mds.process_payload(payload) if channel and not channel.subscribed and channel.local_version < channel.version: channel.subscribed = True - download, _ = self.session.lm.download_channel(channel) + download, _ = self.session.lm.gigachannel_manager.download_channel(channel) else: return json.dumps({"error": "Already subscribed"}) diff --git a/Tribler/Test/Core/Modules/MetadataStore/test_channel_download.py b/Tribler/Test/Core/Modules/MetadataStore/test_channel_download.py index 0f7e18fe0b1..aad6239729b 100644 --- a/Tribler/Test/Core/Modules/MetadataStore/test_channel_download.py +++ b/Tribler/Test/Core/Modules/MetadataStore/test_channel_download.py @@ -47,7 +47,7 @@ def test_channel_update_and_download(self): # Download the channel in our session with db_session: channel = self.session.lm.mds.process_payload(payload) - download, finished_deferred = self.session.lm.download_channel(channel) + download, finished_deferred = self.session.lm.gigachannel_manager.download_channel(channel) download.add_peer(("127.0.0.1", self.seeder_session.config.get_libtorrent_port())) yield finished_deferred