Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎨 improve investigate log streaming #5330

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ def get_log_distributor(app: Annotated[FastAPI, Depends(get_app)]) -> LogDistrib
return cast(LogDistributor, app.state.log_distributor)


def get_max_log_check_seconds(
app: Annotated[FastAPI, Depends(get_app)]
) -> NonNegativeInt:
def get_log_check_timeout(app: Annotated[FastAPI, Depends(get_app)]) -> NonNegativeInt:
assert app.state.settings # nosec
return cast(NonNegativeInt, app.state.settings.API_SERVER_MAX_LOG_CHECK_SECONDS)
return cast(NonNegativeInt, app.state.settings.API_SERVER_LOG_CHECK_TIMEOUT_SECONDS)
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from typing import Annotated
from uuid import UUID

from fastapi import APIRouter, Depends, status
from fastapi import APIRouter, Depends, Request, status
from fastapi.exceptions import HTTPException
from fastapi.responses import RedirectResponse
from fastapi_pagination.api import create_page
Expand All @@ -17,6 +17,7 @@
from models_library.users import UserID
from pydantic import NonNegativeInt
from pydantic.types import PositiveInt
from servicelib.fastapi.requests_decorators import cancel_on_disconnect
from servicelib.logging_utils import log_context
from starlette.background import BackgroundTask

