Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass the number of the opened REST API port from Core to GUI #7251

Merged
merged 2 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/tribler/core/components/restapi/rest/rest_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ async def start(self):
"""
Starts the HTTP API with the listen port as specified in the session configuration.
"""
self._logger.info(f'Start {self.http_host}:{self.config.http_port}')
self._logger.info(f'An attempt to start REST API on {self.http_host}:{self.config.http_port}')

# Not using setup_aiohttp_apispec here, as we need access to the APISpec to set the security scheme
aiohttp_apispec = AiohttpApiSpec(
Expand Down Expand Up @@ -135,6 +135,7 @@ async def start(self):
self.set_api_port(api_port)
await self.site.start()
else:
self._logger.info(f"Searching for a free port starting from {api_port}")
bind_attempts = 0
while bind_attempts < 10:
try:
Expand Down
2 changes: 1 addition & 1 deletion src/tribler/core/components/restapi/rest/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ class APISettings(TriblerConfigSection):
https_port: int = -1
https_certfile: str = ''
key: Optional[str] = None
retry_port: bool = False
retry_port: bool = True

_port_validator = validator('http_port', 'https_port', allow_reuse=True)(validate_port_with_minus_one)
18 changes: 18 additions & 0 deletions src/tribler/core/utilities/process_manager/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import psutil

from tribler.core.utilities.process_manager import sql_scripts
from tribler.core.utilities.process_manager.utils import with_retry
from tribler.core.version import version_id

Expand Down Expand Up @@ -203,3 +204,20 @@ def _update(self, connection: sqlite3.Connection):
self.rowid, prev_version, self.pid, self.kind.value, self.app_version, self.started_at])
if cursor.rowcount == 0:
self.logger.error(f'Row {self.rowid} with row version {prev_version} was not found')

def get_core_process(self) -> Optional[TriblerProcess]:
"""
Returns Core process created by the current GUI process, or None if the Core process was not found in the DB.
"""
if self.kind != ProcessKind.GUI:
raise TypeError('The `get_core_process` method can only be used for a GUI process')

with self.manager.connect() as connection:
cursor = connection.execute(f"""
SELECT {sql_scripts.SELECT_COLUMNS}
FROM processes WHERE "primary" = 1 and kind = ? and creator_pid = ?
""", [ProcessKind.Core.value, self.pid])
rows = cursor.fetchall()
if len(rows) > 1: # should not happen
raise RuntimeError('Multiple Core processes were found for a single GUI process')
return TriblerProcess.from_row(self.manager, rows[0]) if rows else None
49 changes: 49 additions & 0 deletions src/tribler/core/utilities/process_manager/tests/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,52 @@ def test_tribler_process_save(logger_error: Mock, current_process):
p.save()
assert logger_error.called
assert logger_error.call_args[0][0] == 'Row 123 with row version 1 was not found'


@pytest.fixture(name='gui_process')
def gui_process_fixture(process_manager):
gui_process = TriblerProcess(pid=1, kind=ProcessKind.GUI, app_version='v1', started_at=1, manager=process_manager)
gui_process.save()
return gui_process


def test_get_core_process_no_core_process_found(gui_process):
assert gui_process.get_core_process() is None # no associated core processes were found


def test_get_core_process_non_primary(gui_process):
core_process = TriblerProcess(pid=2, kind=ProcessKind.Core, app_version='v1', started_at=1, creator_pid=1,
manager=gui_process.manager)
core_process.save()
assert gui_process.get_core_process() is None # core process should be primary to be selected


def test_get_core_process(gui_process):
core_process = TriblerProcess(pid=2, kind=ProcessKind.Core, app_version='v1', started_at=1, creator_pid=1,
manager=gui_process.manager)
core_process.primary = True
core_process.save()
p = gui_process.get_core_process()
assert p is not None # the core process was found for this GUI process
assert p is not core_process # it is a new object retrieved from the database, not the one created initially
assert p.pid == core_process.pid # it has the correct pid value
assert p.api_port is None # the api port is not specified yet

