Skip to content

Commit

Permalink
Make DB upgrader run in foreground
Browse files Browse the repository at this point in the history
This commit makes DB upgrader from 72 to 73(Pony)
run in foreground (at the "spinning gears" screen).
"Skip" button is shown on the screen during the upgrading
process. A corresponding dialog window REST endpoint are added, too.
To speed up the upgrade process, disk synchronization is disabled.
This is safe since we don't touch the original database.
  • Loading branch information
ichorid committed May 3, 2019
1 parent bb827a4 commit 6420df6
Show file tree
Hide file tree
Showing 13 changed files with 315 additions and 125 deletions.
11 changes: 10 additions & 1 deletion Tribler/Core/Modules/MetadataStore/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def tick(self):


class MetadataStore(object):
def __init__(self, db_filename, channels_dir, my_key):
def __init__(self, db_filename, channels_dir, my_key, disable_sync=False):
self.db_filename = db_filename
self.channels_dir = channels_dir
self.my_key = my_key
Expand All @@ -112,6 +112,15 @@ def __init__(self, db_filename, channels_dir, my_key):
# at definition.
self._db = orm.Database()

# Possibly disable disk sync.
# !!! ACHTUNG !!! This should be used only for special cases (e.g. DB upgrades), because
# losing power during a write will corrupt the database.
if disable_sync:
@self._db.on_connect(provider='sqlite')
def sqlite_disable_sync(_, connection):
cursor = connection.cursor()
cursor.execute("PRAGMA synchronous = 0")

self.MiscData = misc.define_binding(self._db)

self.TrackerState = tracker_state.define_binding(self._db)
Expand Down
4 changes: 3 additions & 1 deletion Tribler/Core/Modules/restapi/root_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from Tribler.Core.Modules.restapi.torrentinfo_endpoint import TorrentInfoEndpoint
from Tribler.Core.Modules.restapi.trustchain_endpoint import TrustchainEndpoint
from Tribler.Core.Modules.restapi.trustview_endpoint import TrustViewEndpoint
from Tribler.Core.Modules.restapi.upgrader_endpoint import UpgraderEndpoint
from Tribler.Core.Modules.restapi.wallets_endpoint import WalletsEndpoint
from Tribler.pyipv8.ipv8.REST.root_endpoint import RootEndpoint as IPV8RootEndpoint

Expand All @@ -38,9 +39,11 @@ def __init__(self, session):
self.events_endpoint = EventsEndpoint(self.session)
self.state_endpoint = StateEndpoint(self.session)
self.shutdown_endpoint = ShutdownEndpoint(self.session)
self.upgrader_endpoint = UpgraderEndpoint(self.session)
self.putChild(b"events", self.events_endpoint)
self.putChild(b"state", self.state_endpoint)
self.putChild(b"shutdown", self.shutdown_endpoint)
self.putChild(b"upgrader", self.upgrader_endpoint)

