Skip to content

Commit

Permalink
Refactored gigchannel logic in seperate manager
Browse files Browse the repository at this point in the history
  • Loading branch information
devos50 committed Dec 23, 2018
1 parent c78f2ff commit f5c4d72
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 94 deletions.
101 changes: 10 additions & 91 deletions Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from threading import Event, enumerate as enumerate_threads
from traceback import print_exc

from pony.orm import db_session
from six import text_type
from twisted.internet import reactor
from twisted.internet.defer import Deferred, inlineCallbacks, DeferredList, succeed
Expand All @@ -26,6 +25,7 @@
from Tribler.Core.CacheDB.sqlitecachedb import forceDBThread
from Tribler.Core.DownloadConfig import DownloadStartupConfig, DefaultDownloadStartupConfig
from Tribler.Core.Modules.MetadataStore.store import MetadataStore
from Tribler.Core.Modules.gigachannel_manager import GigaChannelManager
from Tribler.Core.Modules.payout_manager import PayoutManager
from Tribler.Core.Modules.resource_monitor import ResourceMonitor
from Tribler.Core.Modules.search_manager import SearchManager
Expand Down Expand Up @@ -101,6 +101,7 @@ def __init__(self):

self.search_manager = None
self.channel_manager = None
self.gigachannel_manager = None

self.video_server = None

Expand All @@ -122,7 +123,6 @@ def __init__(self):
self.payout_manager = None
self.mds = None


def register(self, session, session_lock):
assert isInIOThread()
if not self.registered:
Expand Down Expand Up @@ -511,12 +511,8 @@ def init(self):
channels_dir = os.path.join(self.session.config.get_chant_channels_dir())
database_path = os.path.join(self.session.config.get_state_dir(), 'sqlite', 'metadata.db')
self.mds = MetadataStore(database_path, channels_dir, self.session.trustchain_keypair)
# Metadata Store checks the database at regular intervals to see if new channels are available for preview
# or subscribed channels require updating.
queue_check_interval = 2.0 # seconds
#TODO: add errback here
self.register_task("Process channels download queue",
LoopingCall(self.check_channels_updates)).start(queue_check_interval)
self.gigachannel_manager = GigaChannelManager(self.session)
self.gigachannel_manager.start()

self.session.set_download_states_callback(self.sesscb_states_callback)

Expand All @@ -525,88 +521,6 @@ def init(self):

self.initComplete = True

def check_channels_updates(self):
with db_session:
channels_queue = list(self.mds.ChannelMetadata.get_updated_channels())

for channel in channels_queue:
if not self.session.has_download(binascii.hexlify(str(channel.infohash))):
self._logger.info("Downloading new channel version %s ver %i->%i",
str(channel.public_key).encode("hex"),
channel.local_version, channel.version)
self.download_channel(channel)

def check_obsolete_channel_torrents(self):
# Now check for obsolete channel torrents
#channel.votes = download.get_num_connected_seeds_peers()[0]

for dl in [d for d in self.session.get_downloads() if
d.get_channel_download() and (time.time() - dl.tdef.get_creation_date()) > 604800]:
infohash = dl.tdef.infohash()
with db_session:
if self.mds.ChannelMetadata.get_channel_with_infohash(infohash):
continue
# This is more than a week old previous version of some channel: delete it
self._logger.debug("Removing old channel version %s", infohash.encode('hex'))
self.session.remove_download(dl)

def on_channel_download_finished(self, download, channel_id, finished_deferred=None):
if download.get_channel_download():
channel_dirname = os.path.join(self.session.lm.mds.channels_dir, download.get_def().get_name())
self.mds.process_channel_dir(channel_dirname, channel_id)
if finished_deferred:
finished_deferred.callback(download)

@db_session
def remove_channel(self, channel):
channel.subscribed = False
channel.remove_contents()
channel.local_version = 0

# Remove all stuff matching the channel dir name / public key / torrent title
remove_list = [d for d in self.get_channel_downloads() if d.tdef.get_name_utf8() == channel.dir_name]

def _on_remove_failure(failure):
self._logger.exception(failure)

for i, d in enumerate(remove_list):
deferred = self.session.remove_download(d, remove_content=True)
deferred.addErrback(_on_remove_failure)
self.register_task(u'Remove_channel' + d.tdef.get_name_utf8() + u'-' + binascii.hexlify(
d.tdef.get_infohash()) + u'-' + str(i), deferred)

def download_channel(self, channel):
"""
Download a channel with a given infohash and title.
:param channel: The channel metadata ORM object.
"""
finished_deferred = Deferred()

dcfg = DownloadStartupConfig()
dcfg.set_dest_dir(self.mds.channels_dir)
dcfg.set_channel_download(True)
tdef = TorrentDefNoMetainfo(infohash=str(channel.infohash), name=channel.dir_name)
download = self.session.start_download_from_tdef(tdef, dcfg)
channel_id = channel.public_key
#TODO: add errbacks here!
download.finished_callback = lambda dl: self.on_channel_download_finished(dl, channel_id, finished_deferred)
if download.get_state().get_status() == DLSTATUS_SEEDING and not download.finished_callback_already_called:
download.finished_callback_already_called = True
download.finished_callback(download)
return download, finished_deferred

def updated_my_channel(self, new_torrent_path):
"""
Notify the core that we updated our channel.
:param new_torrent_path: path to the new torrent file
"""
# Start the new download
tdef = TorrentDef.load(new_torrent_path)
dcfg = DownloadStartupConfig()
dcfg.set_dest_dir(self.mds.channels_dir)
dcfg.set_channel_download(True)
self.add(tdef, dcfg)

def add(self, tdef, dscfg, pstate=None, setupDelay=0, hidden=False,
share_mode=False, checkpoint_disabled=False):
""" Called by any thread """
Expand Down Expand Up @@ -707,7 +621,7 @@ def get_downloads(self):

def get_channel_downloads(self):
with self.session_lock:
return [d for d in self.downloads.values() if d.get_channel_download()]
return [download for download in self.downloads.values() if download.get_channel_download()]

def get_download(self, infohash):
""" Called by any thread """
Expand Down Expand Up @@ -1037,6 +951,11 @@ def early_shutdown(self):
yield self.channel_manager.shutdown()
self.channel_manager = None

if self.gigachannel_manager:
self.session.notify_shutdown_state("Shutting down Gigachannel Manager...")
yield self.gigachannel_manager.shutdown()
self.gigachannel_manager = None

if self.search_manager:
self.session.notify_shutdown_state("Shutting down Search Manager...")
yield self.search_manager.shutdown()
Expand Down
116 changes: 116 additions & 0 deletions Tribler/Core/Modules/gigachannel_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import os
from binascii import hexlify

from pony.orm import db_session
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall

from Tribler.Core.DownloadConfig import DownloadStartupConfig
from Tribler.Core.TorrentDef import TorrentDefNoMetainfo, TorrentDef
from Tribler.pyipv8.ipv8.taskmanager import TaskManager


class GigaChannelManager(TaskManager):
"""
This class represents the main manager for gigachannels.
It provides methods to manage channels, download new channels or remove existing ones.
"""

def __init__(self, session):
super(GigaChannelManager, self).__init__()
self.session = session

def start(self):
"""
The Metadata Store checks the database at regular intervals to see if new channels are available for preview
or subscribed channels require updating.
"""
queue_check_interval = 2.0 # seconds
self.register_task("Process channels download queue",
LoopingCall(self.check_channels_updates)).start(queue_check_interval)

def shutdown(self):
"""
Stop the gigachannel manager.
"""
self.shutdown_task_manager()

def check_channels_updates(self):
"""
Check whether there are channels that are updated. If so, download the new version of the channel.
"""
with db_session:
channels_queue = list(self.session.lm.mds.ChannelMetadata.get_updated_channels())

for channel in channels_queue:
if not self.session.has_download(hexlify(str(channel.infohash))):
self._logger.info("Downloading new channel version %s ver %i->%i",
str(channel.public_key).encode("hex"),
channel.local_version, channel.version)
self.download_channel(channel)

