Skip to content

Commit

Permalink
🎨 improve investigate log streaming (ITISFoundation#5330)
Browse files Browse the repository at this point in the history
  • Loading branch information
bisgaard-itis authored and jsaq007 committed Feb 22, 2024
1 parent 9489c7b commit f6bd884
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ def get_log_distributor(app: Annotated[FastAPI, Depends(get_app)]) -> LogDistrib
return cast(LogDistributor, app.state.log_distributor)


def get_log_check_timeout(app: Annotated[FastAPI, Depends(get_app)]) -> NonNegativeInt:
    """Return the configured log-check timeout, in seconds.

    FastAPI dependency that reads ``API_SERVER_LOG_CHECK_TIMEOUT_SECONDS``
    from the settings object stored on ``app.state``.
    """
    assert app.state.settings  # nosec
    return cast(NonNegativeInt, app.state.settings.API_SERVER_LOG_CHECK_TIMEOUT_SECONDS)
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
# pylint: disable=too-many-arguments
# pylint: disable=W0613

import logging
from collections import deque
from collections.abc import Callable
from typing import Annotated
from uuid import UUID

from fastapi import APIRouter, Depends, status
from fastapi import APIRouter, Depends, Request, status
from fastapi.exceptions import HTTPException
from fastapi.responses import RedirectResponse
from fastapi_pagination.api import create_page
Expand All @@ -17,6 +18,7 @@
from models_library.users import UserID
from pydantic import NonNegativeInt
from pydantic.types import PositiveInt
from servicelib.fastapi.requests_decorators import cancel_on_disconnect
from servicelib.logging_utils import log_context
from starlette.background import BackgroundTask

Expand All @@ -35,7 +37,7 @@
from ..dependencies.application import get_reverse_url_mapper
from ..dependencies.authentication import get_current_user_id, get_product_name
from ..dependencies.database import Engine, get_db_engine
from ..dependencies.rabbitmq import get_log_distributor, get_max_log_check_seconds
from ..dependencies.rabbitmq import get_log_check_timeout, get_log_distributor
from ..dependencies.services import get_api_client
from ..dependencies.webserver import AuthSession, get_webserver_session
from ..errors.custom_errors import InsufficientCredits, MissingWallet
Expand Down Expand Up @@ -380,17 +382,17 @@ async def get_job_pricing_unit(
response_class=LogStreamingResponse,
include_in_schema=API_SERVER_DEV_FEATURES_ENABLED,
)
@cancel_on_disconnect
async def get_log_stream(
request: Request,
solver_key: SolverKeyId,
version: VersionStr,
job_id: JobID,
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
log_distributor: Annotated[LogDistributor, Depends(get_log_distributor)],
user_id: Annotated[UserID, Depends(get_current_user_id)],
max_log_check_seconds: Annotated[
NonNegativeInt, Depends(get_max_log_check_seconds)
],
log_check_timeout: Annotated[NonNegativeInt, Depends(get_log_check_timeout)],
):
job_name = _compose_job_resource_name(solver_key, version, job_id)
with log_context(
Expand All @@ -403,7 +405,7 @@ async def get_log_stream(
director2_api=director2_api,
job_id=job_id,
log_distributor=log_distributor,
max_log_check_seconds=max_log_check_seconds,
log_check_timeout=log_check_timeout,
)
await log_streamer.setup()
return LogStreamingResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class ApplicationSettings(BasicSettings):
API_SERVER_DIRECTOR_V2: DirectorV2Settings | None = Field(
auto_default_from_env=True
)
API_SERVER_MAX_LOG_CHECK_SECONDS: NonNegativeInt = 30
API_SERVER_LOG_CHECK_TIMEOUT_SECONDS: NonNegativeInt = 3 * 60
API_SERVER_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True
# DEV-TOOLS
API_SERVER_DEV_HTTP_CALLS_LOGS_PATH: Path | None = Field(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import logging
from asyncio import Queue
from datetime import datetime, timezone
from typing import AsyncIterable, Awaitable, Callable, Final

from models_library.rabbitmq_messages import LoggerRabbitMessage
Expand Down Expand Up @@ -110,15 +109,15 @@ def __init__(
director2_api: DirectorV2Api,
job_id: JobID,
log_distributor: LogDistributor,
max_log_check_seconds: NonNegativeInt,
log_check_timeout: NonNegativeInt,
):
self._user_id = user_id
self._director2_api = director2_api
self._queue: Queue[JobLog] = Queue()
self._job_id: JobID = job_id
self._log_distributor: LogDistributor = log_distributor
self._is_registered: bool = False
self._max_log_check_seconds: NonNegativeInt = max_log_check_seconds
self._log_check_timeout: NonNegativeInt = log_check_timeout

async def setup(self):
await self._log_distributor.register(self._job_id, self._queue.put)
Expand All @@ -137,26 +136,19 @@ async def __aexit__(self, exc_type, exc, tb):

async def _project_done(self) -> bool:
    """Tell whether the computation for this job has stopped.

    Fetches the computation task for ``self._job_id`` / ``self._user_id``
    through ``self._director2_api`` and returns ``True`` when its ``stopped``
    attribute is not ``None``.
    """
    task = await self._director2_api.get_computation(self._job_id, self._user_id)
    return task.stopped is not None

async def log_generator(self) -> AsyncIterable[str]:
    """Yield the queued job logs as newline-terminated JSON strings.

    Each iteration waits on the internal queue for at most
    ``self._log_check_timeout`` seconds. When a wait times out, the project
    state is queried and the generator finishes once the project is done;
    otherwise it keeps waiting for further logs.

    Raises:
        LogStreamerNotRegistered: if ``self._is_registered`` is false when
            iteration starts.
    """
    if not self._is_registered:
        raise LogStreamerNotRegistered(
            f"LogStreamer for job_id={self._job_id} is not correctly registered"
        )
    done: bool = False
    while not done:
        try:
            # Only the awaited get() is guarded, so a timeout raised anywhere
            # else is not mistaken for "no log arrived".
            log: JobLog = await asyncio.wait_for(
                self._queue.get(), timeout=self._log_check_timeout
            )
        except asyncio.TimeoutError:
            # No log within the timeout: stop only if the project has finished.
            done = await self._project_done()
        else:
            yield log.json() + _NEW_LINE
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def fake_project_for_streaming(
yield fake_project


@pytest.mark.parametrize("disconnect", [True, False])
async def test_log_streaming(
app: FastAPI,
auth: httpx.BasicAuth,
Expand All @@ -96,6 +97,7 @@ async def test_log_streaming(
fake_log_distributor,
fake_project_for_streaming: ProjectGet,
mocked_directorv2_service: MockRouter,
disconnect: bool,
):

job_id: JobID = fake_project_for_streaming.uuid
Expand All @@ -107,10 +109,13 @@ async def test_log_streaming(
auth=auth,
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
job_log = JobLog.parse_raw(line)
pprint(job_log.json())
collected_messages += job_log.messages
if not disconnect:
async for line in response.aiter_lines():
job_log = JobLog.parse_raw(line)
pprint(job_log.json())
collected_messages += job_log.messages

assert fake_log_distributor.deregister_is_called

assert (
collected_messages
Expand Down
6 changes: 3 additions & 3 deletions services/api-server/tests/unit/test_services_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ def _get_computation(request: httpx.Request, **kwargs) -> httpx.Response:
director2_api=d2_client,
job_id=project_id,
log_distributor=log_distributor,
max_log_check_seconds=1,
log_check_timeout=1,
) as log_streamer:
yield log_streamer

Expand Down Expand Up @@ -393,7 +393,7 @@ async def test_log_generator(mocker: MockFixture, faker: Faker):
"simcore_service_api_server.services.log_streaming.LogStreamer._project_done",
return_value=True,
)
log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, max_log_check_seconds=1) # type: ignore
log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, log_check_timeout=1) # type: ignore
log_streamer._is_registered = True

published_logs: list[str] = []
Expand All @@ -414,7 +414,7 @@ async def test_log_generator(mocker: MockFixture, faker: Faker):


async def test_log_generator_context(mocker: MockFixture, faker: Faker):
log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, max_log_check_seconds=1) # type: ignore
log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, log_check_timeout=1) # type: ignore
with pytest.raises(LogStreamerNotRegistered):
async for log in log_streamer.log_generator():
print(log)

0 comments on commit f6bd884

Please sign in to comment.