diff --git a/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py b/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py
index ca96e10a3f2..daf9272af4a 100644
--- a/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py
+++ b/services/api-server/src/simcore_service_api_server/api/dependencies/rabbitmq.py
@@ -18,8 +18,6 @@ def get_log_distributor(app: Annotated[FastAPI, Depends(get_app)]) -> LogDistrib
     return cast(LogDistributor, app.state.log_distributor)
 
 
-def get_max_log_check_seconds(
-    app: Annotated[FastAPI, Depends(get_app)]
-) -> NonNegativeInt:
+def get_log_check_timeout(app: Annotated[FastAPI, Depends(get_app)]) -> NonNegativeInt:
     assert app.state.settings  # nosec
-    return cast(NonNegativeInt, app.state.settings.API_SERVER_MAX_LOG_CHECK_SECONDS)
+    return cast(NonNegativeInt, app.state.settings.API_SERVER_LOG_CHECK_TIMEOUT_SECONDS)
diff --git a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py
index e7944365043..7f193f93fc4 100644
--- a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py
+++ b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs_getters.py
@@ -1,4 +1,5 @@
 # pylint: disable=too-many-arguments
+# pylint: disable=unused-argument
 
 import logging
 from collections import deque
@@ -6,7 +7,7 @@
 from typing import Annotated
 from uuid import UUID
 
-from fastapi import APIRouter, Depends, status
+from fastapi import APIRouter, Depends, Request, status
 from fastapi.exceptions import HTTPException
 from fastapi.responses import RedirectResponse
 from fastapi_pagination.api import create_page
@@ -17,6 +18,7 @@
 from models_library.users import UserID
 from pydantic import NonNegativeInt
 from pydantic.types import PositiveInt
+from servicelib.fastapi.requests_decorators import cancel_on_disconnect
 from servicelib.logging_utils import log_context
 from starlette.background import BackgroundTask
 
@@ -35,7 +37,7 @@
 from ..dependencies.application import get_reverse_url_mapper
 from ..dependencies.authentication import get_current_user_id, get_product_name
 from ..dependencies.database import Engine, get_db_engine
-from ..dependencies.rabbitmq import get_log_distributor, get_max_log_check_seconds
+from ..dependencies.rabbitmq import get_log_check_timeout, get_log_distributor
 from ..dependencies.services import get_api_client
 from ..dependencies.webserver import AuthSession, get_webserver_session
 from ..errors.custom_errors import InsufficientCredits, MissingWallet
@@ -380,7 +382,9 @@ async def get_job_pricing_unit(
     response_class=LogStreamingResponse,
     include_in_schema=API_SERVER_DEV_FEATURES_ENABLED,
 )
