Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/event request manager reconnect #7001

Merged
36 changes: 31 additions & 5 deletions src/tribler/gui/core_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from PyQt5.QtCore import QObject, QProcess, QProcessEnvironment, pyqtSignal
from PyQt5.QtNetwork import QNetworkRequest

from tribler.core.utilities.process_checker import ProcessChecker
from tribler.gui.app_manager import AppManager
from tribler.gui.event_request_manager import EventRequestManager
from tribler.gui.exceptions import CoreCrashedError
Expand Down Expand Up @@ -51,8 +52,15 @@ def __init__(self, root_state_dir: Path, api_port: int, api_key: str, app_manage
connect(self.events_manager.core_connected, self.on_core_connected)

def on_core_connected(self, _):
if not self.core_finished:
self.core_connected = True
if self.core_finished:
self._logger.warning('Core connected after the core process is already finished')
return

if self.shutting_down:
self._logger.warning('Core connected after the shutting down is already started')
return

self.core_connected = True

def start(self, core_args=None, core_env=None, upgrade_manager=None, run_core=True):
"""
Expand Down Expand Up @@ -147,8 +155,13 @@ def stop(self, quit_app_on_core_finished=True):
self.shutting_down = True
self._logger.info("Stopping Core manager")

need_to_shutdown_core = (self.core_process or self.core_connected) and not self.core_finished
if need_to_shutdown_core:
if self.core_process and not self.core_finished:
if not self.core_connected:
# If Core is not connected via events_manager it also most probably cannot process API requests.
self._logger.warning('Core is not connected during the CoreManager shutdown, killing it...')
self.kill_core_process_and_remove_the_lock_file()
return

self.events_manager.shutting_down = True

def shutdown_request_processed(response):
Expand All @@ -166,10 +179,23 @@ def send_shutdown_request(initial=False):
send_shutdown_request(initial=True)

elif self.should_quit_app_on_core_finished:
self._logger.info('Core finished, quitting GUI application')
self._logger.info('Core is not running, quitting GUI application')
self.app_manager.quit_application()

def kill_core_process_and_remove_the_lock_file(self):
    """Forcefully terminate the Core process and remove the Core lock file.

    If no Core process object exists, a warning is logged and the method
    returns without doing anything else: there is nothing to kill, and the
    lock file is left untouched.

    Otherwise the process is killed, the method waits for it to finish
    (logging an error if the wait reports failure), and the lock is removed
    through a ProcessChecker created for ``self.root_state_dir``.
    """
    if not self.core_process:
        self._logger.warning("Cannot kill the Core process as it is not initialized")
        # Bail out here: falling through would call .kill() on a missing
        # process object and raise AttributeError.
        return

    self.core_process.kill()
    # NOTE(review): core_process is presumably a QProcess, in which case
    # waitForFinished() blocks with Qt's default timeout - confirm.
    finished = self.core_process.waitForFinished()
    if not finished:
        self._logger.error('Cannot kill the core process')

    # The lock is removed even when the wait above failed, as in the original.
    process_checker = ProcessChecker(self.root_state_dir)
    process_checker.remove_lock()

def on_core_finished(self, exit_code, exit_status):
self._logger.info("Core process finished")
self.core_running = False
self.core_finished = True
if self.shutting_down:
Expand Down
60 changes: 39 additions & 21 deletions src/tribler/gui/event_request_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import time
from typing import Optional

from PyQt5.QtCore import QTimer, QUrl, pyqtSignal
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkReply, QNetworkRequest
Expand All @@ -9,13 +10,13 @@
from tribler.core.components.reporter.reported_error import ReportedError
from tribler.core.utilities.notifier import Notifier
from tribler.gui import gui_sentry_reporter
from tribler.gui.exceptions import CoreConnectTimeoutError, CoreConnectionError
from tribler.gui.utilities import connect
from tribler.gui.exceptions import CoreConnectTimeoutError
from tribler.gui.utilities import connect, make_network_errors_dict

received_events = []

CORE_CONNECTION_ATTEMPTS_LIMIT = 120
RECONNECT_INTERVAL_MS = 500
CORE_CONNECTION_TIMEOUT = 60
RECONNECT_INTERVAL_MS = 100


class EventRequestManager(QNetworkAccessManager):
Expand All @@ -39,13 +40,15 @@ def __init__(self, api_port, api_key, error_handler):
url = QUrl("http://localhost:%d/events" % api_port)
self.request = QNetworkRequest(url)
self.request.setRawHeader(b'X-Api-Key', api_key.encode('ascii'))
self.remaining_connection_attempts = CORE_CONNECTION_ATTEMPTS_LIMIT
self.start_time = time.time()
self.connect_timer = QTimer()
self.current_event_string = ""
self.reply = None
self.reply: Optional[QNetworkReply] = None
self.receiving_data = False
self.shutting_down = False
self.error_handler = error_handler
self._logger = logging.getLogger(self.__class__.__name__)
self.network_errors = make_network_errors_dict()

self.connect_timer.setSingleShot(True)
connect(self.connect_timer.timeout, self.connect)
Expand Down Expand Up @@ -95,23 +98,38 @@ def on_tribler_shutdown_state(self,state: str):
def on_report_config_error(self, error):
self.config_error_signal.emit(error)

def on_error(self, error, reschedule_on_err):
if error == QNetworkReply.ConnectionRefusedError:
self._logger.debug("Tribler Core refused connection, retrying...")
else:
raise CoreConnectionError(f"Error {error} while trying to connect to Tribler Core")

if self.remaining_connection_attempts <= 0:
raise CoreConnectTimeoutError("Could not connect with the Tribler Core within "
f"{RECONNECT_INTERVAL_MS*CORE_CONNECTION_ATTEMPTS_LIMIT//1000} seconds")

self.remaining_connection_attempts -= 1
def on_error(self, error: int, reschedule_on_err: bool):
# If the REST API server is not started yet and the port is not opened, the error will be received.
# The specific error can be different on different systems:
# - QNetworkReply.ConnectionRefusedError (code 1);
# - QNetworkReply.HostNotFoundError (code 3);
# - QNetworkReply.TimeoutError (code 4);
# - QNetworkReply.UnknownNetworkError (code 99).
# Tribler GUI should retry on any of these errors.

# Depending on the system, while the server is not started, the error can be returned with some delay
# (like, five seconds). But don't try to specify a timeout using request.setTransferTimeout(REQUEST_TIMEOUT_MS).
# First, it is unnecessary, as the reply is sent almost immediately after the REST API is started,
# so the GUI will not wait five seconds for that. Also, with TransferTimeout specified, AIOHTTP starts
# raising ConnectionResetError "Cannot write to closing transport".

should_retry = reschedule_on_err and time.time() < self.start_time + CORE_CONNECTION_TIMEOUT
error_name = self.network_errors.get(error, error)
self._logger.info(f"Error {error_name} while trying to connect to Tribler Core"
+ (', will retry...' if should_retry else ''))

if reschedule_on_err:
# Reschedule an attempt
self.connect_timer.start(RECONNECT_INTERVAL_MS)
if should_retry:
self.connect_timer.start(RECONNECT_INTERVAL_MS) # Reschedule an attempt
else:
raise CoreConnectTimeoutError(
f"Could not connect with the Tribler Core within {CORE_CONNECTION_TIMEOUT} seconds")

def on_read_data(self):
if not self.receiving_data:
self.receiving_data = True
self._logger.info('Starts receiving data from Core')

if self.receivers(self.finished) == 0:
connect(self.finished, lambda reply: self.on_finished())
self.connect_timer.stop()
Expand Down Expand Up @@ -142,11 +160,11 @@ def on_finished(self):
if self.shutting_down:
return
self._logger.warning("Events connection dropped, attempting to reconnect")
self.remaining_connection_attempts = CORE_CONNECTION_ATTEMPTS_LIMIT
self.start_time = time.time()
self.connect_timer.start(RECONNECT_INTERVAL_MS)

def connect(self, reschedule_on_err=True):
self._logger.debug("Will connect to events endpoint")
self._logger.info(f"Connecting to events endpoint ({'with' if reschedule_on_err else 'without'} retrying)")
if self.reply is not None:
self.reply.deleteLater()
self.reply = self.get(self.request)
Expand Down
11 changes: 4 additions & 7 deletions src/tribler/gui/tribler_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,9 +527,11 @@ def tray_show_message(self, title, message):

def on_core_connected(self, version):
if self.tribler_started:
logging.warning("Received duplicate Tribler Core started event")
self._logger.warning("Received duplicate Tribler Core connected event")
return

self._logger.info("Core connected")

self.tribler_started = True
self.tribler_version = version

Expand Down Expand Up @@ -1205,12 +1207,7 @@ def dropEvent(self, e):
e.accept()

def clicked_force_shutdown(self):
pid = self.core_manager.core_process.pid()
try:
os.kill(pid, 9)
except OSError:
pass

self.core_manager.kill_core_process_and_remove_the_lock_file()
self.app_manager.quit_application()

def clicked_skip_conversion(self):
Expand Down
13 changes: 12 additions & 1 deletion src/tribler/gui/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import types
from datetime import datetime, timedelta
from pathlib import Path
from typing import Callable
from typing import Callable, Dict
from urllib.parse import quote_plus
from uuid import uuid4

Expand All @@ -21,6 +21,7 @@
pyqtSignal,
)
from PyQt5.QtGui import QPixmap, QRegion
from PyQt5.QtNetwork import QNetworkReply
from PyQt5.QtWidgets import QApplication, QMessageBox

import tribler.gui
Expand Down Expand Up @@ -490,3 +491,13 @@ def show_message_box(text: str = '', title: str = 'Error', icon: QMessageBox.Ico
message_box.setWindowTitle(title)
message_box.setText(text)
message_box.exec_()


def make_network_errors_dict() -> Dict[int, str]:
    """Build a lookup table from QNetworkReply error codes to their names.

    Every attribute of ``QNetworkReply`` whose name ends with ``'Error'`` and
    whose value is an ``int`` instance is included, keyed by that value. If
    two such attributes share a value, the one that comes later in
    ``dir(QNetworkReply)`` order wins.

    Returns:
        A dict mapping the integer error code to the attribute name.
    """
    # Resolve only the attributes named like errors, keeping dir() order.
    candidates = (
        (getattr(QNetworkReply, attr_name), attr_name)
        for attr_name in dir(QNetworkReply)
        if attr_name.endswith('Error')
    )
    # Non-int attributes that merely end with 'Error' are dropped here.
    return {code: attr_name for code, attr_name in candidates if isinstance(code, int)}