Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Torrent store implementation, tests and migration code #1175

Merged
merged 18 commits into from
Feb 2, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ We make use of submodules, so remember using the --recursive argument when cloni

### Debian/Ubuntu/Mint
```bash
sudo apt-get install libav-tools libjs-excanvas libjs-mootools libx11-6 python-apsw python-cherrypy3 python-crypto python-feedparser python-gmpy python-libtorrent python-m2crypto python-netifaces python-pil python-pyasn1 python-requests python-twisted python-wxgtk2.8 python2.7 vlc
sudo apt-get install libav-tools libjs-excanvas libjs-mootools libx11-6 python-apsw python-leveldb python-cherrypy3 python-crypto python-feedparser python-gmpy python-libtorrent python-m2crypto python-netifaces python-pil python-pyasn1 python-requests python-twisted python-wxgtk2.8 python2.7 vlc
```

### Windows and OSX
Expand Down
9 changes: 9 additions & 0 deletions Tribler/Core/APIImplementation/LaunchManyCore.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from Tribler.Core.osutils import get_readable_torrent_name
from Tribler.Core.simpledefs import (NTFY_DISPERSY, NTFY_STARTED, NTFY_TORRENTS, NTFY_UPDATE, NTFY_INSERT,
NTFY_ACTIVITIES, NTFY_REACHABLE, NTFY_ACT_UPNP)
from Tribler.Core.torrentstore import TorrentStore
from Tribler.Main.globals import DefaultDownloadStartupConfig
from Tribler.community.tunnel.crypto.elgamalcrypto import ElgamalCrypto
from Tribler.dispersy.util import blockingCallFromThread
Expand Down Expand Up @@ -76,6 +77,7 @@ def __init__(self):
self.rawserver = None
self.multihandler = None

self.torrent_store = None
self.rtorrent_handler = None
self.tftp_handler = None

Expand Down Expand Up @@ -113,6 +115,9 @@ def register(self, session, sesslock, autoload_discovery=True):

self.multihandler = MultiHandler(self.rawserver, self.sessdoneflag)

if self.session.get_torrent_store():
self.torrent_store = TorrentStore(self.session.get_torrent_store_dir())

# torrent collecting: RemoteTorrentHandler
if self.session.get_torrent_collecting():
from Tribler.Core.RemoteTorrentHandler import RemoteTorrentHandler
Expand Down Expand Up @@ -656,6 +661,10 @@ def early_shutdown(self):
self.tftp_handler.shutdown()
self.tftp_handler = None

if self.torrent_store is not None:
self.torrent_store.close()
self.torrent_store = None

if self.dispersy:
self._logger.info("lmc: Shutting down Dispersy...")
now = timemod.time()
Expand Down
7 changes: 5 additions & 2 deletions Tribler/Core/CacheDB/db_versions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
# 18 is used by Tribler 6.1.x - 6.2.0
# 22 is used by Tribler 6.3.x
# 23 is used by Tribler 6.4.0 RC1
# 24 is used by Tribler 6.4.0 RC2
# 24 is used by Tribler 6.4.0 RC2 - 6.4.X
# 25 is used by Tribler 6.5-git

TRIBLER_59_DB_VERSION = 17
TRIBLER_60_DB_VERSION = 17
Expand All @@ -17,8 +18,10 @@

TRIBLER_64RC2_DB_VERSION = 24

TRIBLER_65PRE_DB_VERSION = 25

# the lowest supported database version number
LOWEST_SUPPORTED_DB_VERSION = TRIBLER_59_DB_VERSION

# the latest database version number
LATEST_DB_VERSION = TRIBLER_64RC2_DB_VERSION
LATEST_DB_VERSION = TRIBLER_65PRE_DB_VERSION
50 changes: 28 additions & 22 deletions Tribler/Core/RemoteTorrentHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from Tribler.Core.TorrentDef import TorrentDef
from Tribler.Core.simpledefs import NTFY_TORRENTS, INFOHASH_LENGTH

from Tribler.Core.torrentstore import TorrentStore

TORRENT_OVERFLOW_CHECKING_INTERVAL = 30 * 60
LOW_PRIO_COLLECTING = 0
Expand Down Expand Up @@ -140,39 +140,44 @@ def download_torrent(self, candidate, infohash, user_callback=None, priority=1,
callback = lambda ih = infohash: user_callback(ih)
self.torrent_callbacks.setdefault(infohash, set()).add(callback)

def get_torrent_filename(self, infohash):
return u"%s.torrent" % hexlify(infohash)

def get_torrent_filepath(self, infohash):
return os.path.join(self.tor_col_dir, self.get_torrent_filename(infohash))

def has_torrent(self, infohash):
return os.path.exists(self.get_torrent_filepath(infohash))

@call_on_reactor_thread
def save_torrent(self, tdef, callback=None):
infohash = tdef.get_infohash()
file_path = self.get_torrent_filepath(infohash)
infohash_str = hexlify(infohash)

if not self.has_torrent(infohash):
if self.session.lm.torrent_store == None:
self._logger.error("Torrent store is not loaded")
return

# TODO(emilon): could we check the database instead of the store?
# Checking if a key is present fetches the whole torrent from disk if its
# not on the writeback cache.
if infohash_str not in self.session.lm.torrent_store:
# save torrent to file
try:
tdef.save(file_path)
bdata = tdef.encode()

except Exception as e:
self._logger(u"failed to save torrent %s: %s", file_path, e)
self._logger.error(u"failed to encode torrent %s: %s", infohash_str, e)
return
try:
self.session.lm.torrent_store[infohash_str] = bdata
except Exception as e:
self._logger.error(u"failed to store torrent data for %s, exception was: %s", infohash_str, e)

# add torrent to database
if self.torrent_db.hasTorrent(infohash):
self.torrent_db.updateTorrent(infohash, torrent_file_name=file_path)
self.torrent_db.updateTorrent(infohash, torrent_file_name="lvl")
else:
self.torrent_db.addExternalTorrent(tdef, extra_info={u"filename": file_path, u"status": u"good"})
self.torrent_db.addExternalTorrent(tdef, extra_info={u"filename": "lvl", u"status": u"good"})

if callback:
# TODO(emilon): should we catch exceptions from the callback?
callback()

# TODO(emilon): remove all the torrent_file_name references in the callback chain
# notify all
self.notify_possible_torrent_infohash(infohash, file_path)
self.notify_possible_torrent_infohash(infohash, infohash_str)

@call_on_reactor_thread
def download_torrentmessage(self, candidate, infohash, user_callback=None, priority=1):
Expand Down Expand Up @@ -234,6 +239,7 @@ def save_metadata(self, infohash, thumbnail_subpath, data):

self.notify_possible_metadata_infohash(infohash, thumbnail_subpath)

# TODO(emilon): HERE
def notify_possible_torrent_infohash(self, infohash, torrent_file_name=None):
if infohash not in self.torrent_callbacks:
return
Expand Down Expand Up @@ -564,15 +570,15 @@ def _do_request(self):
if thumbnail_subpath:
file_name = thumbnail_subpath
else:
file_name = self._remote_torrent_handler.get_torrent_filename(infohash)
file_name = hexlify(infohash)+'.torrent'

extra_info = {u"infohash": infohash, u"thumbnail_subpath": thumbnail_subpath}
# do not download if TFTP has been shutdown
if self._session.lm.tftp_handler is None:
return
self._session.lm.tftp_handler.download_file(file_name, ip, port, extra_info=extra_info,
success_callback=self._success_callback,
failure_callback=self._failure_callback)
success_callback=self._on_download_successful,
failure_callback=self._on_download_failed)
self._active_request_list.append(key)

def _clear_active_request(self, key):
Expand All @@ -581,7 +587,7 @@ def _clear_active_request(self, key):
self._active_request_list.remove(key)

@call_on_reactor_thread
def _success_callback(self, address, file_name, file_data, extra_info):
def _on_download_successful(self, address, file_name, file_data, extra_info):
self._logger.debug(u"successfully downloaded %s from %s:%s", file_name, address[0], address[1])

# metadata has thumbnail_subpath info
Expand Down Expand Up @@ -612,7 +618,7 @@ def _success_callback(self, address, file_name, file_data, extra_info):
self._start_pending_requests()

@call_on_reactor_thread
def _failure_callback(self, address, file_name, error_msg, extra_info):
def _on_download_failed(self, address, file_name, error_msg, extra_info):
self._logger.debug(u"failed to download %s from %s:%s: %s", file_name, address[0], address[1], error_msg)

# metadata has thumbnail_subpath info
Expand Down
39 changes: 26 additions & 13 deletions Tribler/Core/Session.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,22 @@
import socket

from Tribler.Core import NoDispersyRLock

from Tribler.Core.CacheDB.sqlitecachedb import SQLiteCacheDB
from Tribler.Core.Upgrade.upgrade import TriblerUpgrader

from Tribler.Core.APIImplementation.LaunchManyCore import TriblerLaunchMany
from Tribler.Core.APIImplementation.UserCallbackHandler import UserCallbackHandler
from Tribler.Core.CacheDB.sqlitecachedb import SQLiteCacheDB
from Tribler.Core.SessionConfig import SessionConfigInterface, SessionStartupConfig
from Tribler.Core.Upgrade.upgrade import TriblerUpgrader
from Tribler.Core.exceptions import NotYetImplementedException, OperationNotEnabledByConfigurationException
from Tribler.Core.osutils import get_appstate_dir, is_android
from Tribler.Core.simpledefs import (STATEDIR_TORRENTCOLL_DIR, STATEDIR_PEERICON_DIR, STATEDIR_DLPSTATE_DIR,
STATEDIR_SESSCONFIG, NTFY_MISC, NTFY_PEERS, NTFY_BUNDLERPREFERENCE,
NTFY_TORRENTS, NTFY_MYPREFERENCES, NTFY_VOTECAST, NTFY_CHANNELCAST, NTFY_UPDATE,
NTFY_USEREVENTLOG, NTFY_INSERT, NTFY_DELETE, NTFY_METADATA)
from Tribler.Core.osutils import get_appstate_dir
from Tribler.Core.simpledefs import (STATEDIR_PEERICON_DIR,
STATEDIR_DLPSTATE_DIR, STATEDIR_SESSCONFIG,
NTFY_MISC, NTFY_PEERS,
NTFY_BUNDLERPREFERENCE, NTFY_TORRENTS,
NTFY_MYPREFERENCES, NTFY_VOTECAST,
NTFY_CHANNELCAST, NTFY_UPDATE,
NTFY_USEREVENTLOG, NTFY_INSERT, NTFY_DELETE,
NTFY_METADATA, STATEDIR_TORRENT_STORE_DIR)


