Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Dask sidecar: pass job origin and show child computational service logs in workbench #5054

Merged
Merged
1 change: 1 addition & 0 deletions packages/dask-task-models-library/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#
--constraint ../../../requirements/constraints.txt
--requirement ../../../packages/models-library/requirements/_base.in
--requirement ../../../packages/settings-library/requirements/_base.in

dask[distributed]
pydantic[email]
Expand Down
1 change: 1 addition & 0 deletions packages/dask-task-models-library/requirements/ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# installs this repo's packages
../pytest-simcore/
../models-library/
../settings-library/

# current module
.
1 change: 1 addition & 0 deletions packages/dask-task-models-library/requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# installs this repo's packages
--editable ../pytest-simcore/
--editable ../models-library/
--editable ../settings-library/

# current module
--editable .
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import logging
from abc import ABC, abstractmethod
from typing import TypeAlias, Union
from typing import Any, ClassVar, TypeAlias

from distributed.worker import get_worker
from pydantic import BaseModel, Extra, validator

from .protocol import TaskOwner


class BaseTaskEvent(BaseModel, ABC):
job_id: str
task_owner: TaskOwner
msg: str | None = None

@staticmethod
Expand All @@ -27,19 +30,42 @@ def topic_name() -> str:
return "task_progress"

@classmethod
def from_dask_worker(cls, progress: float) -> "TaskProgressEvent":
return cls(job_id=get_worker().get_current_task(), progress=progress)
def from_dask_worker(
    cls, progress: float, *, task_owner: TaskOwner
) -> "TaskProgressEvent":
    """Create a progress event from inside a running dask worker.

    The ``job_id`` is whatever ``get_worker().get_current_task()`` returns
    at call time.

    Args:
        progress: progress value stored on the event.
        task_owner: owner information attached to the event (keyword-only).

    Returns:
        A new event carrying ``job_id``, ``progress`` and ``task_owner``.
    """
    return cls(
        job_id=get_worker().get_current_task(),
        progress=progress,
        task_owner=task_owner,
    )