Expand All @@ -35,7 +36,7 @@
from ..dependencies.application import get_reverse_url_mapper
from ..dependencies.authentication import get_current_user_id, get_product_name
from ..dependencies.database import Engine, get_db_engine
from ..dependencies.rabbitmq import get_log_distributor, get_max_log_check_seconds
from ..dependencies.rabbitmq import get_log_check_timeout, get_log_distributor
from ..dependencies.services import get_api_client
from ..dependencies.webserver import AuthSession, get_webserver_session
from ..errors.custom_errors import InsufficientCredits, MissingWallet
Expand Down Expand Up @@ -380,17 +381,17 @@ async def get_job_pricing_unit(
response_class=LogStreamingResponse,
include_in_schema=API_SERVER_DEV_FEATURES_ENABLED,
)
@cancel_on_disconnect
async def get_log_stream(
request: Request,
solver_key: SolverKeyId,
version: VersionStr,
job_id: JobID,
webserver_api: Annotated[AuthSession, Depends(get_webserver_session)],
director2_api: Annotated[DirectorV2Api, Depends(get_api_client(DirectorV2Api))],
log_distributor: Annotated[LogDistributor, Depends(get_log_distributor)],
user_id: Annotated[UserID, Depends(get_current_user_id)],
max_log_check_seconds: Annotated[
NonNegativeInt, Depends(get_max_log_check_seconds)
],
log_check_timeout: Annotated[NonNegativeInt, Depends(get_log_check_timeout)],
):
job_name = _compose_job_resource_name(solver_key, version, job_id)
with log_context(
Expand All @@ -403,7 +404,7 @@ async def get_log_stream(
director2_api=director2_api,
job_id=job_id,
log_distributor=log_distributor,
max_log_check_seconds=max_log_check_seconds,
log_check_timeout=log_check_timeout,
)
await log_streamer.setup()
return LogStreamingResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class ApplicationSettings(BasicSettings):
API_SERVER_DIRECTOR_V2: DirectorV2Settings | None = Field(
auto_default_from_env=True
)
API_SERVER_MAX_LOG_CHECK_SECONDS: NonNegativeInt = 30
API_SERVER_LOG_CHECK_TIMEOUT_SECONDS: NonNegativeInt = 3 * 60
bisgaard-itis marked this conversation as resolved.
Show resolved Hide resolved
API_SERVER_PROMETHEUS_INSTRUMENTATION_ENABLED: bool = True
# DEV-TOOLS
API_SERVER_DEV_HTTP_CALLS_LOGS_PATH: Path | None = Field(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import logging
from asyncio import Queue
from datetime import datetime, timezone
from typing import AsyncIterable, Awaitable, Callable, Final

from models_library.rabbitmq_messages import LoggerRabbitMessage
Expand Down Expand Up @@ -110,15 +109,15 @@ def __init__(
director2_api: DirectorV2Api,
job_id: JobID,
log_distributor: LogDistributor,
max_log_check_seconds: NonNegativeInt,
log_check_timeout: NonNegativeInt,
):
self._user_id = user_id
self._director2_api = director2_api
self._queue: Queue[JobLog] = Queue()
self._job_id: JobID = job_id
self._log_distributor: LogDistributor = log_distributor
self._is_registered: bool = False
self._max_log_check_seconds: NonNegativeInt = max_log_check_seconds
self._log_check_timeout: NonNegativeInt = log_check_timeout

async def setup(self):
await self._log_distributor.register(self._job_id, self._queue.put)
Expand All @@ -137,26 +136,19 @@ async def __aexit__(self, exc_type, exc, tb):

async def _project_done(self) -> bool:
task = await self._director2_api.get_computation(self._job_id, self._user_id)
return not task.stopped is None
return task.stopped is not None

async def log_generator(self) -> AsyncIterable[str]:
if not self._is_registered:
raise LogStreamerNotRegistered(
f"LogStreamer for job_id={self._job_id} is not correctly registered"
)
last_log_time: datetime | None = None
while True:
while self._queue.empty():
if await self._project_done():
return
await asyncio.sleep(
0.2
if last_log_time is None
else min(
(datetime.now(tz=timezone.utc) - last_log_time).total_seconds(),
self._max_log_check_seconds,
)
done: bool = False
while not done:
try:
log: JobLog = await asyncio.wait_for(
self._queue.get(), timeout=self._log_check_timeout
)
log: JobLog = await self._queue.get()
last_log_time = datetime.now(tz=timezone.utc)
yield log.json() + _NEW_LINE
yield log.json() + _NEW_LINE
except asyncio.TimeoutError:
done = await self._project_done()
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ def fake_project_for_streaming(
yield fake_project


@pytest.mark.parametrize("disconnect", [True, False])
async def test_log_streaming(
app: FastAPI,
auth: httpx.BasicAuth,
Expand All @@ -96,6 +97,7 @@ async def test_log_streaming(
fake_log_distributor,
fake_project_for_streaming: ProjectGet,
mocked_directorv2_service: MockRouter,
disconnect: bool,
):

job_id: JobID = fake_project_for_streaming.uuid
Expand All @@ -107,10 +109,13 @@ async def test_log_streaming(
auth=auth,
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
job_log = JobLog.parse_raw(line)
pprint(job_log.json())
collected_messages += job_log.messages
if not disconnect:
async for line in response.aiter_lines():
job_log = JobLog.parse_raw(line)
pprint(job_log.json())
collected_messages += job_log.messages

assert fake_log_distributor.deregister_is_called

assert (
collected_messages
Expand Down
6 changes: 3 additions & 3 deletions services/api-server/tests/unit/test_services_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ def _get_computation(request: httpx.Request, **kwargs) -> httpx.Response:
director2_api=d2_client,
job_id=project_id,
log_distributor=log_distributor,
max_log_check_seconds=1,
log_check_timeout=1,
) as log_streamer:
yield log_streamer

Expand Down Expand Up @@ -393,7 +393,7 @@ async def test_log_generator(mocker: MockFixture, faker: Faker):
"simcore_service_api_server.services.log_streaming.LogStreamer._project_done",
return_value=True,
)
log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, max_log_check_seconds=1) # type: ignore
log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, log_check_timeout=1) # type: ignore
log_streamer._is_registered = True

published_logs: list[str] = []
Expand All @@ -414,7 +414,7 @@ async def test_log_generator(mocker: MockFixture, faker: Faker):


async def test_log_generator_context(mocker: MockFixture, faker: Faker):
log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, max_log_check_seconds=1) # type: ignore
log_streamer = LogStreamer(user_id=3, director2_api=None, job_id=None, log_distributor=None, log_check_timeout=1) # type: ignore
with pytest.raises(LogStreamerNotRegistered):
async for log in log_streamer.log_generator():
print(log)
Loading