Skip to content

Commit

Permalink
@pcrespov review: use a type alias
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg committed May 15, 2023
1 parent 98ad59a commit 09cd55b
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@

from distributed.worker import get_worker
from pydantic import BaseModel, Extra, NonNegativeFloat
from servicelib.logging_utils import LogLevelInt
from simcore_service_dask_sidecar.computational_sidecar.docker_utils import (
LogMessageStr,
)


class BaseTaskEvent(BaseModel, ABC):
Expand Down Expand Up @@ -46,15 +50,15 @@ class Config(BaseTaskEvent.Config):


class TaskLogEvent(BaseTaskEvent):
log: str
log_level: int
log: LogMessageStr
log_level: LogLevelInt

@staticmethod
def topic_name() -> str:
return "task_logs"

@classmethod
def from_dask_worker(cls, log: str, log_level: int) -> "TaskLogEvent":
def from_dask_worker(cls, log: str, log_level: LogLevelInt) -> "TaskLogEvent":
return cls(job_id=get_worker().get_current_task(), log=log, log_level=log_level)

class Config(BaseTaskEvent.Config):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
from models_library.utils.enums import StrAutoEnum
from pydantic import BaseModel, Field
from pydantic.types import NonNegativeFloat
from servicelib.logging_utils import LogLevelInt
from simcore_service_dask_sidecar.computational_sidecar.docker_utils import (
LogMessageStr,
)


class RabbitEventMessageType(str, Enum):
Expand Down Expand Up @@ -47,8 +51,8 @@ class NodeMessageBase(ProjectMessageBase):

class LoggerRabbitMessage(RabbitMessageBase, NodeMessageBase):
channel_name: Literal["simcore.services.logs.v2"] = "simcore.services.logs.v2"
messages: list[str]
log_level: int = logging.INFO
messages: list[LogMessageStr]
log_level: LogLevelInt = logging.INFO

def routing_key(self) -> str:
return f"{self.project_id}.{self.log_level}"
Expand Down
7 changes: 5 additions & 2 deletions packages/service-library/src/servicelib/logging_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from asyncio import iscoroutinefunction
from contextlib import contextmanager
from inspect import getframeinfo, stack
from typing import Callable, TypedDict
from typing import Callable, TypeAlias, TypedDict

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -249,7 +249,10 @@ def get_log_record_extra(*, user_id: int | str | None = None) -> LogExtra | None
return extra or None


def guess_message_log_level(message: str) -> int:
LogLevelInt: TypeAlias = int


def guess_message_log_level(message: str) -> LogLevelInt:
lower_case_message = message.lower().strip()
if lower_case_message.startswith(
("error:", "err:", "error ", "err ", "[error]", "[err]")
Expand Down
9 changes: 7 additions & 2 deletions packages/service-library/tests/test_logging_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

import pytest
from pytest import LogCaptureFixture
from servicelib.logging_utils import guess_message_log_level, log_decorator
from servicelib.logging_utils import LogLevelInt, guess_message_log_level, log_decorator
from servicelib.utils import logged_gather
from simcore_service_dask_sidecar.computational_sidecar.docker_utils import (
LogMessageStr,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,5 +73,7 @@ def _raising_error() -> None:
("Not a Warn: this is an warning", logging.INFO),
],
)
def test_guess_message_log_level(message: str, expected_log_level: int):
def test_guess_message_log_level(
message: LogMessageStr, expected_log_level: LogLevelInt
):
assert guess_message_log_level(message) == expected_log_level
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
from packaging import version
from pydantic import ValidationError
from pydantic.networks import AnyUrl
from servicelib.logging_utils import LogLevelInt
from settings_library.s3 import S3Settings
from yarl import URL

