diff --git a/src/tribler/core/components/restapi/rest/rest_manager.py b/src/tribler/core/components/restapi/rest/rest_manager.py index 12a08551048..e45ce94749f 100644 --- a/src/tribler/core/components/restapi/rest/rest_manager.py +++ b/src/tribler/core/components/restapi/rest/rest_manager.py @@ -98,7 +98,7 @@ async def start(self): """ Starts the HTTP API with the listen port as specified in the session configuration. """ - self._logger.info(f'Start {self.http_host}:{self.config.http_port}') + self._logger.info(f'An attempt to start REST API on {self.http_host}:{self.config.http_port}') # Not using setup_aiohttp_apispec here, as we need access to the APISpec to set the security scheme aiohttp_apispec = AiohttpApiSpec( @@ -135,6 +135,7 @@ async def start(self): self.set_api_port(api_port) await self.site.start() else: + self._logger.info(f"Searching for a free port starting from {api_port}") bind_attempts = 0 while bind_attempts < 10: try: diff --git a/src/tribler/core/components/restapi/rest/settings.py b/src/tribler/core/components/restapi/rest/settings.py index 65f58ada918..399b9c18385 100644 --- a/src/tribler/core/components/restapi/rest/settings.py +++ b/src/tribler/core/components/restapi/rest/settings.py @@ -13,6 +13,6 @@ class APISettings(TriblerConfigSection): https_port: int = -1 https_certfile: str = '' key: Optional[str] = None - retry_port: bool = False + retry_port: bool = True _port_validator = validator('http_port', 'https_port', allow_reuse=True)(validate_port_with_minus_one) diff --git a/src/tribler/core/utilities/process_manager/process.py b/src/tribler/core/utilities/process_manager/process.py index 44f5b492af8..ea9a648be0d 100644 --- a/src/tribler/core/utilities/process_manager/process.py +++ b/src/tribler/core/utilities/process_manager/process.py @@ -10,6 +10,7 @@ import psutil +from tribler.core.utilities.process_manager import sql_scripts from tribler.core.utilities.process_manager.utils import with_retry from tribler.core.version import version_id @@ -203,3 +204,20 @@ def _update(self, connection: sqlite3.Connection): self.rowid, prev_version, self.pid, self.kind.value, self.app_version, self.started_at]) if cursor.rowcount == 0: self.logger.error(f'Row {self.rowid} with row version {prev_version} was not found') + + def get_core_process(self) -> Optional[TriblerProcess]: + """ + Returns Core process created by the current GUI process, or None if the Core process was not found in the DB. + """ + if self.kind != ProcessKind.GUI: + raise TypeError('The `get_core_process` method can only be used for a GUI process') + + with self.manager.connect() as connection: + cursor = connection.execute(f""" + SELECT {sql_scripts.SELECT_COLUMNS} + FROM processes WHERE "primary" = 1 and kind = ? and creator_pid = ? + """, [ProcessKind.Core.value, self.pid]) + rows = cursor.fetchall() + if len(rows) > 1: # should not happen + raise RuntimeError('Multiple Core processes were found for a single GUI process') + return TriblerProcess.from_row(self.manager, rows[0]) if rows else None diff --git a/src/tribler/core/utilities/process_manager/tests/test_process.py b/src/tribler/core/utilities/process_manager/tests/test_process.py index 3eae8a98075..85aa6b6d641 100644 --- a/src/tribler/core/utilities/process_manager/tests/test_process.py +++ b/src/tribler/core/utilities/process_manager/tests/test_process.py @@ -143,3 +143,52 @@ def test_tribler_process_save(logger_error: Mock, current_process): p.save() assert logger_error.called assert logger_error.call_args[0][0] == 'Row 123 with row version 1 was not found' + + +@pytest.fixture(name='gui_process') +def gui_process_fixture(process_manager): + gui_process = TriblerProcess(pid=1, kind=ProcessKind.GUI, app_version='v1', started_at=1, manager=process_manager) + gui_process.save() + return gui_process + + +def test_get_core_process_no_core_process_found(gui_process): + assert gui_process.get_core_process() is None # no associated core processes were found + + +def test_get_core_process_non_primary(gui_process): + core_process = TriblerProcess(pid=2, kind=ProcessKind.Core, app_version='v1', started_at=1, creator_pid=1, + manager=gui_process.manager) + core_process.save() + assert gui_process.get_core_process() is None # core process should be primary to be selected + + +def test_get_core_process(gui_process): + core_process = TriblerProcess(pid=2, kind=ProcessKind.Core, app_version='v1', started_at=1, creator_pid=1, + manager=gui_process.manager) + core_process.primary = True + core_process.save() + p = gui_process.get_core_process() + assert p is not None # the core process was found for this GUI process + assert p is not core_process # it is a new object retrieved from the database, not the one created initially + assert p.pid == core_process.pid # it has the correct pid value + assert p.api_port is None # the api port is not specified yet + + core_process.set_api_port(123) + p2 = gui_process.get_core_process() + assert p2 is not None # the core process was found for this GUI process + assert p2 is not core_process and p2 is not p # it is a new object retrieved from the database + assert p2.api_port == 123 # it has correct API port value + + # Second Core process for the same GUI process should lead to an error + p3 = TriblerProcess(pid=3, kind=ProcessKind.Core, app_version='v1', started_at=1, creator_pid=1, primary=True, + manager=gui_process.manager) + p3.save() + with pytest.raises(RuntimeError, match='^Multiple Core processes were found for a single GUI process$'): + gui_process.get_core_process() + + +def test_get_core_process_exception(process_manager): + # in the process_manager fixture the current_process is a Core process + with pytest.raises(TypeError, match='^The `get_core_process` method can only be used for a GUI process$'): + process_manager.current_process.get_core_process() diff --git a/src/tribler/gui/core_manager.py b/src/tribler/gui/core_manager.py index 37a91b1ee42..95102b01a91 100644 --- a/src/tribler/gui/core_manager.py +++ b/src/tribler/gui/core_manager.py @@ -2,20 +2,25 @@ import os import re import sys +import time from collections import deque from pathlib import Path from typing import Optional -from PyQt5.QtCore import QObject, QProcess, QProcessEnvironment +from PyQt5.QtCore import QObject, QProcess, QProcessEnvironment, QTimer +from tribler.core.utilities.process_manager import ProcessManager from tribler.gui import gui_sentry_reporter from tribler.gui.app_manager import AppManager from tribler.gui.event_request_manager import EventRequestManager -from tribler.gui.exceptions import CoreCrashedError -from tribler.gui.tribler_request_manager import ShutdownRequest +from tribler.gui.exceptions import CoreConnectTimeoutError, CoreCrashedError +from tribler.gui.tribler_request_manager import ShutdownRequest, request_manager from tribler.gui.utilities import connect +API_PORT_CHECK_INTERVAL = 100 # 0.1 seconds between attempts to retrieve Core API port +API_PORT_CHECK_TIMEOUT = 30 # Stop trying to determine API port after 30 seconds + CORE_OUTPUT_DEQUE_LENGTH = 10 @@ -25,8 +30,8 @@ class CoreManager(QObject): a fake API will be started. """ - def __init__(self, root_state_dir: Path, api_port: int, api_key: str, app_manager: AppManager, - events_manager: EventRequestManager): + def __init__(self, root_state_dir: Path, api_port: int, api_key: str, + app_manager: AppManager, process_manager: ProcessManager, events_manager: EventRequestManager): QObject.__init__(self, None) self._logger = logging.getLogger(self.__class__.__name__) @@ -35,6 +40,12 @@ def __init__(self, root_state_dir: Path, api_port: int, api_key: str, app_manage self.core_process: Optional[QProcess] = None self.api_port = api_port self.api_key = api_key + + self.process_manager = process_manager + self.check_core_api_port_timer = QTimer() + self.check_core_api_port_timer.setSingleShot(True) + connect(self.check_core_api_port_timer.timeout, self.check_core_api_port) + self.events_manager = events_manager self.upgrade_manager = None @@ -42,6 +53,7 @@ def __init__(self, root_state_dir: Path, api_port: int, api_key: str, app_manage self.core_env = None self.core_started = False + self.core_started_at: Optional[int] = None self.core_running = False self.core_connected = False self.shutting_down = False @@ -118,9 +130,44 @@ def start_tribler_core(self): self.core_process.start(sys.executable, core_args) def on_core_started(self): + self._logger.info("Core process started") self.core_started = True + self.core_started_at = time.time() self.core_running = True - self.events_manager.connect(reschedule_on_err=True) # retry until REST API is ready + self.check_core_api_port() + + def check_core_api_port(self, *args): + """ + Determines the actual REST API port of the Core process. + + This function is first executed from the `on_core_started` after the physical Core process starts and then + repeatedly executed after API_PORT_CHECK_INTERVAL milliseconds until it retrieves the REST API port value from + the Core process. Shortly after the Core process starts, it adds itself to a process database. At that moment, + the api_port value in the database is not specified yet for the Core process. Then the Core REST manager finds + a suitable port and sets the api_port value in the process database. After that, the `check_core_api_port` + method retrieves the api_port value from the database and asks EventRequestManager to connect to that port. + """ + if not self.core_running or self.core_connected or self.shutting_down: + return + + core_process = self.process_manager.current_process.get_core_process() + if core_process is not None and core_process.api_port: + api_port = core_process.api_port + self._logger.info(f"Got REST API port value from the Core process: {api_port}") + if api_port != self.api_port: + self.api_port = api_port + request_manager.port = api_port + self.events_manager.set_api_port(api_port) + # Previously it was necessary to reschedule on error because `events_manager.connect()` was executed + # before the REST API was available, so it retried until the REST API was ready. Now the API is ready + # to use when we can read the api_port value from the database, so now we can call + # events_manager.connect(reschedule_on_err=False). I kept reschedule_on_err=True just for reinsurance. + self.events_manager.connect(reschedule_on_err=True) + + elif time.time() - self.core_started_at > API_PORT_CHECK_TIMEOUT: + raise CoreConnectTimeoutError(f"Can't get Core API port value within {API_PORT_CHECK_TIMEOUT} seconds") + else: + self.check_core_api_port_timer.start(API_PORT_CHECK_INTERVAL) def on_core_stdout_read_ready(self): if self.app_manager.quitting_app: @@ -176,7 +223,7 @@ def stop(self, quit_app_on_core_finished=True): if not self.core_connected: # If Core is not connected via events_manager it also most probably cannot process API requests. self._logger.warning('Core is not connected during the CoreManager shutdown, killing it...') - self.kill_core_process_and_remove_the_lock_file() + self.kill_core_process() return self.events_manager.shutting_down = True diff --git a/src/tribler/gui/event_request_manager.py b/src/tribler/gui/event_request_manager.py index 313afa6840c..12594f3b7c8 100644 --- a/src/tribler/gui/event_request_manager.py +++ b/src/tribler/gui/event_request_manager.py @@ -37,9 +37,9 @@ class EventRequestManager(QNetworkAccessManager): def __init__(self, api_port, api_key, error_handler): QNetworkAccessManager.__init__(self) - url = QUrl("http://localhost:%d/events" % api_port) - self.request = QNetworkRequest(url) - self.request.setRawHeader(b'X-Api-Key', api_key.encode('ascii')) + self.api_port = api_port + self.api_key = api_key + self.request = self.create_request() self.start_time = time.time() self.connect_timer = QTimer() self.current_event_string = "" @@ -65,6 +65,16 @@ def __init__(self, api_port, api_key, error_handler): notifier.add_observer(notifications.tribler_shutdown_state, self.on_tribler_shutdown_state) notifier.add_observer(notifications.report_config_error, self.on_report_config_error) + def create_request(self) -> QNetworkRequest: + url = QUrl(f"http://localhost:{self.api_port}/events") + request = QNetworkRequest(url) + request.setRawHeader(b'X-Api-Key', self.api_key.encode('ascii')) + return request + + def set_api_port(self, api_port: int): + self.api_port = api_port + self.request = self.create_request() + def on_events_start(self, public_key: str, version: str): # if public key format is changed, don't forget to change it at the core side as well if public_key: diff --git a/src/tribler/gui/tests/test_core_manager.py b/src/tribler/gui/tests/test_core_manager.py index 9764a976ec2..cdc709950a3 100644 --- a/src/tribler/gui/tests/test_core_manager.py +++ b/src/tribler/gui/tests/test_core_manager.py @@ -1,22 +1,92 @@ import errno import sys +import time from unittest.mock import MagicMock, patch import pytest from tribler.gui.core_manager import CoreCrashedError, CoreManager +from tribler.gui.exceptions import CoreConnectTimeoutError @pytest.fixture(name='core_manager') def fixture_core_manager(): core_manager = CoreManager(root_state_dir=MagicMock(), api_port=MagicMock(), api_key=MagicMock(), - app_manager=MagicMock(), - events_manager=MagicMock()) + app_manager=MagicMock(), process_manager=MagicMock(), events_manager=MagicMock()) core_manager.core_process = MagicMock(readAllStandardOutput=MagicMock(return_value=b'core stdout'), readAllStandardError=MagicMock(return_value=b'core stderr')) + core_manager.check_core_api_port_timer = MagicMock() return core_manager +def test_on_core_started_calls_check_core_api_port(core_manager): + assert not core_manager.core_running + assert not core_manager.core_started + assert core_manager.core_started_at is None + with patch.object(core_manager, 'check_core_api_port') as check_core_api_port: + core_manager.on_core_started() + assert check_core_api_port.called + + +def test_check_core_api_port_not_running(core_manager): + assert not core_manager.core_running + core_manager.check_core_api_port() + assert not core_manager.process_manager.current_process.get_core_process.called + + +def test_check_core_api_port_already_connected(core_manager): + core_manager.core_running = True + core_manager.core_connected = True + core_manager.check_core_api_port() + assert not core_manager.process_manager.current_process.get_core_process.called + + +def test_check_core_api_port_shutting_down(core_manager): + core_manager.core_running = True + core_manager.shutting_down = True + core_manager.check_core_api_port() + assert not core_manager.process_manager.current_process.get_core_process.called + + +def test_check_core_api_port_core_process_not_found(core_manager): + core_manager.core_running = True + core_manager.core_started_at = time.time() + core_manager.process_manager.current_process.get_core_process.return_value = None + core_manager.check_core_api_port() + assert core_manager.process_manager.current_process.get_core_process.called + assert core_manager.check_core_api_port_timer.start.called + + +def test_check_core_api_port_not_set(core_manager): + core_manager.core_running = True + core_manager.core_started_at = time.time() + core_manager.process_manager.current_process.get_core_process().api_port = None + core_manager.check_core_api_port() + assert core_manager.process_manager.current_process.get_core_process.called + assert core_manager.check_core_api_port_timer.start.called + + +@patch('tribler.gui.core_manager.request_manager') +def test_check_core_api_port(request_manager: MagicMock, core_manager): + core_manager.core_running = True + core_manager.core_started_at = time.time() + api_port = core_manager.process_manager.current_process.get_core_process().api_port + core_manager.check_core_api_port() + assert core_manager.process_manager.current_process.get_core_process.called + assert not core_manager.check_core_api_port_timer.start.called + assert core_manager.api_port == api_port + assert request_manager.port == api_port + + +def test_check_core_api_port_timeout(core_manager): + core_manager.core_running = True + # The timeout should be 30 seconds so let's pretend the core started 31 seconds before now + core_manager.core_started_at = time.time() - 31 + core_manager.process_manager.current_process.get_core_process.return_value = None + with pytest.raises(CoreConnectTimeoutError, match="^Can't get Core API port value within 30 seconds$"): + core_manager.check_core_api_port() + + def test_on_core_finished_calls_quit_application(core_manager): # test that in case of `shutting_down` and `should_quit_app_on_core_finished` flags have been set to True # then `quit_application` method will be called and Exception will not be raised diff --git a/src/tribler/gui/tribler_window.py b/src/tribler/gui/tribler_window.py index 19a978bc01b..e76b5ff610e 100644 --- a/src/tribler/gui/tribler_window.py +++ b/src/tribler/gui/tribler_window.py @@ -202,7 +202,8 @@ def __init__( self.error_handler = ErrorHandler(self) self.events_manager = EventRequestManager(api_port, api_key, self.error_handler) - self.core_manager = CoreManager(self.root_state_dir, api_port, api_key, app_manager, self.events_manager) + self.core_manager = CoreManager(self.root_state_dir, api_port, api_key, + app_manager, process_manager, self.events_manager) self.version_history = VersionHistory(self.root_state_dir) self.upgrade_manager = UpgradeManager(self.version_history) self.pending_requests = {} @@ -1212,7 +1213,7 @@ def dropEvent(self, e): e.accept() def clicked_force_shutdown(self): - self.core_manager.kill_core_process_and_remove_the_lock_file() + self.core_manager.kill_core_process() self.app_manager.quit_application() def clicked_skip_conversion(self):