def on_channel_download_finished(self, download, channel_id, finished_deferred=None):
"""
We have finished with downloading a channel.
:param download: The channel download itself.
:param channel_id: The ID of the channel.
:param finished_deferred: An optional deferred that should fire if the channel download has finished.
"""
if download.get_channel_download():
channel_dirname = os.path.join(self.session.lm.mds.channels_dir, download.get_def().get_name())
self.session.lm.mds.process_channel_dir(channel_dirname, channel_id)
if finished_deferred:
finished_deferred.callback(download)

@db_session
def remove_channel(self, channel):
"""
Remove a channel from your local database/download list.
:param channel: The channel to remove.
"""
channel.subscribed = False
channel.remove_contents()
channel.local_version = 0

# Remove all stuff matching the channel dir name / public key / torrent title
remove_list = [d for d in self.session.lm.get_channel_downloads() if d.tdef.get_name_utf8() == channel.dir_name]

def _on_remove_failure(failure):
self._logger.error("Error when removing the channel download: %s", failure)

for i, d in enumerate(remove_list):
deferred = self.session.remove_download(d, remove_content=True)
deferred.addErrback(_on_remove_failure)
self.register_task(u'remove_channel' + d.tdef.get_name_utf8() + u'-' + hexlify(d.tdef.get_infohash()) +
u'-' + str(i), deferred)

def download_channel(self, channel):
"""
Download a channel with a given infohash and title.
:param channel: The channel metadata ORM object.
"""
finished_deferred = Deferred()

dcfg = DownloadStartupConfig()
dcfg.set_dest_dir(self.session.lm.mds.channels_dir)
dcfg.set_channel_download(True)
tdef = TorrentDefNoMetainfo(infohash=str(channel.infohash), name=channel.dir_name)
download = self.session.start_download_from_tdef(tdef, dcfg)
channel_id = channel.public_key
#TODO: add errbacks here!
download.finished_callback = lambda dl: self.on_channel_download_finished(dl, channel_id, finished_deferred)
if download.get_state().get_status() == DLSTATUS_SEEDING and not download.finished_callback_already_called:
download.finished_callback_already_called = True
download.finished_callback(download)
return download, finished_deferred

def updated_my_channel(self, new_torrent_path):
"""
Notify the core that we updated our channel.
:param new_torrent_path: path to the new torrent file
"""
tdef = TorrentDef.load(new_torrent_path)
dcfg = DownloadStartupConfig()
dcfg.set_dest_dir(self.session.lm.mds.channels_dir)
dcfg.set_channel_download(True)
self.session.lm.add(tdef, dcfg)
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def render_DELETE(self, request):
return ChannelsModifySubscriptionEndpoint.return_404(request)
elif not channel.subscribed:
return ChannelsModifySubscriptionEndpoint.return_404(request, message=NOT_SUBSCRIBED_RESPONSE_MSG)
self.session.lm.remove_channel(channel)
self.session.lm.gigachannel_manager.remove_channel(channel)
return json.dumps({"unsubscribed": True})

channel_info = self.get_channel_from_db(self.cid)
Expand Down
2 changes: 1 addition & 1 deletion Tribler/Core/Modules/restapi/downloads_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ def on_error(error):
channel = self.session.lm.mds.process_payload(payload)
if channel and not channel.subscribed and channel.local_version < channel.version:
channel.subscribed = True
download, _ = self.session.lm.download_channel(channel)
download, _ = self.session.lm.gigachannel_manager.download_channel(channel)
else:
return json.dumps({"error": "Already subscribed"})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def test_channel_update_and_download(self):
# Download the channel in our session
with db_session:
channel = self.session.lm.mds.process_payload(payload)
download, finished_deferred = self.session.lm.download_channel(channel)
download, finished_deferred = self.session.lm.gigachannel_manager.download_channel(channel)
download.add_peer(("127.0.0.1", self.seeder_session.config.get_libtorrent_port()))
yield finished_deferred

Expand Down

0 comments on commit f5c4d72

Please sign in to comment.