+@cancel_on_disconnect
 async def get_log_stream(
+    request: Request,
     solver_key: SolverKeyId,
     version: VersionStr,
     job_id: JobID,
@@ -388,9 +392,7 @@ async def get_log_stream(
     director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
     log_distributor: Annotated[LogDistributor, Depends(get_log_distributor)],
     user_id: Annotated[UserID, Depends(get_current_user_id)],
-    max_log_check_seconds: Annotated[
-        NonNegativeInt, Depends(get_max_log_check_seconds)
-    ],
+    log_check_timeout: Annotated[NonNegativeInt, Depends(get_log_check_timeout)],
 ):
     job_name = _compose_job_resource_name(solver_key, version, job_id)
     with log_context(
@@ -403,7 +405,7 @@ async def get_log_stream(
             director2_api=director2_api,
             job_id=job_id,
             log_distributor=log_distributor,
-            max_log_check_seconds=max_log_check_seconds,
+            log_check_timeout=log_check_timeout,
         )
         await log_streamer.setup()
         return LogStreamingResponse(
diff --git a/services/api-server/src/simcore_service_api_server/core/settings.py b/services/api-server/src/simcore_service_api_server/core/settings.py
index 39bfb48504b..0a99efab05c 100644
--- a/services/api-server/src/simcore_service_api_server/core/settings.py
+++ b/services/api-server/src/simcore_service_api_server/core/settings.py
@@ -137,7 +137,7 @@ class ApplicationSettings(BasicSettings):
     API_SERVER_DIRECTOR_V2: DirectorV2Settings | None = Field(
         auto_default_from_env=True
     )
-    API_SERVER_MAX_LOG_CHECK_SECONDS: NonNegativeInt = 30
+    API_SERVER_LOG_CHECK_TIMEOUT_SECONDS: NonNegativeInt = 3 * 60
     API_SERVER_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True
     # DEV-TOOLS
     API_SERVER_DEV_HTTP_CALLS_LOGS_PATH: Path | None = Field(
diff --git a/services/api-server/src/simcore_service_api_server/services/log_streaming.py b/services/api-server/src/simcore_service_api_server/services/log_streaming.py
index 271ab521766..44e6e2f2f76 100644
--- a/services/api-server/src/simcore_service_api_server/services/log_streaming.py
+++ b/services/api-server/src/simcore_service_api_server/services/log_streaming.py
@@ -1,7 +1,6 @@
 import asyncio
 import logging
 from asyncio import Queue
-from datetime import datetime, timezone
 from typing import AsyncIterable, Awaitable, Callable, Final
 
 from models_library.rabbitmq_messages import LoggerRabbitMessage
@@ -110,7 +109,7 @@ def __init__(
         director2_api: DirectorV2Api,
         job_id: JobID,
         log_distributor: LogDistributor,
-        max_log_check_seconds: NonNegativeInt,
+        log_check_timeout: NonNegativeInt,
     ):
         self._user_id = user_id
         self._director2_api = director2_api
@@ -118,7 +117,7 @@ def __init__(
         self._job_id: JobID = job_id
         self._log_distributor: LogDistributor = log_distributor
         self._is_registered: bool = False
-        self._max_log_check_seconds: NonNegativeInt = max_log_check_seconds
+        self._log_check_timeout: NonNegativeInt = log_check_timeout
 
     async def setup(self):
         await self._log_distributor.register(self._job_id, self._queue.put)
@@ -137,26 +136,20 @@ async def __aexit__(self, exc_type, exc, tb):
 
     async def _project_done(self) -> bool:
         task = await self._director2_api.get_computation(self._job_id, self._user_id)
-        return not task.stopped is None
+        return task.stopped is not None
 
     async def log_generator(self) -> AsyncIterable[str]:
         if not self._is_registered:
             raise LogStreamerNotRegistered(
                 f"LogStreamer for job_id={self._job_id} is not correctly registered"
             )
-        last_log_time: datetime | None = None
-        while True:
-            while self._queue.empty():
-                if await self._project_done():
-                    return
-                await asyncio.sleep(
-                    0.2
-                    if last_log_time is None
-                    else min(
-                        (datetime.now(tz=timezone.utc) - last_log_time).total_seconds(),
-                        self._max_log_check_seconds,
-                    )
+        done: bool = False
+        while not done:
+            try:
+                log: JobLog = await asyncio.wait_for(
+                    self._queue.get(), timeout=self._log_check_timeout
                 )
-            log: JobLog = await self._queue.get()
-            last_log_time = datetime.now(tz=timezone.utc)
-            yield log.json() + _NEW_LINE
+            except asyncio.TimeoutError:
+                done = await self._project_done()
+            else:
+                yield log.json() + _NEW_LINE
diff --git a/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py b/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py
index 5958f9661dc..281681be822 100644
--- a/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py
+++ b/services/api-server/tests/unit/api_solvers/test_api_routers_solvers_jobs_logs.py
@@ -87,6 +87,7 @@ def fake_project_for_streaming(
     yield fake_project
 
 
+@pytest.mark.parametrize("disconnect", [True, False])
 async def test_log_streaming(
     app: FastAPI,
     auth: httpx.BasicAuth,
@@ -96,6 +97,7 @@ async def test_log_streaming(
     fake_log_distributor,
     fake_project_for_streaming: ProjectGet,
     mocked_directorv2_service: MockRouter,
+    disconnect: bool,
 ):
 
     job_id: JobID = fake_project_for_streaming.uuid
@@ -107,10 +109,13 @@ async def test_log_streaming(
         auth=auth,
     ) as response:
         response.raise_for_status()
-        async for line in response.aiter_lines():
-            job_log = JobLog.parse_raw(line)
-            pprint(job_log.json())
-            collected_messages += job_log.messages
+        if not disconnect:
+            async for line in response.aiter_lines():
+                job_log = JobLog.parse_raw(line)
+                pprint(job_log.json())
+                collected_messages += job_log.messages
+
+    assert fake_log_distributor.deregister_is_called
 
     assert (
         collected_messages
diff --git a/services/api-server/tests/unit/test_services_rabbitmq.py b/services/api-server/tests/unit/test_services_rabbitmq.py
index ee42af20fdc..d5e2cf71a66 100644
--- a/services/api-server/tests/unit/test_services_rabbitmq.py
+++ b/services/api-server/tests/unit/test_services_rabbitmq.py
@@ -348,7 +348,7 @@ def _get_computation(request: httpx.Request, **kwargs) -> httpx.Response:
         director2_api=d2_client,
         job_id=project_id,
         log_distributor=log_distributor,
-        max_log_check_seconds=1,
+        log_check_timeout=1,
     ) as log_streamer:
         yield log_streamer
 
@@ -393,7 +393,7 @@ async def test_log_generator(mocker: MockFixture, faker: Faker):
         "simcore_service_api_server.services.log_streaming.LogStreamer._project_done",
         return_value=True,
     )
-    log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, max_log_check_seconds=1)  # type: ignore
+    log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, log_check_timeout=1)  # type: ignore
     log_streamer._is_registered = True
 
     published_logs: list[str] = []
@@ -414,7 +414,7 @@ async def test_log_generator(mocker: MockFixture, faker: Faker):
 
 
 async def test_log_generator_context(mocker: MockFixture, faker: Faker):
-    log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, max_log_check_seconds=1)  # type: ignore
+    log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, log_check_timeout=1)  # type: ignore
     with pytest.raises(LogStreamerNotRegistered):
         async for log in log_streamer.log_generator():
             print(log)