diff --git a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py index 5de8f5ffba06..f08cf98eea09 100644 --- a/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py +++ b/packages/dask-task-models-library/src/dask_task_models_library/container_tasks/events.py @@ -4,6 +4,10 @@ from distributed.worker import get_worker from pydantic import BaseModel, Extra, NonNegativeFloat +from servicelib.logging_utils import LogLevelInt +from simcore_service_dask_sidecar.computational_sidecar.docker_utils import ( + LogMessageStr, +) class BaseTaskEvent(BaseModel, ABC): @@ -46,15 +50,15 @@ class Config(BaseTaskEvent.Config): class TaskLogEvent(BaseTaskEvent): - log: str - log_level: int + log: LogMessageStr + log_level: LogLevelInt @staticmethod def topic_name() -> str: return "task_logs" @classmethod - def from_dask_worker(cls, log: str, log_level: int) -> "TaskLogEvent": + def from_dask_worker(cls, log: str, log_level: LogLevelInt) -> "TaskLogEvent": return cls(job_id=get_worker().get_current_task(), log=log, log_level=log_level) class Config(BaseTaskEvent.Config): diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index b33f1276f2c0..18f95424c9bf 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -10,6 +10,10 @@ from models_library.utils.enums import StrAutoEnum from pydantic import BaseModel, Field from pydantic.types import NonNegativeFloat +from servicelib.logging_utils import LogLevelInt +from simcore_service_dask_sidecar.computational_sidecar.docker_utils import ( + LogMessageStr, +) class RabbitEventMessageType(str, Enum): @@ -47,8 +51,8 @@ class NodeMessageBase(ProjectMessageBase): class LoggerRabbitMessage(RabbitMessageBase, NodeMessageBase): channel_name: Literal["simcore.services.logs.v2"] = "simcore.services.logs.v2" - messages: list[str] - log_level: int = logging.INFO + messages: list[LogMessageStr] + log_level: LogLevelInt = logging.INFO def routing_key(self) -> str: return f"{self.project_id}.{self.log_level}" diff --git a/packages/service-library/src/servicelib/logging_utils.py b/packages/service-library/src/servicelib/logging_utils.py index f305f8dba26e..e8791698a301 100644 --- a/packages/service-library/src/servicelib/logging_utils.py +++ b/packages/service-library/src/servicelib/logging_utils.py @@ -12,7 +12,7 @@ from asyncio import iscoroutinefunction from contextlib import contextmanager from inspect import getframeinfo, stack -from typing import Callable, TypedDict +from typing import Callable, TypeAlias, TypedDict log = logging.getLogger(__name__) @@ -249,7 +249,10 @@ def get_log_record_extra(*, user_id: int | str | None = None) -> LogExtra | None return extra or None -def guess_message_log_level(message: str) -> int: +LogLevelInt: TypeAlias = int + + +def guess_message_log_level(message: str) -> LogLevelInt: lower_case_message = message.lower().strip() if lower_case_message.startswith( ("error:", "err:", "error ", "err ", "[error]", "[err]") diff --git a/packages/service-library/tests/test_logging_utils.py b/packages/service-library/tests/test_logging_utils.py index 4bb8715f6430..c8fcf8531dad 100644 --- a/packages/service-library/tests/test_logging_utils.py +++ b/packages/service-library/tests/test_logging_utils.py @@ -5,8 +5,11 @@ import pytest from pytest import LogCaptureFixture -from servicelib.logging_utils import guess_message_log_level, log_decorator +from servicelib.logging_utils import LogLevelInt, guess_message_log_level, log_decorator from servicelib.utils import logged_gather +from simcore_service_dask_sidecar.computational_sidecar.docker_utils import ( + LogMessageStr, +) logger = logging.getLogger(__name__) @@ -70,5 +73,7 @@ def _raising_error() -> None: ("Not a Warn: this is an warning", logging.INFO), ], ) -def test_guess_message_log_level(message: str, expected_log_level: int): +def test_guess_message_log_level( + message: LogMessageStr, expected_log_level: LogLevelInt +): assert guess_message_log_level(message) == expected_log_level diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py index a308ef120d7b..d0da5d634617 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py @@ -24,6 +24,7 @@ from packaging import version from pydantic import ValidationError from pydantic.networks import AnyUrl +from servicelib.logging_utils import LogLevelInt from settings_library.s3 import S3Settings from yarl import URL @@ -31,6 +32,7 @@ from ..file_utils import pull_file_from_remote, push_file_to_remote from ..settings import Settings from .docker_utils import ( + LogMessageStr, create_container_config, get_computational_shared_data_mount_point, get_integration_version, @@ -157,7 +159,7 @@ async def _retrieve_output_data( ) from exc async def _publish_sidecar_log( - self, log: str, log_level: int = logging.INFO + self, log: LogMessageStr, log_level: LogLevelInt = logging.INFO ) -> None: publish_event( self.task_publishers.logs, diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py index c6a662c583e5..53b8c7503a84 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py @@ -14,6 +14,7 @@ Awaitable, Callable, Coroutine, + TypeAlias, cast, ) @@ -30,7 +31,12 @@ from pydantic import ByteSize from pydantic.networks import AnyUrl from servicelib.docker_utils import to_datetime -from servicelib.logging_utils import guess_message_log_level, log_catch, log_context +from servicelib.logging_utils import ( + LogLevelInt, + guess_message_log_level, + log_catch, + log_context, +) from settings_library.s3 import S3Settings from ..dask_utils import LogType, create_dask_worker_logger, publish_task_logs @@ -156,7 +162,12 @@ def _guess_progress_value(progress_match: re.Match[str]) -> float: return value -async def _parse_line(line: str) -> tuple[LogType, datetime.datetime, str, int]: +LogMessageStr: TypeAlias = str + + +async def _parse_line( + line: str, +) -> tuple[LogType, datetime.datetime, LogMessageStr, LogLevelInt]: match = re.search(DOCKER_LOG_REGEXP, line) if not match: # try to correct the log, it might be coming from an old comp service that does not put timestamps @@ -193,8 +204,8 @@ async def _publish_container_logs( progress_pub: Pub, logs_pub: Pub, log_type: LogType, - message: str, - log_level: int, + message: LogMessageStr, + log_level: LogLevelInt, ) -> None: return publish_task_logs( progress_pub, diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py index abc0385904b0..27cf4ee1c5bf 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py @@ -15,6 +15,10 @@ from dask_task_models_library.container_tasks.io import TaskCancelEventName from distributed.worker import get_worker from distributed.worker_state_machine import TaskState +from servicelib.logging_utils import LogLevelInt +from simcore_service_dask_sidecar.computational_sidecar.docker_utils import ( + LogMessageStr, +) def create_dask_worker_logger(name: str) -> logging.Logger: @@ -143,8 +147,8 @@ def publish_task_logs( logs_pub: distributed.Pub, log_type: LogType, message_prefix: str, - message: str, - log_level: int, + message: LogMessageStr, + log_level: LogLevelInt, ) -> None: logger.info("[%s - %s]: %s", message_prefix, log_type.name, message) if log_type == LogType.PROGRESS: diff --git a/services/dask-sidecar/tests/unit/test_docker_utils.py b/services/dask-sidecar/tests/unit/test_docker_utils.py index db116d93b6b2..1a11abce1021 100644 --- a/services/dask-sidecar/tests/unit/test_docker_utils.py +++ b/services/dask-sidecar/tests/unit/test_docker_utils.py @@ -13,7 +13,9 @@ import pytest from models_library.services_resources import BootMode from pytest_mock.plugin import MockerFixture +from servicelib.logging_utils import LogLevelInt from simcore_service_dask_sidecar.computational_sidecar.docker_utils import ( + LogMessageStr, LogType, _parse_line, create_container_config, @@ -167,8 +169,8 @@ async def test_parse_line( version1_logs: bool, log_line: str, expected_log_type: LogType, - expected_message: str, - expected_log_level: int, + expected_message: LogMessageStr, + expected_log_level: LogLevelInt, ): expected_time_stamp = arrow.utcnow().datetime if version1_logs: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_compose_utils.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_compose_utils.py index 5b333e6de384..227b7001f9ec 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_compose_utils.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_compose_utils.py @@ -11,7 +11,11 @@ from fastapi import FastAPI from models_library.rabbitmq_messages import ProgressType from servicelib.async_utils import run_sequentially_in_context +from servicelib.logging_utils import LogLevelInt from settings_library.basic_types import LogLevel +from simcore_service_dask_sidecar.computational_sidecar.docker_utils import ( + LogMessageStr, +) from simcore_service_dynamic_sidecar.core.rabbitmq import ( post_progress_message, post_sidecar_log_message, @@ -109,7 +113,7 @@ async def _progress_cb(current: int, total: int) -> None: float(current / (total or 1)), ) - async def _log_cb(msg: str, log_level: int) -> None: + async def _log_cb(msg: LogMessageStr, log_level: LogLevelInt) -> None: await post_sidecar_log_message(app, msg, log_level=log_level) await pull_images(list_of_images, registry_settings, _progress_cb, _log_cb) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py index 637007bee8ab..79be15228eb9 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py @@ -12,9 +12,12 @@ RabbitMessageBase, ) from pydantic import NonNegativeFloat -from servicelib.logging_utils import log_catch, log_context +from servicelib.logging_utils import LogLevelInt, log_catch, log_context from servicelib.rabbitmq import RabbitMQClient from servicelib.rabbitmq_utils import wait_till_rabbitmq_responsive +from simcore_service_dask_sidecar.computational_sidecar.docker_utils import ( + LogMessageStr, +) from ..core.settings import ApplicationSettings @@ -26,13 +29,15 @@ async def _post_rabbit_message(app: FastAPI, message: RabbitMessageBase) -> None await get_rabbitmq_client(app).publish(message.channel_name, message) -async def post_log_message(app: FastAPI, logs: str, *, log_level: int) -> None: +async def post_log_message( + app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt +) -> None: app_settings: ApplicationSettings = app.state.settings message = LoggerRabbitMessage( node_id=app_settings.DY_SIDECAR_NODE_ID, user_id=app_settings.DY_SIDECAR_USER_ID, project_id=app_settings.DY_SIDECAR_PROJECT_ID, - messages=[logs], + messages=[log], log_level=log_level, ) @@ -53,8 +58,10 @@ async def post_progress_message( await _post_rabbit_message(app, message) -async def post_sidecar_log_message(app: FastAPI, logs: str, *, log_level: int) -> None: - await post_log_message(app, f"[sidecar] {logs}", log_level=log_level) +async def post_sidecar_log_message( + app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt +) -> None: + await post_log_message(app, f"[sidecar] {log}", log_level=log_level) async def post_event_reload_iframe(app: FastAPI) -> None: