From a18a036822240417c82f2170ddd141638e3d1cd1 Mon Sep 17 00:00:00 2001 From: Quinten Stokkink Date: Wed, 11 Dec 2024 14:53:37 +0100 Subject: [PATCH] Use db_session retry and more exception handling in upgrade --- .ruff.toml | 1 - src/tribler/upgrade_script.py | 360 ++++++++++++++++++---------------- 2 files changed, 194 insertions(+), 167 deletions(-) diff --git a/.ruff.toml b/.ruff.toml index b711ca71f0..e1f7953802 100644 --- a/.ruff.toml +++ b/.ruff.toml @@ -4,7 +4,6 @@ lint.unfixable = [] lint.ignore = [ "ANN003", - "ANN101", "ARG001", "ARG002", "ARG005", diff --git a/src/tribler/upgrade_script.py b/src/tribler/upgrade_script.py index 2161e8a45c..0fe276204a 100644 --- a/src/tribler/upgrade_script.py +++ b/src/tribler/upgrade_script.py @@ -18,7 +18,7 @@ from typing import TYPE_CHECKING from configobj import ConfigObj -from pony.orm import Database, db_session +from pony.orm import Database, OperationalError, db_session if TYPE_CHECKING: from collections.abc import Generator @@ -28,7 +28,7 @@ FROM: str = "7.14" TO: str = "8.0" -# ruff: noqa: B007,F841,N802,RUF015,W291 +# ruff: noqa: B007,C901,F841,N802,RUF015,W291 def batched(results: list, n: int = 1) -> Generator[list]: @@ -102,6 +102,84 @@ def _import_7_14_settings(src: str, dst: TriblerConfigManager) -> None: _copy_if_exists(old, "tunnel_community/max_circuits", dst, "tunnel_community/max_circuits", int) +@db_session(retry=3) +def _inject_StatementOp_inner(db: Database, batch: list) -> None: + """ + The inner injection script to write old StatementOp info into the current database. + """ + for (subject_name, subject_type, + object_name, object_type, + stmt_added_count, stmt_removed_count, stmt_local_operation, + peer_public_key, peer_added_at, + stmtop_operation, stmtop_clock, stmtop_signature, stmtop_updated_at, stmtop_auto_generated) in batch: + # Insert subject + results = list(db.execute("SELECT id FROM Resource WHERE name=$subject_name AND type=$subject_type", + globals(), locals())) + if not results: + cursor = db.execute("INSERT INTO Resource " + "VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Resource), $subject_name, " + "$subject_type)", + globals(), locals()) + results = [(cursor.lastrowid,)] + subject_id, = results[0] + + # Insert object + results = list( + db.execute("SELECT id FROM Resource WHERE name=$object_name AND type=$object_type", + globals(), locals())) + if not results: + cursor = db.execute( + "INSERT INTO Resource VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Resource), $object_name, " + "$object_type)", + globals(), locals() + ) + results = [(cursor.lastrowid,)] + object_id, = results[0] + + # Insert statement + results = list(db.execute( + "SELECT id FROM Statement WHERE object=$object_id AND subject=$subject_id", + globals(), locals())) + if not results: + cursor = db.execute( + "INSERT INTO Statement VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Statement), $subject_id, " + "$object_id, $stmt_added_count, $stmt_removed_count, $stmt_local_operation)", + globals(), locals() + ) + results = [(cursor.lastrowid,)] + statement_id, = results[0] + + # Insert peer + results = list( + db.execute("SELECT id, added_at FROM Peer WHERE public_key=$peer_public_key", + globals(), locals())) + if results and results[0][1] >= peer_added_at: + db.execute("UPDATE Peer SET added_at=$peer_added_at WHERE public_key=$peer_public_key", + globals(), locals()) + results = [(results[0][0],)] + elif not results: + cursor = db.execute( + "INSERT INTO Peer VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Peer), $peer_public_key, " + "$peer_added_at)", + globals(), locals() + ) + results = [(cursor.lastrowid,)] + else: + results = [(results[0][0],)] + peer_id, = results[0] + + # Insert statement op + results = list(db.execute("SELECT id FROM StatementOp WHERE statement=$statement_id AND " + "peer=$peer_id", + globals(), locals())) + if not results: + db.execute( + "INSERT INTO StatementOp VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM StatementOp), " + "$statement_id, $peer_id, $stmtop_operation, $stmtop_clock, $stmtop_signature, " + "$stmtop_updated_at, $stmtop_auto_generated)", + globals(), locals()) + + def _inject_StatementOp(abs_src_db: str, abs_dst_db: str) -> None: """ Import old StatementOp entries. @@ -122,85 +200,45 @@ def _inject_StatementOp(abs_src_db: str, abs_dst_db: str) -> None: db = Database() db.bind(provider="sqlite", filename=abs_dst_db) for batch in batched(output, n=20): - with db_session: - for (subject_name, subject_type, - object_name, object_type, - stmt_added_count, stmt_removed_count, stmt_local_operation, - peer_public_key, peer_added_at, - stmtop_operation, stmtop_clock, stmtop_signature, stmtop_updated_at, stmtop_auto_generated) in batch: - try: - # Insert subject - results = list(db.execute("SELECT id FROM Resource WHERE name=$subject_name AND type=$subject_type", - globals(), locals())) - if not results: - cursor = db.execute("INSERT INTO Resource " - "VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Resource), $subject_name, " - "$subject_type)", - globals(), locals()) - results = [(cursor.lastrowid, )] - subject_id, = results[0] - - # Insert object - results = list( - db.execute("SELECT id FROM Resource WHERE name=$object_name AND type=$object_type", - globals(), locals())) - if not results: - cursor = db.execute( - "INSERT INTO Resource VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Resource), $object_name, " - "$object_type)", - globals(), locals() - ) - results = [(cursor.lastrowid, )] - object_id, = results[0] - - # Insert statement - results = list(db.execute( - "SELECT id FROM Statement WHERE object=$object_id AND subject=$subject_id", - globals(), locals())) - if not results: - cursor = db.execute( - "INSERT INTO Statement VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Statement), $subject_id, " - "$object_id, $stmt_added_count, $stmt_removed_count, $stmt_local_operation)", - globals(), locals() - ) - results = [(cursor.lastrowid, )] - statement_id, = results[0] - - # Insert peer - results = list( - db.execute("SELECT id, added_at FROM Peer WHERE public_key=$peer_public_key", - globals(), locals())) - if results and results[0][1] >= peer_added_at: - db.execute("UPDATE Peer SET added_at=$peer_added_at WHERE public_key=$peer_public_key", - globals(), locals()) - results = [(results[0][0],)] - elif not results: - cursor = db.execute( - "INSERT INTO Peer VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM Peer), $peer_public_key, " - "$peer_added_at)", - globals(), locals() - ) - results = [(cursor.lastrowid, )] - else: - results = [(results[0][0], )] - peer_id, = results[0] - - # Insert statement op - results = list(db.execute("SELECT id FROM StatementOp WHERE statement=$statement_id AND " - "peer=$peer_id", - globals(), locals())) - if not results: - db.execute( - "INSERT INTO StatementOp VALUES ((SELECT COALESCE(MAX(id),0)+1 FROM StatementOp), " - "$statement_id, $peer_id, $stmtop_operation, $stmtop_clock, $stmtop_signature, " - "$stmtop_updated_at, $stmtop_auto_generated)", - globals(), locals()) - - db.commit() - except sqlite3.DatabaseError as e: - db.rollback() - logging.exception(e) + try: + _inject_StatementOp_inner(db, batch) + except OperationalError as e: + logging.exception(e) + try: db.disconnect() + except OperationalError as e: + logging.exception(e) + + +@db_session(retry=3) +def _inject_ChannelNode_inner(db: Database, batch: list) -> None: + """ + The inner injection script to write old ChannelNode info into the current database. + """ + for (infohash, size, torrent_date, tracker_info, title, tags, metadata_type, reserved_flags, + origin_id, public_key, id_, timestamp, signature, added_on, status, xxx, tag_processor_version, + seeders, leechers, last_check, self_checked, has_data) in batch: + # Insert subject + results = list(db.execute("SELECT rowid FROM TorrentState WHERE infohash=$infohash", + globals(), locals())) + if not results: + cursor = db.execute("INSERT INTO TorrentState " + "VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM TorrentState), " + "$infohash, $seeders, $leechers, $last_check, $self_checked, $has_data)", + globals(), locals()) + results = [(cursor.lastrowid,)] + health_id, = results[0] + + # Insert channel ChannelNode + results = list(db.execute("SELECT rowid FROM ChannelNode WHERE public_key=$public_key AND id_=$id_", + globals(), locals())) + if not results: + db.execute( + "INSERT INTO ChannelNode VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM ChannelNode), " + "$infohash, $size, $torrent_date, $tracker_info, $title, $tags, $metadata_type, " + "$reserved_flags, $origin_id, $public_key, $id_, $timestamp, $signature, $added_on, " + "$status, $xxx, $health_id, $tag_processor_version)", + globals(), locals()) def _inject_ChannelNode(abs_src_db: str, abs_dst_db: str) -> None: @@ -221,38 +259,39 @@ def _inject_ChannelNode(abs_src_db: str, abs_dst_db: str) -> None: db = Database() db.bind(provider="sqlite", filename=abs_dst_db) for batch in batched(output, n=20): - with db_session: - for (infohash, size, torrent_date, tracker_info, title, tags, metadata_type, reserved_flags, - origin_id, public_key, id_, timestamp, signature, added_on, status, xxx, tag_processor_version, - seeders, leechers, last_check, self_checked, has_data) in batch: - try: - # Insert subject - results = list(db.execute("SELECT rowid FROM TorrentState WHERE infohash=$infohash", - globals(), locals())) - if not results: - cursor = db.execute("INSERT INTO TorrentState " - "VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM TorrentState), " - "$infohash, $seeders, $leechers, $last_check, $self_checked, $has_data)", - globals(), locals()) - results = [(cursor.lastrowid, )] - health_id, = results[0] - - # Insert channel ChannelNode - results = list(db.execute("SELECT rowid FROM ChannelNode WHERE public_key=$public_key AND id_=$id_", - globals(), locals())) - if not results: - db.execute( - "INSERT INTO ChannelNode VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM ChannelNode), " - "$infohash, $size, $torrent_date, $tracker_info, $title, $tags, $metadata_type, " - "$reserved_flags, $origin_id, $public_key, $id_, $timestamp, $signature, $added_on, " - "$status, $xxx, $health_id, $tag_processor_version)", - globals(), locals()) - - db.commit() - except sqlite3.DatabaseError as e: - db.rollback() - logging.exception(e) + try: + _inject_ChannelNode_inner(db, batch) + except OperationalError as e: + logging.exception(e) + try: db.disconnect() + except OperationalError as e: + logging.exception(e) + + +@db_session(retry=3) +def _inject_TrackerState_inner(db: Database, batch: list) -> None: + """ + The inner injection script to write old TrackerState info into the current database. + """ + for (url, last_check, alive, failures) in batch: + results = list(db.execute("SELECT rowid, last_check, alive, failures FROM TrackerState WHERE " + "url=$url", + globals(), locals())) + if results: + tracker_id, n_last_check, n_alive, n_failures = results[0] + s_last_check = max(n_last_check, last_check) + s_alive = alive if last_check > n_last_check else n_alive + s_failures = int(failures) + int(n_failures) + db.execute( + "UPDATE TrackerState SET last_check=$s_last_check, alive=$s_alive, failures=$s_failures " + "WHERE rowid=$tracker_id", + globals(), locals()) + else: + db.execute( + "INSERT INTO TrackerState VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM TrackerState), " + "$url, $last_check, $alive, $failures)", + globals(), locals()) def _inject_TrackerState(abs_src_db: str, abs_dst_db: str) -> None: @@ -266,32 +305,44 @@ def _inject_TrackerState(abs_src_db: str, abs_dst_db: str) -> None: db = Database() db.bind(provider="sqlite", filename=abs_dst_db) for batch in batched(output, n=20): - with db_session: - for (url, last_check, alive, failures) in batch: - try: - results = list(db.execute("SELECT rowid, last_check, alive, failures FROM TrackerState WHERE " - "url=$url", - globals(), locals())) - if results: - tracker_id, n_last_check, n_alive, n_failures = results[0] - s_last_check = max(n_last_check, last_check) - s_alive = alive if last_check > n_last_check else n_alive - s_failures = int(failures) + int(n_failures) - db.execute( - "UPDATE TrackerState SET last_check=$s_last_check, alive=$s_alive, failures=$s_failures " - "WHERE rowid=$tracker_id", - globals(), locals()) - else: - db.execute( - "INSERT INTO TrackerState VALUES ((SELECT COALESCE(MAX(rowid),0)+1 FROM TrackerState), " - "$url, $last_check, $alive, $failures)", - globals(), locals()) - - db.commit() - except sqlite3.DatabaseError as e: - db.rollback() - logging.exception(e) + try: + _inject_TrackerState_inner(db, batch) + except OperationalError as e: + logging.exception(e) + try: db.disconnect() + except OperationalError as e: + logging.exception(e) + + +@db_session(retry=3) +def _inject_TorrentState_TrackerState_inner(db: Database, batch: list) -> None: + """ + The inner injection script to write old TorrentState info into the current database. + """ + for (infohash, url) in batch: + results = list(db.execute("""SELECT TorrentState.infohash, TrackerState.url +FROM TorrentState_TrackerState +INNER JOIN TorrentState ON TorrentState_TrackerState.torrentstate=TorrentState.rowid +INNER JOIN TrackerState ON TorrentState_TrackerState.trackerstate=TrackerState.rowid +WHERE TorrentState.infohash=$infohash AND TrackerState.url=$url +;""", globals(), locals())) + if not results: + # Note: both the tracker and torrent state should've been imported already + torrent_states = list(db.execute("SELECT rowid FROM TorrentState WHERE infohash=$infohash", + globals(), locals())) + tracker_states = list(db.execute("SELECT rowid FROM TrackerState WHERE url=$url", + globals(), locals())) + if not torrent_states: + logging.warning("Torrent state for %s disappeared mid-upgrade!", infohash) + continue + if not tracker_states: + logging.warning("Tracker state for %s disappeared mid-upgrade!", url) + continue + torrent_state, = torrent_states[0] + tracker_state, = tracker_states[0] + db.execute("INSERT INTO TorrentState_TrackerState VALUES ($torrent_state, $tracker_state)", + globals(), locals()) def _inject_TorrentState_TrackerState(abs_src_db: str, abs_dst_db: str) -> None: @@ -309,37 +360,14 @@ def _inject_TorrentState_TrackerState(abs_src_db: str, abs_dst_db: str) -> None: db = Database() db.bind(provider="sqlite", filename=abs_dst_db) for batch in batched(output, n=20): - with db_session: - for (infohash, url) in batch: - try: - results = list(db.execute("""SELECT TorrentState.infohash, TrackerState.url -FROM TorrentState_TrackerState -INNER JOIN TorrentState ON TorrentState_TrackerState.torrentstate=TorrentState.rowid -INNER JOIN TrackerState ON TorrentState_TrackerState.trackerstate=TrackerState.rowid -WHERE TorrentState.infohash=$infohash AND TrackerState.url=$url -;""", globals(), locals())) - if not results: - # Note: both the tracker and torrent state should've been imported already - torrent_states = list(db.execute("SELECT rowid FROM TorrentState WHERE infohash=$infohash", - globals(), locals())) - tracker_states = list(db.execute("SELECT rowid FROM TrackerState WHERE url=$url", - globals(), locals())) - if not torrent_states: - logging.warning("Torrent state for %s disappeared mid-upgrade!", infohash) - continue - if not tracker_states: - logging.warning("Tracker state for %s disappeared mid-upgrade!", url) - continue - torrent_state, = torrent_states[0] - tracker_state, = tracker_states[0] - db.execute("INSERT INTO TorrentState_TrackerState VALUES ($torrent_state, $tracker_state)", - globals(), locals()) - - db.commit() - except sqlite3.DatabaseError as e: - db.rollback() - logging.exception(e) + try: + _inject_TorrentState_TrackerState_inner(db, batch) + except OperationalError as e: + logging.exception(e) + try: db.disconnect() + except OperationalError as e: + logging.exception(e) def _inject_7_14_tables(src_db: str, dst_db: str, db_format: str) -> None: