From e0fbed2703a26d681d7c2da86c45a35ff601b9a4 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Mon, 12 Feb 2024 15:13:27 +0100 Subject: [PATCH 1/6] cancel logstreaming on disconnect --- .../api/routes/solvers_jobs_getters.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py index e7944365043..385e87ab248 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py @@ -6,7 +6,7 @@ from typing import Annotated from uuid import UUID -from fastapi import APIRouter, Depends, status +from fastapi import APIRouter, Depends, Request, status from fastapi.exceptions import HTTPException from fastapi.responses import RedirectResponse from fastapi_pagination.api import create_page @@ -17,6 +17,7 @@ from models_library.users import UserID from pydantic import NonNegativeInt from pydantic.types import PositiveInt +from servicelib.fastapi.requests_decorators import cancel_on_disconnect from servicelib.logging_utils import log_context from starlette.background import BackgroundTask @@ -380,7 +381,9 @@ async def get_job_pricing_unit( response_class=LogStreamingResponse, include_in_schema=API_SERVER_DEV_FEATURES_ENABLED, ) +@cancel_on_disconnect async def get_log_stream( + request: Request, solver_key: SolverKeyId, version: VersionStr, job_id: JobID, From 3af5d08daaba3be980aeca2abc9043877602795d Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 13 Feb 2024 10:43:55 +0100 Subject: [PATCH 2/6] improve design of log_generator --- .../api/dependencies/rabbitmq.py | 6 ++--- .../api/routes/solvers_jobs_getters.py | 8 +++--- .../core/settings.py | 2 +- .../services/log_streaming.py | 26 +++++++------------ .../tests/unit/test_services_rabbitmq.py | 6 ++--- 5 files changed, 18 insertions(+), 30 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py b/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py index ca96e10a3f2..daf9272af4a 100644 --- a/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py +++ b/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py @@ -18,8 +18,6 @@ def get_log_distributor(app: Annotated[FastAPI, Depends(get_app)]) -> LogDistrib return cast(LogDistributor, app.state.log_distributor) -def get_max_log_check_seconds( - app: Annotated[FastAPI, Depends(get_app)] -) -> NonNegativeInt: +def get_log_check_timeout(app: Annotated[FastAPI, Depends(get_app)]) -> NonNegativeInt: assert app.state.settings # nosec - return cast(NonNegativeInt, app.state.settings.API_SERVER_MAX_LOG_CHECK_SECONDS) + return cast(NonNegativeInt, app.state.settings.API_SERVER_LOG_CHECK_TIMEOUT_SECONDS) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py index 385e87ab248..3f81daf7296 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py @@ -36,7 +36,7 @@ from ..dependencies.application import get_reverse_url_mapper from ..dependencies.authentication import get_current_user_id, get_product_name from ..dependencies.database import Engine, get_db_engine -from ..dependencies.rabbitmq import get_log_distributor, get_max_log_check_seconds +from ..dependencies.rabbitmq import get_log_check_timeout, get_log_distributor from ..dependencies.services import get_api_client from ..dependencies.webserver import AuthSession, get_webserver_session from ..errors.custom_errors import InsufficientCredits, MissingWallet @@ -391,9 +391,7 @@ async def get_log_stream( director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))], log_distributor: Annotated[LogDistributor, Depends(get_log_distributor)], user_id: Annotated[UserID, Depends(get_current_user_id)], - max_log_check_seconds: Annotated[ - NonNegativeInt, Depends(get_max_log_check_seconds) - ], + log_check_timeout: Annotated[NonNegativeInt, Depends(get_log_check_timeout)], ): job_name = _compose_job_resource_name(solver_key, version, job_id) with log_context( @@ -406,7 +404,7 @@ async def get_log_stream( director2_api=director2_api, job_id=job_id, log_distributor=log_distributor, - max_log_check_seconds=max_log_check_seconds, + log_check_timeout=log_check_timeout, ) await log_streamer.setup() return LogStreamingResponse( diff --git a/services/api-server/src/simcore_service_api_server/core/settings.py b/services/api-server/src/simcore_service_api_server/core/settings.py index 39bfb48504b..0a99efab05c 100644 --- a/services/api-server/src/simcore_service_api_server/core/settings.py +++ b/services/api-server/src/simcore_service_api_server/core/settings.py @@ -137,7 +137,7 @@ class ApplicationSettings(BasicSettings): API_SERVER_DIRECTOR_V2: DirectorV2Settings | None = Field( auto_default_from_env=True ) - API_SERVER_MAX_LOG_CHECK_SECONDS: NonNegativeInt = 30 + API_SERVER_LOG_CHECK_TIMEOUT_SECONDS: NonNegativeInt = 3 * 60 API_SERVER_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True # DEV-TOOLS API_SERVER_DEV_HTTP_CALLS_LOGS_PATH: Path | None = Field( diff --git a/services/api-server/src/simcore_service_api_server/services/log_streaming.py b/services/api-server/src/simcore_service_api_server/services/log_streaming.py index 271ab521766..4a445999840 100644 --- a/services/api-server/src/simcore_service_api_server/services/log_streaming.py +++ b/services/api-server/src/simcore_service_api_server/services/log_streaming.py @@ -1,7 +1,6 @@ import asyncio import logging from asyncio import Queue -from datetime import datetime, timezone from typing import AsyncIterable, Awaitable, Callable, Final from models_library.rabbitmq_messages import LoggerRabbitMessage @@ -110,7 +109,7 @@ def __init__( director2_api: DirectorV2Api, job_id: JobID, log_distributor: LogDistributor, - max_log_check_seconds: NonNegativeInt, + log_check_timeout: NonNegativeInt, ): self._user_id = user_id self._director2_api = director2_api @@ -118,7 +117,7 @@ def __init__( self._job_id: JobID = job_id self._log_distributor: LogDistributor = log_distributor self._is_registered: bool = False - self._max_log_check_seconds: NonNegativeInt = max_log_check_seconds + self._log_check_timeout: NonNegativeInt = log_check_timeout async def setup(self): await self._log_distributor.register(self._job_id, self._queue.put) @@ -137,26 +136,19 @@ async def __aexit__(self, exc_type, exc, tb): async def _project_done(self) -> bool: task = await self._director2_api.get_computation(self._job_id, self._user_id) - return not task.stopped is None + return not (task.stopped is None) async def log_generator(self) -> AsyncIterable[str]: if not self._is_registered: raise LogStreamerNotRegistered( f"LogStreamer for job_id={self._job_id} is not correctly registered" ) - last_log_time: datetime | None = None while True: - while self._queue.empty(): + try: + log: JobLog = await asyncio.wait_for( + self._queue.get(), timeout=self._log_check_timeout + ) + yield log.json() + _NEW_LINE + except asyncio.TimeoutError: if await self._project_done(): return - await asyncio.sleep( - 0.2 - if last_log_time is None - else min( - (datetime.now(tz=timezone.utc) - last_log_time).total_seconds(), - self._max_log_check_seconds, - ) - ) - log: JobLog = await self._queue.get() - last_log_time = datetime.now(tz=timezone.utc) - yield log.json() + _NEW_LINE diff --git a/services/api-server/tests/unit/test_services_rabbitmq.py b/services/api-server/tests/unit/test_services_rabbitmq.py index ee42af20fdc..d5e2cf71a66 100644 --- a/services/api-server/tests/unit/test_services_rabbitmq.py +++ b/services/api-server/tests/unit/test_services_rabbitmq.py @@ -348,7 +348,7 @@ def _get_computation(request: httpx.Request, **kwargs) -> httpx.Response: director2_api=d2_client, job_id=project_id, log_distributor=log_distributor, - max_log_check_seconds=1, + log_check_timeout=1, ) as log_streamer: yield log_streamer @@ -393,7 +393,7 @@ async def test_log_generator(mocker: MockFixture, faker: Faker): "simcore_service_api_server.services.log_streaming.LogStreamer._project_done", return_value=True, ) - log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, max_log_check_seconds=1) # type: ignore + log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, log_check_timeout=1) # type: ignore log_streamer._is_registered = True published_logs: list[str] = [] @@ -414,7 +414,7 @@ async def test_log_generator(mocker: MockFixture, faker: Faker): async def test_log_generator_context(mocker: MockFixture, faker: Faker): - log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, max_log_check_seconds=1) # type: ignore + log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, log_check_timeout=1) # type: ignore with pytest.raises(LogStreamerNotRegistered): async for log in log_streamer.log_generator(): print(log) From e77aff06c328ac317e4207ca837a67fbfb30ce41 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 13 Feb 2024 11:47:35 +0100 Subject: [PATCH 3/6] add test that logdistributor deregisters if client disconnect --- .../test_api_routers_solvers_jobs_logs.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py b/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py index 5958f9661dc..281681be822 100644 --- a/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py +++ b/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py @@ -87,6 +87,7 @@ def fake_project_for_streaming( yield fake_project +@pytest.mark.parametrize("disconnect", [True, False]) async def test_log_streaming( app: FastAPI, auth: httpx.BasicAuth, @@ -96,6 +97,7 @@ async def test_log_streaming( fake_log_distributor, fake_project_for_streaming: ProjectGet, mocked_directorv2_service: MockRouter, + disconnect: bool, ): job_id: JobID = fake_project_for_streaming.uuid @@ -107,10 +109,13 @@ async def test_log_streaming( auth=auth, ) as response: response.raise_for_status() - async for line in response.aiter_lines(): - job_log = JobLog.parse_raw(line) - pprint(job_log.json()) - collected_messages += job_log.messages + if not disconnect: + async for line in response.aiter_lines(): + job_log = JobLog.parse_raw(line) + pprint(job_log.json()) + collected_messages += job_log.messages + + assert fake_log_distributor.deregister_is_called assert ( collected_messages From 63eb2a14b64170c362e21215aa72a3554523514b Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 13 Feb 2024 13:09:39 +0100 Subject: [PATCH 4/6] @pcrespov clean up log_generator --- .../simcore_service_api_server/services/log_streaming.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/services/api-server/src/simcore_service_api_server/services/log_streaming.py b/services/api-server/src/simcore_service_api_server/services/log_streaming.py index 4a445999840..379539ccea5 100644 --- a/services/api-server/src/simcore_service_api_server/services/log_streaming.py +++ b/services/api-server/src/simcore_service_api_server/services/log_streaming.py @@ -143,12 +143,12 @@ async def log_generator(self) -> AsyncIterable[str]: raise LogStreamerNotRegistered( f"LogStreamer for job_id={self._job_id} is not correctly registered" ) - while True: + done: bool = False + while not done: try: log: JobLog = await asyncio.wait_for( self._queue.get(), timeout=self._log_check_timeout ) yield log.json() + _NEW_LINE except asyncio.TimeoutError: - if await self._project_done(): - return + done = await self._project_done() From a305429ad78c3e416101ba890fa3e101b567d2a1 Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 13 Feb 2024 13:10:19 +0100 Subject: [PATCH 5/6] clean up logic @pcrespov --- .../src/simcore_service_api_server/services/log_streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/api-server/src/simcore_service_api_server/services/log_streaming.py b/services/api-server/src/simcore_service_api_server/services/log_streaming.py index 379539ccea5..44e6e2f2f76 100644 --- a/services/api-server/src/simcore_service_api_server/services/log_streaming.py +++ b/services/api-server/src/simcore_service_api_server/services/log_streaming.py @@ -136,7 +136,7 @@ async def __aexit__(self, exc_type, exc, tb): async def _project_done(self) -> bool: task = await self._director2_api.get_computation(self._job_id, self._user_id) - return not (task.stopped is None) + return task.stopped is not None async def log_generator(self) -> AsyncIterable[str]: if not self._is_registered: From 1cb94144e38220a84649b7fb658a8a379385ed0c Mon Sep 17 00:00:00 2001 From: Mads Bisgaard Date: Tue, 13 Feb 2024 13:35:26 +0100 Subject: [PATCH 6/6] make pylint happy --- .../api/routes/solvers_jobs_getters.py | 1 + 1 file changed, 1 insertion(+) diff --git a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py index 3f81daf7296..7f193f93fc4 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py @@ -1,4 +1,5 @@ # pylint: disable=too-many-arguments +# pylint: disable=W0613 import logging from collections import deque