from ..dask_utils import TaskPublisher, create_dask_worker_logger, publish_event
from ..file_utils import pull_file_from_remote, push_file_to_remote
from ..settings import Settings
from .docker_utils import (
LogMessageStr,
create_container_config,
get_computational_shared_data_mount_point,
get_integration_version,
Expand Down Expand Up @@ -157,7 +159,7 @@ async def _retrieve_output_data(
) from exc

async def _publish_sidecar_log(
self, log: str, log_level: int = logging.INFO
self, log: LogMessageStr, log_level: LogLevelInt = logging.INFO
) -> None:
publish_event(
self.task_publishers.logs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
Awaitable,
Callable,
Coroutine,
TypeAlias,
cast,
)

Expand All @@ -30,7 +31,12 @@
from pydantic import ByteSize
from pydantic.networks import AnyUrl
from servicelib.docker_utils import to_datetime
from servicelib.logging_utils import guess_message_log_level, log_catch, log_context
from servicelib.logging_utils import (
LogLevelInt,
guess_message_log_level,
log_catch,
log_context,
)
from settings_library.s3 import S3Settings

from ..dask_utils import LogType, create_dask_worker_logger, publish_task_logs
Expand Down Expand Up @@ -156,7 +162,12 @@ def _guess_progress_value(progress_match: re.Match[str]) -> float:
return value


async def _parse_line(line: str) -> tuple[LogType, datetime.datetime, str, int]:
LogMessageStr: TypeAlias = str


async def _parse_line(
line: str,
) -> tuple[LogType, datetime.datetime, LogMessageStr, LogLevelInt]:
match = re.search(DOCKER_LOG_REGEXP, line)
if not match:
# try to correct the log, it might be coming from an old comp service that does not put timestamps
Expand Down Expand Up @@ -193,8 +204,8 @@ async def _publish_container_logs(
progress_pub: Pub,
logs_pub: Pub,
log_type: LogType,
message: str,
log_level: int,
message: LogMessageStr,
log_level: LogLevelInt,
) -> None:
return publish_task_logs(
progress_pub,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
from dask_task_models_library.container_tasks.io import TaskCancelEventName
from distributed.worker import get_worker
from distributed.worker_state_machine import TaskState
from servicelib.logging_utils import LogLevelInt
from simcore_service_dask_sidecar.computational_sidecar.docker_utils import (
LogMessageStr,
)


def create_dask_worker_logger(name: str) -> logging.Logger:
Expand Down Expand Up @@ -143,8 +147,8 @@ def publish_task_logs(
logs_pub: distributed.Pub,
log_type: LogType,
message_prefix: str,
message: str,
log_level: int,
message: LogMessageStr,
log_level: LogLevelInt,
) -> None:
logger.info("[%s - %s]: %s", message_prefix, log_type.name, message)
if log_type == LogType.PROGRESS:
Expand Down
6 changes: 4 additions & 2 deletions services/dask-sidecar/tests/unit/test_docker_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import pytest
from models_library.services_resources import BootMode
from pytest_mock.plugin import MockerFixture
from servicelib.logging_utils import LogLevelInt
from simcore_service_dask_sidecar.computational_sidecar.docker_utils import (
LogMessageStr,
LogType,
_parse_line,
create_container_config,
Expand Down Expand Up @@ -167,8 +169,8 @@ async def test_parse_line(
version1_logs: bool,
log_line: str,
expected_log_type: LogType,
expected_message: str,
expected_log_level: int,
expected_message: LogMessageStr,
expected_log_level: LogLevelInt,
):
expected_time_stamp = arrow.utcnow().datetime
if version1_logs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from fastapi import FastAPI
from models_library.rabbitmq_messages import ProgressType
from servicelib.async_utils import run_sequentially_in_context
from servicelib.logging_utils import LogLevelInt
from settings_library.basic_types import LogLevel
from simcore_service_dask_sidecar.computational_sidecar.docker_utils import (
LogMessageStr,
)
from simcore_service_dynamic_sidecar.core.rabbitmq import (
post_progress_message,
post_sidecar_log_message,
Expand Down Expand Up @@ -109,7 +113,7 @@ async def _progress_cb(current: int, total: int) -> None:
float(current / (total or 1)),
)

async def _log_cb(msg: str, log_level: int) -> None:
async def _log_cb(msg: LogMessageStr, log_level: LogLevelInt) -> None:
await post_sidecar_log_message(app, msg, log_level=log_level)

await pull_images(list_of_images, registry_settings, _progress_cb, _log_cb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
RabbitMessageBase,
)
from pydantic import NonNegativeFloat
from servicelib.logging_utils import log_catch, log_context
from servicelib.logging_utils import LogLevelInt, log_catch, log_context
from servicelib.rabbitmq import RabbitMQClient
from servicelib.rabbitmq_utils import wait_till_rabbitmq_responsive
from simcore_service_dask_sidecar.computational_sidecar.docker_utils import (
LogMessageStr,
)

from ..core.settings import ApplicationSettings

Expand All @@ -26,13 +29,15 @@ async def _post_rabbit_message(app: FastAPI, message: RabbitMessageBase) -> None
await get_rabbitmq_client(app).publish(message.channel_name, message)


async def post_log_message(app: FastAPI, logs: str, *, log_level: int) -> None:
async def post_log_message(
app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt
) -> None:
app_settings: ApplicationSettings = app.state.settings
message = LoggerRabbitMessage(
node_id=app_settings.DY_SIDECAR_NODE_ID,
user_id=app_settings.DY_SIDECAR_USER_ID,
project_id=app_settings.DY_SIDECAR_PROJECT_ID,
messages=[logs],
messages=[log],
log_level=log_level,
)

Expand All @@ -53,8 +58,10 @@ async def post_progress_message(
await _post_rabbit_message(app, message)


async def post_sidecar_log_message(app: FastAPI, logs: str, *, log_level: int) -> None:
await post_log_message(app, f"[sidecar] {logs}", log_level=log_level)
async def post_sidecar_log_message(
app: FastAPI, log: LogMessageStr, *, log_level: LogLevelInt
) -> None:
await post_log_message(app, f"[sidecar] {log}", log_level=log_level)


async def post_event_reload_iframe(app: FastAPI) -> None:
Expand Down

0 comments on commit 09cd55b

Please sign in to comment.