core_process.set_api_port(123)
p2 = gui_process.get_core_process()
assert p2 is not None # the core process was found for this GUI process
assert p2 is not core_process and p2 is not p # it is a new object retrieved from the database
assert p2.api_port == 123 # it has correct API port value

# Second Core process for the same GUI process should lead to an error
p3 = TriblerProcess(pid=3, kind=ProcessKind.Core, app_version='v1', started_at=1, creator_pid=1, primary=True,
manager=gui_process.manager)
p3.save()
with pytest.raises(RuntimeError, match='^Multiple Core processes were found for a single GUI process$'):
gui_process.get_core_process()


def test_get_core_process_exception(process_manager):
# in the process_manager fixture the current_process is a Core process
with pytest.raises(TypeError, match='^The `get_core_process` method can only be used for a GUI process$'):
process_manager.current_process.get_core_process()
61 changes: 54 additions & 7 deletions src/tribler/gui/core_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,25 @@
import os
import re
import sys
import time
from collections import deque
from pathlib import Path
from typing import Optional

from PyQt5.QtCore import QObject, QProcess, QProcessEnvironment
from PyQt5.QtCore import QObject, QProcess, QProcessEnvironment, QTimer

from tribler.core.utilities.process_manager import ProcessManager
from tribler.gui import gui_sentry_reporter
from tribler.gui.app_manager import AppManager
from tribler.gui.event_request_manager import EventRequestManager
from tribler.gui.exceptions import CoreCrashedError
from tribler.gui.tribler_request_manager import ShutdownRequest
from tribler.gui.exceptions import CoreConnectTimeoutError, CoreCrashedError
from tribler.gui.tribler_request_manager import ShutdownRequest, request_manager
from tribler.gui.utilities import connect


API_PORT_CHECK_INTERVAL = 100 # 0.1 seconds between attempts to retrieve Core API port
API_PORT_CHECK_TIMEOUT = 30 # Stop trying to determine API port after 30 seconds

CORE_OUTPUT_DEQUE_LENGTH = 10


Expand All @@ -25,8 +30,8 @@ class CoreManager(QObject):
a fake API will be started.
"""

def __init__(self, root_state_dir: Path, api_port: int, api_key: str, app_manager: AppManager,
events_manager: EventRequestManager):
def __init__(self, root_state_dir: Path, api_port: int, api_key: str,
app_manager: AppManager, process_manager: ProcessManager, events_manager: EventRequestManager):
QObject.__init__(self, None)

self._logger = logging.getLogger(self.__class__.__name__)
Expand All @@ -35,13 +40,20 @@ def __init__(self, root_state_dir: Path, api_port: int, api_key: str, app_manage
self.core_process: Optional[QProcess] = None
self.api_port = api_port
self.api_key = api_key

self.process_manager = process_manager
self.check_core_api_port_timer = QTimer()
self.check_core_api_port_timer.setSingleShot(True)
connect(self.check_core_api_port_timer.timeout, self.check_core_api_port)

self.events_manager = events_manager

self.upgrade_manager = None
self.core_args = None
self.core_env = None

self.core_started = False
self.core_started_at: Optional[int] = None
self.core_running = False
self.core_connected = False
self.shutting_down = False
Expand Down Expand Up @@ -118,9 +130,44 @@ def start_tribler_core(self):
self.core_process.start(sys.executable, core_args)

def on_core_started(self):
self._logger.info("Core process started")
self.core_started = True
self.core_started_at = time.time()
self.core_running = True
self.events_manager.connect(reschedule_on_err=True) # retry until REST API is ready
self.check_core_api_port()

def check_core_api_port(self, *args):
"""
Determines the actual REST API port of the Core process.