GOTM2CRYPTO = False
try:
Expand Down Expand Up @@ -92,8 +95,19 @@ def set_and_create_dir(dirname, setter, default_dir):
create_dir(dirname or default_dir)

set_and_create_dir(scfg.get_state_dir(), scfg.set_state_dir, Session.get_default_state_dir())
set_and_create_dir(scfg.get_torrent_collecting_dir(), scfg.set_torrent_collecting_dir, os.path.join(scfg.get_state_dir(), STATEDIR_TORRENTCOLL_DIR))
set_and_create_dir(scfg.get_peer_icon_path(), scfg.set_peer_icon_path, os.path.join(scfg.get_state_dir(), STATEDIR_PEERICON_DIR))
# Note that we are setting it to STATEDIR_TORRENT_STORE_DIR instead of
# STATEDIR_TORRENTCOLL_DIR as that dir is unused and only kept for
# the upgrade process.
set_and_create_dir(scfg.get_torrent_collecting_dir(),
scfg.set_torrent_collecting_dir,
os.path.join(scfg.get_state_dir(), STATEDIR_TORRENT_STORE_DIR))

set_and_create_dir(scfg.get_torrent_store_dir(),
scfg.set_torrent_store_dir,
os.path.join(scfg.get_state_dir(), STATEDIR_TORRENT_STORE_DIR))

set_and_create_dir(scfg.get_peer_icon_path(), scfg.set_peer_icon_path,
os.path.join(scfg.get_state_dir(), STATEDIR_PEERICON_DIR))

create_dir(os.path.join(scfg.get_state_dir(), u"sqlite"))

Expand Down Expand Up @@ -189,6 +203,7 @@ def del_instance():
Session.__single = None
del_instance = staticmethod(del_instance)

@staticmethod
def get_default_state_dir(homedirpostfix='.Tribler'):
""" Returns the factory default directory for storing session state
on the current platform (Win32,Mac,Unix).
Expand All @@ -207,8 +222,6 @@ def get_default_state_dir(homedirpostfix='.Tribler'):
statedir = os.path.join(appdir, homedirpostfix)
return statedir

get_default_state_dir = staticmethod(get_default_state_dir)

#
# Public methods
#
Expand Down
25 changes: 25 additions & 0 deletions Tribler/Core/SessionConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,31 @@ def get_libtorrent_utp(self):
"""
return self.sessconfig.get(u'libtorrent', u'utp')

#
# Torrent file store
#
def get_torrent_store(self):
""" Returns whether to enable the torrent store.
@return Boolean. """
return self.sessconfig.get(u'torrent_store', u'enabled')

def set_torrent_store(self, value):
""" Store torrent files in a leveldb database (default = True).
@param value Boolean.
"""
self.sessconfig.set(u'torrent_store', u'enabled', value)

def get_torrent_store_dir(self):
""" Returns the torrent store directory.
@return str
"""
return self.sessconfig.get(u'torrent_store', u'dir')

def set_torrent_store_dir(self, value):
""" Store torrent store dir(default = state_dir/collected_torrents).
@param value str.
"""
self.sessconfig.set(u'torrent_store', u'dir', value)

#
# Torrent file collecting
Expand Down
17 changes: 16 additions & 1 deletion Tribler/Core/TFTP/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def _handle_new_request(self, ip, port, packet):
if file_name.startswith(DIR_PREFIX):
file_data, file_size = self._load_directory(file_name)
else:
file_data, file_size = self._load_file(file_name)
file_data, file_size = self._load_torrent(file_name)
checksum = b64encode(sha1(file_data).digest())
except FileNotFound as e:
self._logger.warn(u"[READ %s:%s] file/dir not found: %s", ip, port, e)
Expand Down Expand Up @@ -371,6 +371,21 @@ def _load_directory(self, file_name):
# load the zip file as binary
return self._load_file(file_name, file_path=tmpfile_path)

def _load_torrent(self, file_name):
""" Loads a file into memory.
:param file_name: The path of the file.
"""

infohash=file_name[:-8] # len('.torrent') = 8

file_data = self.session.lm.torrent_store.get(infohash)
# check if file exists
if not file_data:
msg = u"Torrent not in store: %s" % infohash
raise FileNotFound(msg)

return file_data, len(file_data)

def _get_next_data(self, session):
""" Gets the next block of data to be uploaded. This method is only used for data uploading.
:return The data to transfer.
Expand Down
Loading