def start_endpoints(self):
"""
Expand All @@ -52,7 +55,6 @@ def start_endpoints(self):
b"downloads": DownloadsEndpoint,
b"createtorrent": CreateTorrentEndpoint,
b"debug": DebugEndpoint,
b"shutdown": ShutdownEndpoint,
b"trustchain": TrustchainEndpoint,
b"trustview": TrustViewEndpoint,
b"statistics": StatisticsEndpoint,
Expand Down
44 changes: 44 additions & 0 deletions Tribler/Core/Modules/restapi/upgrader_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from __future__ import absolute_import

import logging

from twisted.web import resource

import Tribler.Core.Utilities.json_util as json


class UpgraderEndpoint(resource.Resource):
"""
With this endpoint you can control DB upgrade process of Tribler.
"""

def __init__(self, session):
resource.Resource.__init__(self)
self._logger = logging.getLogger(self.__class__.__name__)
self.session = session

def render_DELETE(self, _):
"""
.. http:put:: /upgrader
A PUT request to this endpoint will skip the DB upgrade process, if it is running.
**Example request**:
.. sourcecode:: none
curl -X DELETE http://localhost:8085/upgrader
**Example response**:
.. sourcecode:: javascript
{
"skip": True
}
"""

if self.session.upgrader:
self.session.upgrader.skip()

return json.twisted_dumps({"skip": True})
16 changes: 9 additions & 7 deletions Tribler/Core/Session.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from threading import RLock

from twisted.internet import threads
from twisted.internet.defer import fail, inlineCallbacks
from twisted.internet.defer import fail, inlineCallbacks, succeed
from twisted.python.failure import Failure
from twisted.python.log import addObserver
from twisted.python.threadable import isInIOThread
Expand Down Expand Up @@ -89,7 +89,6 @@ def __init__(self, config=None, autoload_discovery=True):

self.autoload_discovery = autoload_discovery


def create_state_directory_structure(self):
"""Create directory structure of the state directory."""

Expand Down Expand Up @@ -406,9 +405,15 @@ def start(self):
if self.upgrader_enabled:
self.upgrader = TriblerUpgrader(self)
self.readable_status = STATE_UPGRADING_READABLE
self.upgrader.run()
upgrader_deferred = self.upgrader.run()
else:
upgrader_deferred = succeed(None)

startup_deferred = self.lm.register(self, self.session_lock)
def after_upgrade(_):
self.upgrader = None

startup_deferred = upgrader_deferred.addCallbacks(lambda _: self.lm.register(self, self.session_lock),
lambda _: None).addCallbacks(after_upgrade, lambda _: None)

def load_checkpoint(_):
if self.config.get_libtorrent_enabled():
Expand Down Expand Up @@ -448,9 +453,6 @@ def on_early_shutdown_complete(_):
self.notify_shutdown_state("Shutting down Metadata Store...")
self.lm.mds.shutdown()

if self.upgrader:
self.upgrader.shutdown()

# We close the API manager as late as possible during shutdown.
if self.lm.api_manager is not None:
self.notify_shutdown_state("Shutting down API Manager...")
Expand Down
80 changes: 41 additions & 39 deletions Tribler/Core/Upgrade/db72_to_pony.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@

from six import text_type

from twisted.internet.threads import deferToThread
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from twisted.internet.task import deferLater

from Tribler.Core.Modules.MetadataStore.OrmBindings.channel_node import LEGACY_ENTRY, NEW
from Tribler.Core.Modules.MetadataStore.serialization import REGULAR_TORRENT
Expand Down Expand Up @@ -235,8 +237,7 @@ def convert_personal_channel(self):
# Just drop the entries from the previous try
my_channel = self.mds.ChannelMetadata.get_my_channel()
if my_channel:
# FIXME: PONY BUG - can't use bulk delete because of broken FOREIGN KEY constraints
my_channel.contents.delete()
my_channel.contents.delete(bulk=True)
my_channel.delete()
else:
# Something is wrong, this should never happen
Expand All @@ -251,8 +252,6 @@ def convert_personal_channel(self):
old_torrents = self.get_old_torrents(personal_channel_only=True, sign=True)
my_channel = self.mds.ChannelMetadata.create_channel(title=self.personal_channel_title, description='')
for (torrent, _) in old_torrents:
if self.shutting_down:
return
try:
md = self.mds.TorrentMetadata(**torrent)
md.parents.add(my_channel)
Expand All @@ -263,6 +262,25 @@ def convert_personal_channel(self):

v.value = CONVERSION_FINISHED

@inlineCallbacks
def update_convert_total(self, amount, elapsed):
if self.notifier_callback:
self.notifier_callback("%i entries converted in %i seconds (%i e/s)" % (amount, int(elapsed),
int(amount / elapsed)))
yield deferLater(reactor, 0.001, lambda: None)

@inlineCallbacks
def update_convert_progress(self, amount, total, elapsed):
if self.notifier_callback:
elapsed = 0.0001 if elapsed == 0.0 else elapsed
amount = amount or 1
est_speed = amount/elapsed
eta = str(datetime.timedelta(seconds=int((total - amount) / est_speed)))
self.notifier_callback("Converting old channels.\nTorrents converted: %i/%i (%i%%).\nTime remaining: %s" %
(amount, total, (amount * 100) // total, eta))
yield deferLater(reactor, 0.001, lambda: None)

@inlineCallbacks
def convert_discovered_torrents(self):
offset = 0
# Reflect conversion state
Expand All @@ -280,12 +298,12 @@ def convert_discovered_torrents(self):
batch_size = 100
total_to_convert = self.get_old_torrents_count()

reference_timedelta = datetime.timedelta(milliseconds=100)
reference_timedelta = datetime.timedelta(milliseconds=1000)
start = 0 + offset
end = start
elapsed = 1
while start < total_to_convert:
batch = self.get_old_torrents(batch_size=batch_size, offset=start)
if not batch:
if not batch or self.shutting_down:
break

end = start + len(batch)
Expand All @@ -294,43 +312,34 @@ def convert_discovered_torrents(self):
try:
with db_session:
for (t, _) in batch:
if self.shutting_down:
return
try:
self.mds.TorrentMetadata(**t)
except (TransactionIntegrityError, CacheIndexError):
pass
except (TransactionIntegrityError, CacheIndexError):
pass
batch_end_time = datetime.datetime.now() - batch_start_time
# It is not necessary to put 'sleep' here, because get_old_torrents effectively plays that role

elapsed = (datetime.datetime.utcnow() - start_time).total_seconds()
yield self.update_convert_progress(start, total_to_convert, elapsed)
target_coeff = (batch_end_time.total_seconds() / reference_timedelta.total_seconds())
if len(batch) == batch_size:
# Adjust batch size only for full batches
if target_coeff < 0.8:
batch_size += batch_size
elif target_coeff > 1.0:
elif target_coeff > 1.1:
batch_size = int(float(batch_size) / target_coeff)
batch_size += 1 # we want to guarantee that at least something will go through
self._logger.info("Converted old torrents batch: %i/%i %f ",
# we want to guarantee that at least some entries will go through
batch_size = batch_size if batch_size > 10 else 10
self._logger.info("Converted old torrents: %i/%i %f ",
start + batch_size, total_to_convert, float(batch_end_time.total_seconds()))

if self.notifier_callback:
self.notifier_callback("%i/%i" % (start + batch_size, total_to_convert))
start = end

with db_session:
v = self.mds.MiscData.get_for_update(name=CONVERSION_FROM_72_DISCOVERED)
v.value = CONVERSION_FINISHED

stop_time = datetime.datetime.utcnow()
elapsed = (stop_time - start_time).total_seconds()

if self.notifier_callback:
self.notifier_callback(
"%i entries converted in %i seconds (%i e/s)" % (
end - offset, int(elapsed), int((end - offset) / elapsed)))
yield self.update_convert_total(start, elapsed)

def convert_discovered_channels(self):
# Reflect conversion state
Expand All @@ -351,7 +360,7 @@ def convert_discovered_channels(self):
with db_session:
for c in old_channels:
if self.shutting_down:
return
break
try:
self.mds.ChannelMetadata(**c)
except:
Expand Down Expand Up @@ -385,22 +394,14 @@ def mark_conversion_finished(self):
else:
self.mds.MiscData(name=CONVERSION_FROM_72, value=CONVERSION_FINISHED)

@inlineCallbacks
def do_migration(self):
self.convert_personal_channel()

def background_stuff():
self.mds.clock = None # We should never touch the clock during legacy conversions
if not self.shutting_down:
self.convert_discovered_torrents()
if not self.shutting_down:
self.convert_discovered_channels()
if not self.shutting_down:
self.update_trackers_info()
if not self.shutting_down:
self.mark_conversion_finished()
self.mds._db.disconnect()

return deferToThread(background_stuff)
self.mds.clock = None # We should never touch the clock during legacy conversions
yield self.convert_discovered_torrents()
self.convert_discovered_channels()
self.update_trackers_info()
self.mark_conversion_finished()


def old_db_version_ok(old_database_path):
Expand All @@ -412,6 +413,7 @@ def old_db_version_ok(old_database_path):
version = int(cursor.fetchone()[0])
if version == 29:
return True
connection.close()
return False


Expand Down
37 changes: 21 additions & 16 deletions Tribler/Core/Upgrade/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
import os

from twisted.internet.defer import succeed

from Tribler.Core.Modules.MetadataStore.store import MetadataStore
from Tribler.Core.Upgrade.config_converter import convert_config_to_tribler71
from Tribler.Core.Upgrade.db72_to_pony import DispersyToPonyMigration, should_upgrade
Expand All @@ -18,10 +20,13 @@ def __init__(self, session):
self.notified = False
self.is_done = False
self.failed = True
self.finished_deferred = None
self.pony_upgrader = None

self.current_status = u"Initializing"
self._dtp72 = None

def skip(self):
if self._dtp72:
self._dtp72.shutting_down = True

def run(self):
"""
Expand All @@ -30,11 +35,8 @@ def run(self):
Note that by default, upgrading is enabled in the config. It is then disabled
after upgrading to Tribler 7.
"""
self.notify_starting()

self.upgrade_72_to_pony()
# self.upgrade_config_to_71()
self.notify_done()
return self.upgrade_72_to_pony()

def update_status(self, status_text):
self.session.notifier.notify(NTFY_UPGRADER_TICK, NTFY_STARTED, None, status_text)
Expand All @@ -45,14 +47,21 @@ def upgrade_72_to_pony(self):
new_database_path = os.path.join(self.session.config.get_state_dir(), 'sqlite', 'metadata.db')
channels_dir = os.path.join(self.session.config.get_chant_channels_dir())

self.pony_upgrader = DispersyToPonyMigration(old_database_path, self.update_status, logger=self._logger)
self._dtp72 = DispersyToPonyMigration(old_database_path, self.update_status, logger=self._logger)
if not should_upgrade(old_database_path, new_database_path, logger=self._logger):
return
self._dtp72 = None
return succeed(None)
self.notify_starting()
# We have to create the Metadata Store object because the LaunchManyCore has not been started yet
mds = MetadataStore(new_database_path, channels_dir, self.session.trustchain_keypair)
self.pony_upgrader.initialize(mds)
self.finished_deferred = self.pony_upgrader.do_migration()
mds.shutdown()
mds = MetadataStore(new_database_path, channels_dir, self.session.trustchain_keypair, disable_sync=True)
self._dtp72.initialize(mds)

def finish_migration(_):
mds.shutdown()
self.notify_done()

return self._dtp72.do_migration().addCallbacks(finish_migration, lambda _: None)


def upgrade_config_to_71(self):
"""
Expand All @@ -76,7 +85,3 @@ def notify_done(self):
Broadcast a notification (event) that the upgrader is done.
"""
self.session.notifier.notify(NTFY_UPGRADER, NTFY_FINISHED, None)

def shutdown(self):
if self.pony_upgrader:
self.pony_upgrader.shutting_down = True
Loading

0 comments on commit 6420df6

Please sign in to comment.