This function is first executed from the `on_core_started` after the physical Core process starts and then
repeatedly executed after API_PORT_CHECK_INTERVAL milliseconds until it retrieves the REST API port value from
the Core process. Shortly after the Core process starts, it adds itself to a process database. At that moment,
the api_port value in the database is not specified yet for the Core process. Then the Core REST manager finds
a suitable port and sets the api_port value in the process database. After that, the `check_core_api_port`
method retrieves the api_port value from the database and asks EventRequestManager to connect to that port.
"""
if not self.core_running or self.core_connected or self.shutting_down:
return

core_process = self.process_manager.current_process.get_core_process()
if core_process is not None and core_process.api_port:
api_port = core_process.api_port
self._logger.info(f"Got REST API port value from the Core process: {api_port}")
if api_port != self.api_port:
self.api_port = api_port
request_manager.port = api_port
self.events_manager.set_api_port(api_port)
# Previously it was necessary to reschedule on error because `events_manager.connect()` was executed
# before the REST API was available, so it retried until the REST API was ready. Now the API is ready
# to use when we can read the api_port value from the database, so now we can call
# events_manager.connect(reschedule_on_err=False). I kept reschedule_on_err=True just for reinsurance.
self.events_manager.connect(reschedule_on_err=True)

elif time.time() - self.core_started_at > API_PORT_CHECK_TIMEOUT:
raise CoreConnectTimeoutError(f"Can't get Core API port value within {API_PORT_CHECK_TIMEOUT} seconds")
else:
self.check_core_api_port_timer.start(API_PORT_CHECK_INTERVAL)

def on_core_stdout_read_ready(self):
if self.app_manager.quitting_app:
Expand Down Expand Up @@ -176,7 +223,7 @@ def stop(self, quit_app_on_core_finished=True):
if not self.core_connected:
# If Core is not connected via events_manager it also most probably cannot process API requests.
self._logger.warning('Core is not connected during the CoreManager shutdown, killing it...')
self.kill_core_process_and_remove_the_lock_file()
self.kill_core_process()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This renaming should actually be in the previous PR... When I extracted the current PR from the previous one, I missed this line...

return

self.events_manager.shutting_down = True
Expand Down
16 changes: 13 additions & 3 deletions src/tribler/gui/event_request_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ class EventRequestManager(QNetworkAccessManager):

def __init__(self, api_port, api_key, error_handler):
QNetworkAccessManager.__init__(self)
url = QUrl("http://localhost:%d/events" % api_port)
self.request = QNetworkRequest(url)
self.request.setRawHeader(b'X-Api-Key', api_key.encode('ascii'))
self.api_port = api_port
self.api_key = api_key
self.request = self.create_request()
self.start_time = time.time()
self.connect_timer = QTimer()
self.current_event_string = ""
Expand All @@ -65,6 +65,16 @@ def __init__(self, api_port, api_key, error_handler):
notifier.add_observer(notifications.tribler_shutdown_state, self.on_tribler_shutdown_state)
notifier.add_observer(notifications.report_config_error, self.on_report_config_error)

def create_request(self) -> QNetworkRequest:
url = QUrl(f"http://localhost:{self.api_port}/events")
request = QNetworkRequest(url)
request.setRawHeader(b'X-Api-Key', self.api_key.encode('ascii'))
return request

def set_api_port(self, api_port: int):
self.api_port = api_port
self.request = self.create_request()

def on_events_start(self, public_key: str, version: str):
# if public key format is changed, don't forget to change it at the core side as well
if public_key:
Expand Down
74 changes: 72 additions & 2 deletions src/tribler/gui/tests/test_core_manager.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,92 @@
import errno
import sys
import time
from unittest.mock import MagicMock, patch

import pytest

from tribler.gui.core_manager import CoreCrashedError, CoreManager
from tribler.gui.exceptions import CoreConnectTimeoutError


@pytest.fixture(name='core_manager')
def fixture_core_manager():
core_manager = CoreManager(root_state_dir=MagicMock(), api_port=MagicMock(), api_key=MagicMock(),
app_manager=MagicMock(),
events_manager=MagicMock())
app_manager=MagicMock(), process_manager=MagicMock(), events_manager=MagicMock())
core_manager.core_process = MagicMock(readAllStandardOutput=MagicMock(return_value=b'core stdout'),
readAllStandardError=MagicMock(return_value=b'core stderr'))
core_manager.check_core_api_port_timer = MagicMock()
return core_manager


def test_on_core_started_calls_check_core_api_port(core_manager):
assert not core_manager.core_running
assert not core_manager.core_started
assert core_manager.core_started_at is None
with patch.object(core_manager, 'check_core_api_port') as check_core_api_port:
core_manager.on_core_started()
assert check_core_api_port.called


def test_check_core_api_port_not_running(core_manager):
assert not core_manager.core_running
core_manager.check_core_api_port()
assert not core_manager.process_manager.current_process.get_core_process.called


def test_check_core_api_port_already_connected(core_manager):
core_manager.core_running = True
core_manager.core_connected = True
core_manager.check_core_api_port()
assert not core_manager.process_manager.current_process.get_core_process.called


def test_check_core_api_port_shutting_down(core_manager):
core_manager.core_running = True
core_manager.shutting_down = True
core_manager.check_core_api_port()
assert not core_manager.process_manager.current_process.get_core_process.called


def test_check_core_api_port_core_process_not_found(core_manager):
core_manager.core_running = True
core_manager.core_started_at = time.time()
core_manager.process_manager.current_process.get_core_process.return_value = None
core_manager.check_core_api_port()
assert core_manager.process_manager.current_process.get_core_process.called
assert core_manager.check_core_api_port_timer.start.called


def test_check_core_api_port_not_set(core_manager):
core_manager.core_running = True
core_manager.core_started_at = time.time()
core_manager.process_manager.current_process.get_core_process().api_port = None
core_manager.check_core_api_port()
assert core_manager.process_manager.current_process.get_core_process.called
assert core_manager.check_core_api_port_timer.start.called


@patch('tribler.gui.core_manager.request_manager')
def test_check_core_api_port(request_manager: MagicMock, core_manager):
core_manager.core_running = True
core_manager.core_started_at = time.time()
api_port = core_manager.process_manager.current_process.get_core_process().api_port
core_manager.check_core_api_port()
assert core_manager.process_manager.current_process.get_core_process.called
assert not core_manager.check_core_api_port_timer.start.called
assert core_manager.api_port == api_port
assert request_manager.port == api_port


def test_check_core_api_port_timeout(core_manager):
core_manager.core_running = True
# The timeout should be 30 seconds so let's pretend the core started 31 seconds before now
core_manager.core_started_at = time.time() - 31
core_manager.process_manager.current_process.get_core_process.return_value = None
with pytest.raises(CoreConnectTimeoutError, match="^Can't get Core API port value within 30 seconds$"):
core_manager.check_core_api_port()


def test_on_core_finished_calls_quit_application(core_manager):
# test that in case of `shutting_down` and `should_quit_app_on_core_finished` flags have been set to True
# then `quit_application` method will be called and Exception will not be raised
Expand Down
5 changes: 3 additions & 2 deletions src/tribler/gui/tribler_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ def __init__(

self.error_handler = ErrorHandler(self)
self.events_manager = EventRequestManager(api_port, api_key, self.error_handler)
self.core_manager = CoreManager(self.root_state_dir, api_port, api_key, app_manager, self.events_manager)
self.core_manager = CoreManager(self.root_state_dir, api_port, api_key,
app_manager, process_manager, self.events_manager)
self.version_history = VersionHistory(self.root_state_dir)
self.upgrade_manager = UpgradeManager(self.version_history)
self.pending_requests = {}
Expand Down Expand Up @@ -1212,7 +1213,7 @@ def dropEvent(self, e):
e.accept()

def clicked_force_shutdown(self):
self.core_manager.kill_core_process_and_remove_the_lock_file()
self.core_manager.kill_core_process()
self.app_manager.quit_application()

def clicked_skip_conversion(self):
Expand Down