class Config(BaseTaskEvent.Config):
schema_extra = {
schema_extra: ClassVar[dict[str, Any]] = {
"examples": [
{
"job_id": "simcore/services/comp/sleeper:1.1.0:projectid_ec7e595a-63ee-46a1-a04a-901b11b649f8:nodeid_39467d89-b659-4914-9359-c40b1b6d1d6d:uuid_5ee5c655-450d-4711-a3ec-32ffe16bc580",
"progress": 0,
"task_owner": {
"user_id": 32,
"project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
"node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
"parent_project_id": None,
"parent_node_id": None,
},
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
},
{
"job_id": "simcore/services/comp/sleeper:1.1.0:projectid_ec7e595a-63ee-46a1-a04a-901b11b649f8:nodeid_39467d89-b659-4914-9359-c40b1b6d1d6d:uuid_5ee5c655-450d-4711-a3ec-32ffe16bc580",
"progress": 1.0,
"task_owner": {
"user_id": 32,
"project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
"node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
"parent_project_id": "887e595a-63ee-46a1-a04a-901b11b649f8",
"parent_node_id": "aa467d89-b659-4914-9359-c40b1b6d1d6d",
},
},
]
}
Expand All @@ -65,19 +91,32 @@ def topic_name() -> str:
return "task_logs"

@classmethod
def from_dask_worker(cls, log: str, log_level: LogLevelInt) -> "TaskLogEvent":
return cls(job_id=get_worker().get_current_task(), log=log, log_level=log_level)
def from_dask_worker(
    cls, log: str, log_level: LogLevelInt, *, task_owner: TaskOwner
) -> "TaskLogEvent":
    """Create a log event from inside a running dask worker.

    The ``job_id`` is whatever ``get_worker().get_current_task()`` returns
    at call time.

    Args:
        log: log message stored on the event.
        log_level: level associated with the message.
        task_owner: owner information attached to the event (keyword-only).

    Returns:
        A new event carrying ``job_id``, ``log``, ``log_level`` and
        ``task_owner``.
    """
    return cls(
        job_id=get_worker().get_current_task(),
        log=log,
        log_level=log_level,
        task_owner=task_owner,
    )

class Config(BaseTaskEvent.Config):
schema_extra = {
schema_extra: ClassVar[dict[str, Any]] = {
"examples": [
{
"job_id": "simcore/services/comp/sleeper:1.1.0:projectid_ec7e595a-63ee-46a1-a04a-901b11b649f8:nodeid_39467d89-b659-4914-9359-c40b1b6d1d6d:uuid_5ee5c655-450d-4711-a3ec-32ffe16bc580",
"log": "some logs",
"log_level": logging.INFO,
"task_owner": {
"user_id": 32,
"project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
"node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
"parent_project_id": None,
"parent_node_id": None,
},
},
]
}


DaskTaskEvents = type[Union[TaskLogEvent, TaskProgressEvent]]
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
from contextlib import suppress
from pathlib import Path
from typing import Any, Optional, Union, cast
from typing import Any, ClassVar, Union, cast

from models_library.basic_regex import MIME_TYPE_RE
from models_library.generics import DictModel
Expand All @@ -26,7 +26,7 @@ class PortSchema(BaseModel):

class Config:
extra = Extra.forbid
schema_extra: dict[str, Any] = {
schema_extra: ClassVar[dict[str, Any]] = {
"examples": [
{
"required": True,
Expand All @@ -39,11 +39,11 @@ class Config:


class FilePortSchema(PortSchema):
mapping: Optional[str] = None
mapping: str | None = None
url: AnyUrl

class Config(PortSchema.Config):
schema_extra = {
schema_extra: ClassVar[dict[str, Any]] = {
"examples": [
{
"mapping": "some_filename.txt",
Expand All @@ -60,17 +60,17 @@ class Config(PortSchema.Config):

class FileUrl(BaseModel):
url: AnyUrl
file_mapping: Optional[str] = Field(
file_mapping: str | None = Field(
default=None,
description="Local file relpath name (if given), otherwise it takes the url filename",
)
file_mime_type: Optional[str] = Field(
file_mime_type: str | None = Field(
default=None, description="the file MIME type", regex=MIME_TYPE_RE
)

class Config:
extra = Extra.forbid
schema_extra = {
schema_extra: ClassVar[dict[str, Any]] = {
"examples": [
{"url": "https://some_file_url", "file_mime_type": "application/json"},
{
Expand All @@ -97,7 +97,7 @@ class Config:

class TaskInputData(DictModel[PortKey, PortValue]):
class Config(DictModel.Config):
schema_extra = {
schema_extra: ClassVar[dict[str, Any]] = {
"examples": [
{
"boolean_input": False,
Expand All @@ -121,7 +121,7 @@ class TaskOutputDataSchema(DictModel[PortKey, PortSchemaValue]):
# sent as a json-schema instead of with a dynamically-created model class
#
class Config(DictModel.Config):
schema_extra = {
schema_extra: ClassVar[dict[str, Any]] = {
"examples": [
{
"boolean_output": {"required": False},
Expand Down Expand Up @@ -159,29 +159,24 @@ def from_task_output(
for output_key, output_params in schema.items():
if isinstance(output_params, FilePortSchema):
file_relpath = output_params.mapping or output_key
# TODO: file_path is built here, saved truncated in file_mapping and
# then rebuilt again in _retrieve_output_data. Review.
file_path = output_folder / file_relpath
if file_path.exists():
data[output_key] = {
"url": f"{output_params.url}",
"file_mapping": file_relpath,
}
elif output_params.required:
raise ValueError(
f"Could not locate '{file_path}' in {output_folder}"
)
else:
if output_key not in data and output_params.required:
raise ValueError(
f"Could not locate '{output_key}' in {output_data_file}"
)
msg = f"Could not locate '{file_path}' in {output_folder}"
raise ValueError(msg)
elif output_key not in data and output_params.required:
msg = f"Could not locate '{output_key}' in {output_data_file}"
raise ValueError(msg)

# NOTE: this cast is necessary to make mypy happy
return cast(TaskOutputData, cls.parse_obj(data))

class Config(DictModel.Config):
schema_extra = {
schema_extra: ClassVar[dict[str, Any]] = {
"examples": [
{
"boolean_output": False,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from typing import Protocol, TypeAlias
from typing import Any, ClassVar, Protocol, TypeAlias

from models_library.basic_types import EnvVarKey
from models_library.docker import DockerLabelKey
from models_library.projects import ProjectID
from models_library.projects_nodes_io import NodeID
from models_library.services_resources import BootMode
from pydantic import AnyUrl
from models_library.users import UserID
from pydantic import AnyUrl, BaseModel, root_validator
from settings_library.s3 import S3Settings

from .docker import DockerBasicAuth
Expand All @@ -17,20 +20,89 @@
ContainerLabelsDict: TypeAlias = dict[DockerLabelKey, str]


class TaskOwner(BaseModel):
    # Owner of a task: a user/project/node triple plus an optional parent
    # project/node pair. The root validator below rejects payloads where only
    # one of the two parent identifiers is provided.
    # NOTE: documented with comments on purpose; a class docstring would be
    # picked up by pydantic as the schema description.
    user_id: UserID
    project_id: ProjectID
    node_id: NodeID

    parent_project_id: ProjectID | None
    parent_node_id: NodeID | None

    @property
    def has_parent(self) -> bool:
        """Tell whether both parent identifiers are set (truthy)."""
        return bool(self.parent_node_id) and bool(self.parent_project_id)

    @root_validator
    @classmethod
    def check_parent_valid(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Ensure the parent identifiers are given together or not at all.

        Args:
            values: the field values collected by pydantic.

        Returns:
            ``values`` unchanged when the check passes.

        Raises:
            ValueError: when exactly one of ``parent_node_id`` and
                ``parent_project_id`` is ``None``.
        """
        node_missing = values.get("parent_node_id") is None
        project_missing = values.get("parent_project_id") is None
        # a mismatch means exactly one of the two identifiers was provided
        if node_missing != project_missing:
            msg = "either both parent_node_id and parent_project_id are None or both are set!"
            raise ValueError(msg)
        return values

    class Config:
        schema_extra: ClassVar[dict[str, Any]] = {
            "examples": [
                # task without a parent
                {
                    "user_id": 32,
                    "project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
                    "node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
                    "parent_project_id": None,
                    "parent_node_id": None,
                },
                # task with both parent identifiers set
                {
                    "user_id": 32,
                    "project_id": "ec7e595a-63ee-46a1-a04a-901b11b649f8",
                    "node_id": "39467d89-b659-4914-9359-c40b1b6d1d6d",
                    "parent_project_id": "887e595a-63ee-46a1-a04a-901b11b649f8",
                    "parent_node_id": "aa467d89-b659-4914-9359-c40b1b6d1d6d",
                },
            ]
        }


class ContainerTaskParameters(BaseModel):
    # Groups the parameters of one container task into a single model.
    # NOTE: documented with comments on purpose; a class docstring would be
    # picked up by pydantic as the schema description.
    image: ContainerImage
    tag: ContainerTag
    input_data: TaskInputData
    output_data_keys: TaskOutputDataSchema
    command: ContainerCommands
    envs: ContainerEnvsDict
    labels: ContainerLabelsDict
    boot_mode: BootMode
    task_owner: TaskOwner

    class Config:
        # The nested entries reuse the first example declared by each
        # nested model's own Config.
        schema_extra: ClassVar[dict[str, Any]] = {
            "examples": [
                {
                    "image": "ubuntu",
                    "tag": "latest",
                    "input_data": TaskInputData.Config.schema_extra["examples"][0],
                    "output_data_keys": TaskOutputDataSchema.Config.schema_extra["examples"][0],
                    "command": ["sleep 10", "echo hello"],
                    "envs": {"MYENV": "is an env"},
                    "labels": {"io.simcore.thelabel": "is amazing"},
                    "boot_mode": BootMode.CPU.value,
                    "task_owner": TaskOwner.Config.schema_extra["examples"][0],
                },
            ]
        }


class ContainerRemoteFct(Protocol):
def __call__( # pylint: disable=too-many-arguments # noqa: PLR0913
def __call__(
self,
*,
task_parameters: ContainerTaskParameters,
docker_auth: DockerBasicAuth,
service_key: ContainerImage,
service_version: ContainerTag,
input_data: TaskInputData,
output_data_keys: TaskOutputDataSchema,
log_file_url: LogFileUploadURL,
command: ContainerCommands,
task_envs: ContainerEnvsDict,
task_labels: ContainerLabelsDict,
s3_settings: S3Settings | None,
boot_mode: BootMode,
) -> TaskOutputData:
...
Loading
Loading