diff --git a/README.md b/README.md index c1fca932d00..157d4be7ca5 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ We make use of submodules, so remember using the --recursive argument when cloni ### Debian/Ubuntu/Mint ```bash -sudo apt-get install libav-tools libjs-excanvas libjs-mootools libx11-6 python-apsw python-cherrypy3 python-crypto python-feedparser python-gmpy python-libtorrent python-m2crypto python-netifaces python-pil python-pyasn1 python-requests python-twisted python-wxgtk2.8 python2.7 vlc +sudo apt-get install libav-tools libjs-excanvas libjs-mootools libx11-6 python-apsw python-leveldb python-cherrypy3 python-crypto python-feedparser python-gmpy python-libtorrent python-m2crypto python-netifaces python-pil python-pyasn1 python-requests python-twisted python-wxgtk2.8 python2.7 vlc ``` ### Windows and OSX diff --git a/Tribler/Core/APIImplementation/LaunchManyCore.py b/Tribler/Core/APIImplementation/LaunchManyCore.py index ccb8f5a4524..ae48654d9a9 100644 --- a/Tribler/Core/APIImplementation/LaunchManyCore.py +++ b/Tribler/Core/APIImplementation/LaunchManyCore.py @@ -22,6 +22,7 @@ from Tribler.Core.osutils import get_readable_torrent_name from Tribler.Core.simpledefs import (NTFY_DISPERSY, NTFY_STARTED, NTFY_TORRENTS, NTFY_UPDATE, NTFY_INSERT, NTFY_ACTIVITIES, NTFY_REACHABLE, NTFY_ACT_UPNP) +from Tribler.Core.torrentstore import TorrentStore from Tribler.Main.globals import DefaultDownloadStartupConfig from Tribler.community.tunnel.crypto.elgamalcrypto import ElgamalCrypto from Tribler.dispersy.util import blockingCallFromThread @@ -76,6 +77,7 @@ def __init__(self): self.rawserver = None self.multihandler = None + self.torrent_store = None self.rtorrent_handler = None self.tftp_handler = None @@ -113,6 +115,9 @@ def register(self, session, sesslock, autoload_discovery=True): self.multihandler = MultiHandler(self.rawserver, self.sessdoneflag) + if self.session.get_torrent_store(): + self.torrent_store = TorrentStore(self.session.get_torrent_store_dir()) + # torrent collecting: RemoteTorrentHandler if self.session.get_torrent_collecting(): from Tribler.Core.RemoteTorrentHandler import RemoteTorrentHandler @@ -656,6 +661,10 @@ def early_shutdown(self): self.tftp_handler.shutdown() self.tftp_handler = None + if self.torrent_store is not None: + self.torrent_store.close() + self.torrent_store = None + if self.dispersy: self._logger.info("lmc: Shutting down Dispersy...") now = timemod.time() diff --git a/Tribler/Core/CacheDB/db_versions.py b/Tribler/Core/CacheDB/db_versions.py index dbe7366ea94..2f48b9bf6b1 100644 --- a/Tribler/Core/CacheDB/db_versions.py +++ b/Tribler/Core/CacheDB/db_versions.py @@ -3,7 +3,8 @@ # 18 is used by Tribler 6.1.x - 6.2.0 # 22 is used by Tribler 6.3.x # 23 is used by Tribler 6.4.0 RC1 -# 24 is used by Tribler 6.4.0 RC2 +# 24 is used by Tribler 6.4.0 RC2 - 6.4.X +# 25 is used by Tribler 6.5-git TRIBLER_59_DB_VERSION = 17 TRIBLER_60_DB_VERSION = 17 @@ -17,8 +18,10 @@ TRIBLER_64RC2_DB_VERSION = 24 +TRIBLER_65PRE_DB_VERSION = 25 + # the lowest supported database version number LOWEST_SUPPORTED_DB_VERSION = TRIBLER_59_DB_VERSION # the latest database version number -LATEST_DB_VERSION = TRIBLER_64RC2_DB_VERSION +LATEST_DB_VERSION = TRIBLER_65PRE_DB_VERSION diff --git a/Tribler/Core/RemoteTorrentHandler.py b/Tribler/Core/RemoteTorrentHandler.py index bf3fd8675a6..4144abc8b8c 100644 --- a/Tribler/Core/RemoteTorrentHandler.py +++ b/Tribler/Core/RemoteTorrentHandler.py @@ -20,7 +20,7 @@ from Tribler.Core.TorrentDef import TorrentDef from Tribler.Core.simpledefs import NTFY_TORRENTS, INFOHASH_LENGTH - +from Tribler.Core.torrentstore import TorrentStore TORRENT_OVERFLOW_CHECKING_INTERVAL = 30 * 60 LOW_PRIO_COLLECTING = 0 @@ -140,39 +140,44 @@ def download_torrent(self, candidate, infohash, user_callback=None, priority=1, callback = lambda ih = infohash: user_callback(ih) self.torrent_callbacks.setdefault(infohash, set()).add(callback) - def get_torrent_filename(self, infohash): - return u"%s.torrent" % hexlify(infohash) - - def get_torrent_filepath(self, infohash): - return os.path.join(self.tor_col_dir, self.get_torrent_filename(infohash)) - - def has_torrent(self, infohash): - return os.path.exists(self.get_torrent_filepath(infohash)) - @call_on_reactor_thread def save_torrent(self, tdef, callback=None): infohash = tdef.get_infohash() - file_path = self.get_torrent_filepath(infohash) + infohash_str = hexlify(infohash) - if not self.has_torrent(infohash): + if self.session.lm.torrent_store == None: + self._logger.error("Torrent store is not loaded") + return + + # TODO(emilon): could we check the database instead of the store? + # Checking if a key is present fetches the whole torrent from disk if its + # not on the writeback cache. + if infohash_str not in self.session.lm.torrent_store: # save torrent to file try: - tdef.save(file_path) + bdata = tdef.encode() + except Exception as e: - self._logger(u"failed to save torrent %s: %s", file_path, e) + self._logger.error(u"failed to encode torrent %s: %s", infohash_str, e) return + try: + self.session.lm.torrent_store[infohash_str] = bdata + except Exception as e: + self._logger.error(u"failed to store torrent data for %s, exception was: %s", infohash_str, e) # add torrent to database if self.torrent_db.hasTorrent(infohash): - self.torrent_db.updateTorrent(infohash, torrent_file_name=file_path) + self.torrent_db.updateTorrent(infohash, torrent_file_name="lvl") else: - self.torrent_db.addExternalTorrent(tdef, extra_info={u"filename": file_path, u"status": u"good"}) + self.torrent_db.addExternalTorrent(tdef, extra_info={u"filename": "lvl", u"status": u"good"}) if callback: + # TODO(emilon): should we catch exceptions from the callback? callback() + # TODO(emilon): remove all the torrent_file_name references in the callback chain # notify all - self.notify_possible_torrent_infohash(infohash, file_path) + self.notify_possible_torrent_infohash(infohash, infohash_str) @call_on_reactor_thread def download_torrentmessage(self, candidate, infohash, user_callback=None, priority=1): @@ -234,6 +239,7 @@ def save_metadata(self, infohash, thumbnail_subpath, data): self.notify_possible_metadata_infohash(infohash, thumbnail_subpath) + # TODO(emilon): HERE def notify_possible_torrent_infohash(self, infohash, torrent_file_name=None): if infohash not in self.torrent_callbacks: return @@ -564,15 +570,15 @@ def _do_request(self): if thumbnail_subpath: file_name = thumbnail_subpath else: - file_name = self._remote_torrent_handler.get_torrent_filename(infohash) + file_name = hexlify(infohash)+'.torrent' extra_info = {u"infohash": infohash, u"thumbnail_subpath": thumbnail_subpath} # do not download if TFTP has been shutdown if self._session.lm.tftp_handler is None: return self._session.lm.tftp_handler.download_file(file_name, ip, port, extra_info=extra_info, - success_callback=self._success_callback, - failure_callback=self._failure_callback) + success_callback=self._on_download_successful, + failure_callback=self._on_download_failed) self._active_request_list.append(key) def _clear_active_request(self, key): @@ -581,7 +587,7 @@ def _clear_active_request(self, key): self._active_request_list.remove(key) @call_on_reactor_thread - def _success_callback(self, address, file_name, file_data, extra_info): + def _on_download_successful(self, address, file_name, file_data, extra_info): self._logger.debug(u"successfully downloaded %s from %s:%s", file_name, address[0], address[1]) # metadata has thumbnail_subpath info @@ -612,7 +618,7 @@ def _success_callback(self, address, file_name, file_data, extra_info): self._start_pending_requests() @call_on_reactor_thread - def _failure_callback(self, address, file_name, error_msg, extra_info): + def _on_download_failed(self, address, file_name, error_msg, extra_info): self._logger.debug(u"failed to download %s from %s:%s: %s", file_name, address[0], address[1], error_msg) # metadata has thumbnail_subpath info diff --git a/Tribler/Core/Session.py b/Tribler/Core/Session.py index a7b130d1ed1..b0226b9bdc4 100644 --- a/Tribler/Core/Session.py +++ b/Tribler/Core/Session.py @@ -8,19 +8,22 @@ import socket from Tribler.Core import NoDispersyRLock - -from Tribler.Core.CacheDB.sqlitecachedb import SQLiteCacheDB -from Tribler.Core.Upgrade.upgrade import TriblerUpgrader - from Tribler.Core.APIImplementation.LaunchManyCore import TriblerLaunchMany from Tribler.Core.APIImplementation.UserCallbackHandler import UserCallbackHandler +from Tribler.Core.CacheDB.sqlitecachedb import SQLiteCacheDB from Tribler.Core.SessionConfig import SessionConfigInterface, SessionStartupConfig +from Tribler.Core.Upgrade.upgrade import TriblerUpgrader from Tribler.Core.exceptions import NotYetImplementedException, OperationNotEnabledByConfigurationException -from Tribler.Core.osutils import get_appstate_dir, is_android -from Tribler.Core.simpledefs import (STATEDIR_TORRENTCOLL_DIR, STATEDIR_PEERICON_DIR, STATEDIR_DLPSTATE_DIR, - STATEDIR_SESSCONFIG, NTFY_MISC, NTFY_PEERS, NTFY_BUNDLERPREFERENCE, - NTFY_TORRENTS, NTFY_MYPREFERENCES, NTFY_VOTECAST, NTFY_CHANNELCAST, NTFY_UPDATE, - NTFY_USEREVENTLOG, NTFY_INSERT, NTFY_DELETE, NTFY_METADATA) +from Tribler.Core.osutils import get_appstate_dir +from Tribler.Core.simpledefs import (STATEDIR_PEERICON_DIR, + STATEDIR_DLPSTATE_DIR, STATEDIR_SESSCONFIG, + NTFY_MISC, NTFY_PEERS, + NTFY_BUNDLERPREFERENCE, NTFY_TORRENTS, + NTFY_MYPREFERENCES, NTFY_VOTECAST, + NTFY_CHANNELCAST, NTFY_UPDATE, + NTFY_USEREVENTLOG, NTFY_INSERT, NTFY_DELETE, + NTFY_METADATA, STATEDIR_TORRENT_STORE_DIR) + GOTM2CRYPTO = False try: @@ -92,8 +95,19 @@ def set_and_create_dir(dirname, setter, default_dir): create_dir(dirname or default_dir) set_and_create_dir(scfg.get_state_dir(), scfg.set_state_dir, Session.get_default_state_dir()) - set_and_create_dir(scfg.get_torrent_collecting_dir(), scfg.set_torrent_collecting_dir, os.path.join(scfg.get_state_dir(), STATEDIR_TORRENTCOLL_DIR)) - set_and_create_dir(scfg.get_peer_icon_path(), scfg.set_peer_icon_path, os.path.join(scfg.get_state_dir(), STATEDIR_PEERICON_DIR)) + # Note that we are setting it to STATEDIR_TORRENT_STORE_DIR instead of + # STATEDIR_TORRENTCOLL_DIR as that dir is unused and only kept for + # the upgrade process. + set_and_create_dir(scfg.get_torrent_collecting_dir(), + scfg.set_torrent_collecting_dir, + os.path.join(scfg.get_state_dir(), STATEDIR_TORRENT_STORE_DIR)) + + set_and_create_dir(scfg.get_torrent_store_dir(), + scfg.set_torrent_store_dir, + os.path.join(scfg.get_state_dir(), STATEDIR_TORRENT_STORE_DIR)) + + set_and_create_dir(scfg.get_peer_icon_path(), scfg.set_peer_icon_path, + os.path.join(scfg.get_state_dir(), STATEDIR_PEERICON_DIR)) create_dir(os.path.join(scfg.get_state_dir(), u"sqlite")) @@ -189,6 +203,7 @@ def del_instance(): Session.__single = None del_instance = staticmethod(del_instance) + @staticmethod def get_default_state_dir(homedirpostfix='.Tribler'): """ Returns the factory default directory for storing session state on the current platform (Win32,Mac,Unix). @@ -207,8 +222,6 @@ def get_default_state_dir(homedirpostfix='.Tribler'): statedir = os.path.join(appdir, homedirpostfix) return statedir - get_default_state_dir = staticmethod(get_default_state_dir) - # # Public methods # diff --git a/Tribler/Core/SessionConfig.py b/Tribler/Core/SessionConfig.py index f8ccba3ab0d..30a8c7bbe9c 100644 --- a/Tribler/Core/SessionConfig.py +++ b/Tribler/Core/SessionConfig.py @@ -286,6 +286,31 @@ def get_libtorrent_utp(self): """ return self.sessconfig.get(u'libtorrent', u'utp') + # + # Torrent file store + # + def get_torrent_store(self): + """ Returns whether to enable the torrent store. + @return Boolean. """ + return self.sessconfig.get(u'torrent_store', u'enabled') + + def set_torrent_store(self, value): + """ Store torrent files in a leveldb database (default = True). + @param value Boolean. + """ + self.sessconfig.set(u'torrent_store', u'enabled', value) + + def get_torrent_store_dir(self): + """ Returns the torrent store directory. + @return str + """ + return self.sessconfig.get(u'torrent_store', u'dir') + + def set_torrent_store_dir(self, value): + """ Store torrent store dir(default = state_dir/collected_torrents). + @param value str. + """ + self.sessconfig.set(u'torrent_store', u'dir', value) # # Torrent file collecting diff --git a/Tribler/Core/TFTP/handler.py b/Tribler/Core/TFTP/handler.py index 1268942d05f..c1d301a305f 100644 --- a/Tribler/Core/TFTP/handler.py +++ b/Tribler/Core/TFTP/handler.py @@ -289,7 +289,7 @@ def _handle_new_request(self, ip, port, packet): if file_name.startswith(DIR_PREFIX): file_data, file_size = self._load_directory(file_name) else: - file_data, file_size = self._load_file(file_name) + file_data, file_size = self._load_torrent(file_name) checksum = b64encode(sha1(file_data).digest()) except FileNotFound as e: self._logger.warn(u"[READ %s:%s] file/dir not found: %s", ip, port, e) @@ -371,6 +371,21 @@ def _load_directory(self, file_name): # load the zip file as binary return self._load_file(file_name, file_path=tmpfile_path) + def _load_torrent(self, file_name): + """ Loads a file into memory. + :param file_name: The path of the file. + """ + + infohash=file_name[:-8] # len('.torrent') = 8 + + file_data = self.session.lm.torrent_store.get(infohash) + # check if file exists + if not file_data: + msg = u"Torrent not in store: %s" % infohash + raise FileNotFound(msg) + + return file_data, len(file_data) + def _get_next_data(self, session): """ Gets the next block of data to be uploaded. This method is only used for data uploading. :return The data to transfer. diff --git a/Tribler/Core/Upgrade/db_upgrader.py b/Tribler/Core/Upgrade/db_upgrader.py index 69450299230..3a5d515259f 100644 --- a/Tribler/Core/Upgrade/db_upgrader.py +++ b/Tribler/Core/Upgrade/db_upgrader.py @@ -10,15 +10,18 @@ import logging import os from binascii import hexlify -from glob import iglob -from sqlite3 import Connection from shutil import rmtree +from sqlite3 import Connection + +from twisted.internet.threads import blockingCallFromThread from Tribler.Category.Category import Category from Tribler.Core.CacheDB.SqliteCacheDBHandler import TorrentDBHandler, MiscDBHandler from Tribler.Core.CacheDB.db_versions import LOWEST_SUPPORTED_DB_VERSION, LATEST_DB_VERSION from Tribler.Core.CacheDB.sqlitecachedb import str2bin from Tribler.Core.TorrentDef import TorrentDef +from Tribler.Core.Utilities.twisted_thread import reactor +from Tribler.Core.torrentstore import TorrentStore class VersionNoLongerSupportedError(Exception): @@ -30,16 +33,18 @@ class DatabaseUpgradeError(Exception): class DBUpgrader(object): + """ Migration tool for upgrading the collected torrent files/thumbnails on disk structure from Tribler version 6.3 to 6.4. """ - def __init__(self, session, db, status_update_func=None): + def __init__(self, session, db, torrent_store, status_update_func=None): self._logger = logging.getLogger(self.__class__.__name__) self.session = session self.db = db - self.status_update_func = status_update_func if status_update_func else lambda _:None + self.status_update_func = status_update_func if status_update_func else lambda _: None + self.torrent_store = torrent_store self.failed = True self.torrent_collecting_dir = self.session.get_torrent_collecting_dir() @@ -64,6 +69,10 @@ def start_migrate(self): if self.db.version == 23: self._upgrade_23_to_24() + # version 24 -> 25 (25 is also a dummy version, where the torrent files get migrated to a levedb based store. + if self.db.version == 24: + self._upgrade_24_to_25() + # check if we managed to upgrade to the latest DB version. if self.db.version == LATEST_DB_VERSION: self.status_update_func(u"Database upgrade finished.") @@ -333,25 +342,35 @@ def _upgrade_23_to_24(self): # update database version self.db.write_version(24) + def _upgrade_24_to_25(self): + self.status_update_func(u"Upgrading database from v%s to v%s..." % (24, 25)) + + # update database version (that one was easy :D) + self.db.write_version(25) + def reimport_torrents(self): """Import all torrent files in the collected torrent dir, all the files already in the database will be ignored. """ + self.status_update_func("Opening TorrentDBHandler...") # TODO(emilon): That's a freakishly ugly hack. torrent_db_handler = TorrentDBHandler(self.session) torrent_db_handler.misc_db = MiscDBHandler(self.session) torrent_db_handler.misc_db.initialize() torrent_db_handler.category = Category.getInstance() + # TODO(emilon): It would be nice to drop the corrupted torrent data from the store as a bonus. + self.status_update_func("Registering recovered torrents...") try: - for torrent_file in iglob(os.path.join(self.torrent_collecting_dir, "*.torrent")): - torrentdef = TorrentDef.load(torrent_file) + for infoshash_str, torrent_data in self.torrent_store.itervalues(): + self.status_update_func("> %s" % infoshash_str) + torrentdef = TorrentDef.load_from_memory(torrent_data) if torrentdef.is_finalized(): infohash = torrentdef.get_infohash() if not torrent_db_handler.hasTorrent(infohash): self.status_update_func(u"Registering recovered torrent: %s" % hexlify(infohash)) - torrent_db_handler._addTorrentToDB(torrentdef, source="BC", extra_info={"filename":torrent_file}) - + torrent_db_handler._addTorrentToDB(torrentdef, source="BC", extra_info={"filename": infoshash_str}) finally: torrent_db_handler.close() Category.delInstance() self.db.commit_now() + return self.torrent_store.flush() diff --git a/Tribler/Core/Upgrade/torrent_upgrade64.py b/Tribler/Core/Upgrade/torrent_upgrade64.py index 566d3289ad3..033514fa52b 100755 --- a/Tribler/Core/Upgrade/torrent_upgrade64.py +++ b/Tribler/Core/Upgrade/torrent_upgrade64.py @@ -142,7 +142,7 @@ def update_status(): if self.total_swift_file_count > 0: progress = float(self.swift_files_deleted) / self.total_swift_file_count progress *= 100 - self.status_update_func(u"Deleting swift files %.2f%%..." % progress) + self.status_update_func(u"Deleting swift files %.1f%%..." % progress) for root, _, files in os.walk(self.torrent_collecting_dir): for name in files: diff --git a/Tribler/Core/Upgrade/torrent_upgrade65.py b/Tribler/Core/Upgrade/torrent_upgrade65.py new file mode 100644 index 00000000000..14081238af2 --- /dev/null +++ b/Tribler/Core/Upgrade/torrent_upgrade65.py @@ -0,0 +1,118 @@ +# torrent_upgrade65.py --- +# +# Filename: torrent_upgrade65.py +# Description: +# Author: Elric Milon +# Maintainer: +# Created: Tue Jan 27 15:50:05 2015 (+0100) + +# Commentary: +# +# +# +# + +# Change Log: +# +# +# +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or (at +# your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Emacs. If not, see . +# +# + +# Code: +import os +from binascii import hexlify +from shutil import rmtree + +from .torrent_upgrade64 import TorrentMigrator64 +from Tribler.Core.TorrentDef import TorrentDef + + +class TorrentMigrator65(TorrentMigrator64): + + def __init__(self, session, db, torrent_store, status_update_func=None): + super(TorrentMigrator65, self).__init__(session, db, status_update_func) + self.torrent_store = torrent_store + + def _migrate_torrent_collecting_dir(self): + """ + Migrates the torrent collecting directory. + """ + # check and create the temporary migration directory if necessary + + if not os.path.isdir(self.torrent_collecting_dir): + raise RuntimeError(u"The torrent collecting directory doesn't exist: %s", self.torrent_collecting_dir) + + self._delete_swift_reseeds() + + # get total file numbers and then start cleaning up + self._get_total_file_count() + self._delete_swift_files() + self._ingest_torrent_files() + + # delete all directories in the torrent collecting directory, we don't migrate thumbnails + self._delete_all_directories() + + # replace the old directory with the new one + rmtree(self.torrent_collecting_dir) + + # create the empty file to indicate that we have finished the torrent collecting directory migration + open(self.tmp_migration_tcd_file, "wb").close() + + # set the unused torrent collecting dir to a dir that already exists + # so it stops being recreated. + self.session.set_torrent_collecting_dir(self.session.get_torrent_store_dir()) + + + def _ingest_torrent_files(self): + """ + Renames all the torrent files to INFOHASH.torrent and delete unparseable ones. + """ + def update_status(): + progress = 1.0 + if self.total_torrent_file_count > 0: + progress = float(self.total_torrent_files_processed) / self.total_torrent_file_count + progress *= 100 + self.status_update_func(u"Ingesting torrent files %.1f%% (%d/%d)..." + % (progress, self.torrent_files_migrated, + self.torrent_files_dropped)) + + self.status_update_func("Ingesting torrent files...") + for root, _, files in os.walk(self.torrent_collecting_dir): + for name in files: + file_path = os.path.join(root, name) + try: + tdef = TorrentDef.load(file_path) + with open(file_path, 'rb') as torrent_file: + self.torrent_store[hexlify(tdef.infohash)] = torrent_file.read() + # self.torrent_store[hexlify(tdef.infohash)] = tdef.encode() + self.torrent_files_migrated += 1 + except Exception as e: + self._logger.error(u"dropping corrupted torrent file %s: %s", file_path, str(e)) + self.torrent_files_dropped += 1 + os.unlink(file_path) + self.total_torrent_files_processed += 1 + if not self.total_torrent_files_processed % 2000: + self.torrent_store.flush() + update_status() + + # We don't want to walk through the child directories + break + self.status_update_func("All torrent files processed.") + + +# +# torrent_upgrade65.py ends here diff --git a/Tribler/Core/Upgrade/upgrade.py b/Tribler/Core/Upgrade/upgrade.py index b75f17f6809..f46ec3e1d54 100644 --- a/Tribler/Core/Upgrade/upgrade.py +++ b/Tribler/Core/Upgrade/upgrade.py @@ -2,9 +2,12 @@ import os import shutil +from twisted.internet.defer import inlineCallbacks + from Tribler.Core.CacheDB.db_versions import LATEST_DB_VERSION from Tribler.Core.Upgrade.db_upgrader import DBUpgrader -from Tribler.Core.Upgrade.torrent_upgrade64 import TorrentMigrator64 +from Tribler.Core.Upgrade.torrent_upgrade65 import TorrentMigrator65 +from Tribler.Core.torrentstore import TorrentStore from Tribler.dispersy.util import call_on_reactor_thread @@ -31,6 +34,7 @@ def update_status(self, status_text): self.current_status = status_text @call_on_reactor_thread + @inlineCallbacks def check_and_upgrade(self): """ Checks the database version and upgrade if it is not the latest version. """ @@ -45,20 +49,25 @@ def check_and_upgrade(self): else: # upgrade try: - torrent_migrator = TorrentMigrator64(self.session, self.db, status_update_func=self.update_status) - torrent_migrator.start_migrate() + torrent_store = TorrentStore(self.session.get_torrent_store_dir()) + torrent_migrator = TorrentMigrator65(self.session, self.db, torrent_store=torrent_store, status_update_func=self.update_status) + yield torrent_migrator.start_migrate() - db_migrator = DBUpgrader(self.session, self.db, status_update_func=self.update_status) - db_migrator.start_migrate() + db_migrator = DBUpgrader(self.session, self.db, torrent_store=torrent_store, status_update_func=self.update_status) + yield db_migrator.start_migrate() # Import all the torrent files not in the database, we do this in # case we have some unhandled torrent files left due to # bugs/crashes, etc. - db_migrator.reimport_torrents() + self.update_status("Recovering unregistered torrents...") + yield db_migrator.reimport_torrents() + + yield torrent_store.close() + del torrent_store self.failed = False except Exception as e: - self._logger.error(u"failed to upgrade: %s", e) + self._logger.exception(u"failed to upgrade: %s", e) if self.failed: self._stash_database_away() diff --git a/Tribler/Core/defaults.py b/Tribler/Core/defaults.py index 7b0a2ba6767..0974ee5e04d 100644 --- a/Tribler/Core/defaults.py +++ b/Tribler/Core/defaults.py @@ -34,7 +34,7 @@ # Version 4: remove swift # -SESSDEFAULTS_VERSION = 4 +SESSDEFAULTS_VERSION = 6 sessdefaults = OrderedDict() # General Tribler settings @@ -74,6 +74,11 @@ sessdefaults['torrent_checking']['enabled'] = 1 sessdefaults['torrent_checking']['torrent_checking_period'] = 31 # will be changed to min(max(86400/ntorrents, 15), 300) at runtime +# Torrent store settings +sessdefaults['torrent_store'] = OrderedDict() +sessdefaults['torrent_store']['enabled'] = True +sessdefaults['torrent_store']['dir'] = None + # Torrent collecting settings sessdefaults['torrent_collecting'] = OrderedDict() sessdefaults['torrent_collecting']['enabled'] = True diff --git a/Tribler/Core/simpledefs.py b/Tribler/Core/simpledefs.py index f68e51f4674..ffe446c78bb 100644 --- a/Tribler/Core/simpledefs.py +++ b/Tribler/Core/simpledefs.py @@ -42,6 +42,7 @@ STATEDIR_DLPSTATE_DIR = 'dlcheckpoints' STATEDIR_PEERICON_DIR = 'icons' STATEDIR_TORRENTCOLL_DIR = 'collected_torrent_files' +STATEDIR_TORRENT_STORE_DIR = 'collected_torrents' STATEDIR_SESSCONFIG = 'libtribler.conf' diff --git a/Tribler/Core/torrentstore.py b/Tribler/Core/torrentstore.py new file mode 100644 index 00000000000..3e05006560f --- /dev/null +++ b/Tribler/Core/torrentstore.py @@ -0,0 +1,139 @@ +# torrentstore.py --- +# +# Filename: torrentstore.py +# Description: +# Author: Elric Milon +# Maintainer: +# Created: Wed Jan 21 14:22:08 2015 (+0100) + +# Commentary: +# +# +# +# + +# Change Log: +# +# +# +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or (at +# your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Emacs. If not, see . +# +# + +# Code: +from collections import MutableMapping +from itertools import chain + +from leveldb import LevelDB, WriteBatch +from twisted.internet import reactor +from twisted.internet.defer import succeed +from twisted.internet.task import LoopingCall +from twisted.internet.threads import deferToThread + +from Tribler.dispersy.taskmanager import TaskManager + + +WRITEBACK_PERIOD = 120 + +# TODO(emilon): This could be easily abstracted into a generic cached store +# TODO(emilon): Make sure the caching makes an actual difference in IO and kill +# it if it doesn't as it complicates the code. + + +class TorrentStore(MutableMapping, TaskManager): + _reactor = reactor + + def __init__(self, store_dir): + super(TorrentStore, self).__init__() + + self._store_dir = store_dir + self._pending_torrents = {} + self._db = LevelDB(store_dir) + + self._writeback_lc = self.register_task("flush cache ", LoopingCall(self.flush)) + self._writeback_lc.clock = self._reactor + self._writeback_lc.start(WRITEBACK_PERIOD) + + def __getitem__(self, key): + try: + return self._pending_torrents[key] + except KeyError: + return self._db.Get(key) + + def __setitem__(self, key, value): + self._pending_torrents[key] = value + # self._db.Put(key, value) + + def __delitem__(self, key): + if key in self._pending_torrents: + self._pending_torrents.pop(key) + self._db.Delete(key) + + def __iter__(self): + for k in self._pending_torrents.iterkeys(): + yield k + for k, _ in self._db.RangeIter(): + yield k + + def __contains__(self, key): + if key in self._pending_torrents: + return True + try: + self.__getitem__(key) + return True + except KeyError: + pass + + return False + + def __len__(self): + return len(self._pending_torrents) + len(list(self.keys())) + + def keys(self): + return [k for k, _ in self._db.RangeIter()] + + def iteritems(self): + return chain(self._pending_torrents, self._db.RangeIter()) + + def put(self, k, v): + self.__setitem__(k, v) + + def rangescan(self, start=None, end=None): + if start is None and end is None: + return self._db.RangeIter() + elif end is None: + return self._db.RangeIter(start) + else: + return self._db.RangeIter(start, end) + + def flush(self): + if self._pending_torrents: + write_batch = WriteBatch() + for k, v in self._pending_torrents.iteritems(): + write_batch.Put(k, v) + self._pending_torrents.clear() + return deferToThread(self._db.Write, write_batch) + else: + return succeed(None) + + def close(self): + self.cancel_all_pending_tasks() + d = self.flush() + self._db = None + return d + + +# +# torrentstore.py ends here diff --git a/Tribler/Main/Build/Mac/setuptriblermac.py b/Tribler/Main/Build/Mac/setuptriblermac.py index d6c3c8966b3..21ae2d44772 100644 --- a/Tribler/Main/Build/Mac/setuptriblermac.py +++ b/Tribler/Main/Build/Mac/setuptriblermac.py @@ -15,7 +15,7 @@ # modules to include into bundle includeModules = ["encodings.hex_codec", "encodings.utf_8", "encodings.latin_1", "xml.sax", "email.iterators", "netifaces", "apsw", "libtorrent", "twisted", "M2Crypto", "pycrypto", "pyasn1", "Image" - "urllib3", "requests"] + "urllib3", "requests", "leveldb"] # gui panels to include includePanels = [ diff --git a/Tribler/Main/Build/Win32/setuptribler.py b/Tribler/Main/Build/Win32/setuptribler.py index 95eb88ee227..8d57223cdaa 100644 --- a/Tribler/Main/Build/Win32/setuptribler.py +++ b/Tribler/Main/Build/Win32/setuptribler.py @@ -49,13 +49,15 @@ ["Tribler.Core.DecentralizedTracking.pymdht.core", "Tribler.Main.tribler_main", "netifaces", "csv", "cherrypy", "twisted", "apsw", "libtorrent", "M2Crypto", - "zope.interface", "pyasn1", "gmpy", "Image", "requests"] + "zope.interface", "pyasn1", "gmpy", "Image", "requests", "leveldb"] setup( # (Disabling bundle_files for now -- apparently causes some issues with Win98) # options = {"py2exe": {"bundle_files": 1}}, # zipfile = None, - options={"py2exe": {"packages": packages, "optimize": 2}}, + options={"py2exe": {"packages": packages, + "optimize": 2, + "dll_excludes": ["mswsock.dll"]}}, data_files=[("installdir", [])], windows=[target], ) diff --git a/Tribler/Main/Dialogs/SaveAs.py b/Tribler/Main/Dialogs/SaveAs.py index 60c02f5497d..7acad9f5b68 100644 --- a/Tribler/Main/Dialogs/SaveAs.py +++ b/Tribler/Main/Dialogs/SaveAs.py @@ -109,8 +109,8 @@ def __init__(self, parent, tdef, defaultdir, defaultname, selectedFiles=None): torrent = Torrent.fromTorrentDef(tdef) torrentsearch_manager = self.guiutility.torrentsearch_manager - def callback(saveas_id, torrent_filename): - tdef = TorrentDef.load(torrent_filename) + def callback(saveas_id, infohash): + tdef = TorrentDef.load_from_memory(self.utility.session.lm.torrent_store.get(infohash)) event = CollectedEvent(tdef=tdef) saveas = wx.FindWindowById(saveas_id) if saveas: diff --git a/Tribler/Main/vwxGUI/SearchGridManager.py b/Tribler/Main/vwxGUI/SearchGridManager.py index e297ff45897..09d02f2ab2d 100644 --- a/Tribler/Main/vwxGUI/SearchGridManager.py +++ b/Tribler/Main/vwxGUI/SearchGridManager.py @@ -271,17 +271,12 @@ def loadTorrent(self, torrent, callback=None): if 'no-DHT' in trackers: trackers.remove('no-DHT') - if len(files) > 0: - # We still call getTorrent to fetch .torrent - self.getTorrent(torrent, None) - torrent = NotCollectedTorrent(torrent, files, trackers) - else: - torrent_callback = lambda torfilename: self.loadTorrent(torrent, callback) - torrent_filename = self.getTorrent(torrent, torrent_callback) + # We still call getTorrent to fetch .torrent + self.getTorrent(torrent, None) + + torrent = NotCollectedTorrent(torrent, files, trackers) - if torrent_filename[0]: - return torrent_filename[1] else: try: tdef = TorrentDef.load(torrent_filename) diff --git a/Tribler/Test/API/test_seeding.py b/Tribler/Test/API/test_seeding.py index eb3f1a6efa1..54650dca18e 100644 --- a/Tribler/Test/API/test_seeding.py +++ b/Tribler/Test/API/test_seeding.py @@ -62,19 +62,22 @@ def setup_seeder(self, filename='video.avi'): self.torrentfn = os.path.join(self.session.get_state_dir(), "gen.torrent") self.tdef.save(self.torrentfn) - self._logger.debug("setup_seeder: name is %s", self.tdef.metainfo['info']['name']) + self._logger.debug("name is %s", self.tdef.metainfo['info']['name']) self.dscfg = DownloadStartupConfig() self.dscfg.set_dest_dir(os.path.join(BASE_DIR, "API")) # basedir of the file we are seeding d = self.session.start_download(self.tdef, self.dscfg) d.set_state_callback(self.seeder_state_callback) - self._logger.debug("setup_seeder: starting to wait for download to reach seeding state") + self._logger.debug("starting to wait for download to reach seeding state") assert self.seeding_event.wait(60) def seeder_state_callback(self, ds): d = ds.get_download() - self._logger.debug("seeder: %s %s %s ", repr(d.get_def().get_name()), dlstatus_strings[ds.get_status()], ds.get_progress()) + self._logger.debug("seeder status: %s %s %s", + repr(d.get_def().get_name()), + dlstatus_strings[ds.get_status()], + ds.get_progress()) if ds.get_status() == DLSTATUS_SEEDING: self.seeding_event.set() @@ -125,7 +128,10 @@ def subtest_download(self): def downloader_state_callback(self, ds): d = ds.get_download() - self._logger.debug("download: %s %s %s", repr(d.get_def().get_name()), dlstatus_strings[ds.get_status()], ds.get_progress()) + self._logger.debug("download status: %s %s %s", + repr(d.get_def().get_name()), + dlstatus_strings[ds.get_status()], + ds.get_progress()) if ds.get_status() == DLSTATUS_SEEDING: # File is in diff --git a/Tribler/Test/test_as_server.py b/Tribler/Test/test_as_server.py index 57d55c42523..82db5115354 100644 --- a/Tribler/Test/test_as_server.py +++ b/Tribler/Test/test_as_server.py @@ -149,6 +149,7 @@ def setUpPreSession(self): self.config.set_megacache(False) self.config.set_dispersy(False) self.config.set_mainline_dht(False) + self.config.set_torrent_store(False) self.config.set_torrent_collecting(False) self.config.set_libtorrent(False) self.config.set_dht_torrent_collecting(False) diff --git a/Tribler/Test/test_remote_search.py b/Tribler/Test/test_remote_search.py index f3211fd8f8b..9b1e3c97299 100644 --- a/Tribler/Test/test_remote_search.py +++ b/Tribler/Test/test_remote_search.py @@ -9,6 +9,10 @@ class TestRemoteQuery(TestGuiAsServer): + def setUpPreSession(self): + super(TestRemoteQuery, self).setUpPreSession() + self.config.set_torrent_store(True) + def test_remotesearch(self): def do_assert(): self.screenshot('After doing mp3 search, got %d results' % self.frame.searchlist.GetNrResults()) diff --git a/Tribler/Test/test_remote_torrent_handler.py b/Tribler/Test/test_remote_torrent_handler.py index e087a87bafc..68ab59219cd 100644 --- a/Tribler/Test/test_remote_torrent_handler.py +++ b/Tribler/Test/test_remote_torrent_handler.py @@ -13,12 +13,21 @@ from unittest import skip class TestRemoteTorrentHandler(TestAsServer): + """ Tests the download_torrent() method of TestRemoteTorrentHandler. """ + def __init__(self, *argv, **kwargs): + super(TestRemoteTorrentHandler, self).__init__(*argv, **kwargs) + + self.file_names = {} + self.infohash_strs = {} + self.infohashes = {} + def setUpPreSession(self): super(TestRemoteTorrentHandler, self).setUpPreSession() self.config.set_dispersy(True) + self.config.set_torrent_store(True) def tearDown(self): self._shutdown_session(self.session2) @@ -27,18 +36,17 @@ def tearDown(self): super(TestRemoteTorrentHandler, self).tearDown() def test_torrentdownload(self): - print >> sys.stderr, u"Start torrent download test..." + self._logger.info(u"Start torrent download test...") def do_check_download(torrent_file=None): - des_file_path = os.path.join(self.session2.get_torrent_collecting_dir(), self.file_name1) - self.assertTrue(os.path.exists(des_file_path) and os.path.isfile(des_file_path), - u"Failed to download torrent file 1.") - des_file_path = os.path.join(self.session2.get_torrent_collecting_dir(), self.file_name2) - self.assertTrue(os.path.exists(des_file_path) and os.path.isfile(des_file_path), - u"Failed to download torrent file 2.") + for i, infohash_str in enumerate(self.infohash_strs): + self._logger.info(u"Checking... %s", self.file_names[i]) + for item in self.session2.lm.torrent_store.iterkeys(): + self.assertTrue(infohash_str in self.session2.lm.torrent_store, + u"Failed to download torrent file 1.") - print >> sys.stderr, u"Torrent files 1 and 2 downloaded successfully." + self._logger.info(u"Torrent files 1 and 2 downloaded successfully.") self.download_event.set() self.quit() @@ -48,8 +56,8 @@ def do_start_download(): @call_on_reactor_thread def _start_download(): candidate = Candidate(("127.0.0.1", self.session1_port), False) - self.session2.lm.rtorrent_handler.download_torrent(candidate, self.infohash1) - self.session2.lm.rtorrent_handler.download_torrent(candidate, self.infohash2, + self.session2.lm.rtorrent_handler.download_torrent(candidate, self.infohashes[0]) + self.session2.lm.rtorrent_handler.download_torrent(candidate, self.infohashes[1], user_callback=do_check_download) _start_download() @@ -64,25 +72,16 @@ def setup_torrentdownloader(self): self.download_event = Event() self.session1_port = self.session.get_dispersy_port() - infohash1_str = "41aea20908363a80d44234e8fef07fab506cd3b4" - infohash2_str = "45a647b1120ed9fe7f793e17585efb4b0efdf1a5" - - self.infohash1 = infohash1_str.decode('hex') - self.file_name1 = u"%s.torrent" % infohash1_str + self.infohash_strs = ["41aea20908363a80d44234e8fef07fab506cd3b4", + "45a647b1120ed9fe7f793e17585efb4b0efdf1a5"] - self.infohash2 = infohash2_str.decode('hex') - self.file_name2 = u"%s.torrent" % infohash2_str + for i, infohash in enumerate(self.infohash_strs): + self.infohashes[i] = infohash.decode('hex') + self.file_names[i] = file_name = u"%s.torrent" % infohash - # copy file to the uploader's torrent_collecting_dir - src_file_path1 = os.path.join(BASE_DIR, u"data", self.file_name1) - des_file_path1 = os.path.join(self.session.get_torrent_collecting_dir(), self.file_name1) - copyfile(src_file_path1, des_file_path1) - - src_file_path2 = os.path.join(BASE_DIR, u"data", self.file_name2) - des_file_path2 = os.path.join(self.session.get_torrent_collecting_dir(), self.file_name2) - copyfile(src_file_path2, des_file_path2) - - print >> sys.stderr, u"Uploader's torrent_collect_dir = %s" % self.session.get_torrent_collecting_dir() + # Put the torrents into the uploader's store + with open(os.path.join(BASE_DIR, u"data", file_name), 'r') as torrent_file: + self.session.lm.torrent_store.put(infohash, torrent_file.read()) from Tribler.Core.Session import Session @@ -103,18 +102,16 @@ def setup_torrentdownloader(self): self.session2.start() sleep(1) - print >> sys.stderr, u"Downloader's torrent_collect_dir = %s" % self.session2.get_torrent_collecting_dir() - @skip("The metadata collecting is not used ATM, broken by the new torrent store stuff too") def test_metadatadownload(self): - print >> sys.stderr, u"Start metadata download test..." + self._logger.info(u"Start metadata download test...") def do_check_download(torrent_file=None): des_file_path = os.path.join(self.session2.get_torrent_collecting_dir(), self.metadata_dir) self.assertTrue(os.path.exists(des_file_path) and os.path.isdir(des_file_path), u"Failed to download metadata.") - print >> sys.stderr, u"metadata downloaded successfully." + self._logger.info(u"metadata downloaded successfully.") self.quit() self.download_event.set() @@ -144,7 +141,7 @@ def setup_metadatadownloader(self): # copy file to the uploader's torrent_collecting_dir src_dir_path = os.path.join(BASE_DIR, u"data", self.metadata_dir) des_dir_path = os.path.join(self.session.get_torrent_collecting_dir(), self.metadata_dir) - print >> sys.stderr, u"Uploader's torrent_collect_dir = %s" % self.session.get_torrent_collecting_dir() + self._logger.info(u"Uploader's torrent_collect_dir = %s", self.session.get_torrent_collecting_dir()) copytree(src_dir_path, des_dir_path) from Tribler.Core.Session import Session @@ -166,6 +163,6 @@ def setup_metadatadownloader(self): self.session2.start() sleep(1) - print >> sys.stderr, u"Downloader's torrent_collect_dir = %s" % self.session2.get_torrent_collecting_dir() - print >> sys.stderr, u"Uploader port: %s, Downloader port: %s" % (self.session1_port, - self.session2.get_dispersy_port()) + self._logger.info(u"Downloader's torrent_collect_dir = %s", self.session2.get_torrent_collecting_dir()) + self._logger.info(u"Uploader port: %s, Downloader port: %s", + self.session1_port, self.session2.get_dispersy_port()) diff --git a/Tribler/Test/test_sqlitecachedb_upgrade.py b/Tribler/Test/test_sqlitecachedb_upgrade.py index d259c10f78f..314a61d3915 100644 --- a/Tribler/Test/test_sqlitecachedb_upgrade.py +++ b/Tribler/Test/test_sqlitecachedb_upgrade.py @@ -40,7 +40,13 @@ def test_upgrade_from_obsolete_version(self): self.sqlitedb = SQLiteCacheDB(self.session) self.sqlitedb.initialize(dbpath) - db_migrator = DBUpgrader(self.session, self.sqlitedb) + class MockTorrentStore(object): + def flush(): + pass + def close(): + pass + + db_migrator = DBUpgrader(self.session, self.sqlitedb, torrent_store=MockTorrentStore()) self.assertRaises(VersionNoLongerSupportedError, db_migrator.start_migrate) def test_upgrade_from_17(self): diff --git a/Tribler/Test/test_torrent_store.py b/Tribler/Test/test_torrent_store.py new file mode 100644 index 00000000000..bf28b65e81a --- /dev/null +++ b/Tribler/Test/test_torrent_store.py @@ -0,0 +1,109 @@ +# test_torrent_store.py --- +# +# Filename: test_torrent_store.py +# Description: +# Author: Elric Milon +# Maintainer: +# Created: Wed Jan 21 12:45:30 2015 (+0100) + +# Commentary: +# +# +# +# + +# Change Log: +# +# +# +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or (at +# your option) any later version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Emacs. If not, see . +# +# + +# Code: +import unittest +from shutil import rmtree +from tempfile import mkdtemp + +from twisted.internet.defer import inlineCallbacks +from twisted.internet.task import Clock + +from Tribler.Core.Utilities.twisted_thread import deferred +from Tribler.Core.torrentstore import TorrentStore, WRITEBACK_PERIOD + + +K = "foo" +V = "bar" + + +class ClockedTorrentStore(TorrentStore): + _reactor = Clock() + + +class TestTorrentStore(unittest.TestCase): + + def setUp(self): + self.openStore(mkdtemp(prefix=__name__)) + + def tearDown(self): + self.closeStore() + + @deferred(timeout=5) + @inlineCallbacks + def closeStore(self): + yield self.store.close() + self.store = None + rmtree(self.store_dir) + + def openStore(self, store_dir): + self.store_dir = store_dir + self.store = ClockedTorrentStore(store_dir=self.store_dir) + + @deferred(timeout=5) + @inlineCallbacks + def test_storeIsPersistent(self): + self.store.put(K, V) + self.assertEqual(self.store.get(K), V) + store_dir = self.store._store_dir + yield self.store.close() + self.openStore(store_dir) + self.assertEqual(self.store.get(K), V) + + def test_canPutAndDelete(self): + self.store[K] = V + self.assertEqual(self.store[K], V) + del self.store[K] + self.assertEqual(None, self.store.get(K)) + with self.assertRaises(KeyError) as raises: + self.store[K] + + def test_cacheIsFlushed(self): + self.store[K] = V + self.assertEqual(1, len(self.store._pending_torrents)) + self.store._reactor.advance(WRITEBACK_PERIOD) + self.assertEqual(0, len(self.store._pending_torrents)) + + @deferred(timeout=5) + @inlineCallbacks + def test_len(self): + self.assertEqual(0, len(self.store)) + self.store[K] = V + self.assertEqual(1, len(self.store), 1) + # test that even after writing the cached data, the lenght is still the same + yield self.store.flush() + self.assertEqual(1, len(self.store), 2) + +# +# test_torrent_store.py ends here diff --git a/Tribler/schema_sdb_v24.sql b/Tribler/schema_sdb_v25.sql similarity index 100% rename from Tribler/schema_sdb_v24.sql rename to Tribler/schema_sdb_v25.sql diff --git a/debian/control b/debian/control index 28b220b5b98..e1067a64b7f 100644 --- a/debian/control +++ b/debian/control @@ -21,6 +21,7 @@ Depends: ${misc:Depends}, python-crypto, python-feedparser, python-gmpy, + python-leveldb, python-libtorrent (>= 0.16.4), python-m2crypto, python-netifaces,