🐛♻️ Bugfix/handle ever running tasks - refactor director-v2 workflow scheduler (#2798)

* refactored the dask client: use publish/unpublish on the dask-scheduler instead of fire_and_forget (a sketch of this pattern follows this list)
* the dask client now returns task status/results
* improved testing to replay the issue
* stopped raising asyncio.CancelledError from the dask-worker, as it is not supported
* cancellation is now handled using distributed.Event
* set up the log level when run as a scheduler
* added a test for dask compatibility
* upgraded dask to the latest version
* added blosc/lz4 to dask-distributed so that director-v2 also has the required compression libraries
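
A minimal sketch of the publish/unpublish pattern from the first bullet; the run_task function, the job id, and the scheduler address are illustrative placeholders, not the actual director-v2 code:

from distributed import Client


def run_task(x: int) -> int:
    # stand-in for a computational service run
    return x * 2


if __name__ == "__main__":
    client = Client("tcp://dask-scheduler:8786")  # placeholder address
    job_id = "some-unique-job-id"

    future = client.submit(run_task, 21, key=job_id)
    # publishing the future keeps it (and its result) alive on the scheduler
    # even if this client goes away, replacing the previous fire_and_forget call
    client.publish_dataset(future, name=job_id)

    # later, possibly from another client connected to the same scheduler,
    # the task status and result can be retrieved again
    same_future = client.get_dataset(job_id)
    print(same_future.status)
    print(same_future.result())  # 42

    # once the result has been consumed, unpublish so the scheduler can release it
    client.unpublish_dataset(job_id)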
sanderegg authored Mar 1, 2022
1 parent 8fd4e45 commit 99d70df
Showing 48 changed files with 1,610 additions and 911 deletions.
@@ -5,11 +5,16 @@


 class TaskValueError(PydanticErrorMixin, ValueError):
-    pass
+    code = "task.value_error"
+
+
+class TaskCancelledError(PydanticErrorMixin, RuntimeError):
+    code = "task.cancelled_error"
+    msg_template = "The task was cancelled"


 class ServiceRuntimeError(PydanticErrorMixin, RuntimeError):
-    code = "service_runtime_error"
+    code = "service.runtime_error"
     msg_template = (
         "The service {service_key}:{service_version}"
         " in container {container_id} failed with code"
@@ -1,5 +1,5 @@
 from abc import ABC, abstractmethod
-from typing import Optional, Union
+from typing import Optional, Type, Union

 from distributed.worker import get_worker
 from models_library.projects_state import RunningState
@@ -19,21 +19,6 @@ class Config:
         extra = Extra.forbid


-class TaskCancelEvent(BaseTaskEvent):
-    @staticmethod
-    def topic_name() -> str:
-        return "task_cancel"
-
-    class Config(BaseTaskEvent.Config):
-        schema_extra = {
-            "examples": [
-                {
-                    "job_id": "simcore/services/comp/sleeper:1.1.0:projectid_ec7e595a-63ee-46a1-a04a-901b11b649f8:nodeid_39467d89-b659-4914-9359-c40b1b6d1d6d:uuid_5ee5c655-450d-4711-a3ec-32ffe16bc580",
-                }
-            ]
-        }
-
-
 class TaskStateEvent(BaseTaskEvent):
     state: RunningState

@@ -111,4 +96,4 @@ class Config(BaseTaskEvent.Config):
         }


-DaskTaskEvents = Union[TaskLogEvent, TaskProgressEvent, TaskStateEvent]
+DaskTaskEvents = Type[Union[TaskLogEvent, TaskProgressEvent, TaskStateEvent]]
@@ -17,6 +17,8 @@
 )
 from pydantic.types import constr

+TaskCancelEventName = "cancel_event_{}"
+

 class PortSchema(BaseModel):
     required: bool
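
For illustration, a minimal sketch of how cancellation via a named distributed.Event could look with the TaskCancelEventName template added above; the task body and job id are placeholders, not the repository's code:

from distributed import Event

TaskCancelEventName = "cancel_event_{}"


def some_long_running_task(job_id: str) -> int:
    # running on a dask worker: periodically check the named cancellation event
    cancel_event = Event(TaskCancelEventName.format(job_id))
    total = 0
    for i in range(1_000):
        if cancel_event.is_set():
            # raise a plain exception rather than asyncio.CancelledError,
            # which is not supported from within the dask-worker (see commit notes)
            raise RuntimeError(f"task {job_id} was cancelled")
        total += i
    return total


# on the client side, cancellation is requested by setting the same named event:
# Event(TaskCancelEventName.format(job_id)).set()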
@@ -8,7 +8,6 @@
 import pytest
 from dask_task_models_library.container_tasks.events import (
     BaseTaskEvent,
-    TaskCancelEvent,
     TaskLogEvent,
     TaskProgressEvent,
     TaskStateEvent,
@@ -23,9 +22,7 @@ def test_task_event_abstract():
         BaseTaskEvent(job_id="some_fake")  # type: ignore


-@pytest.mark.parametrize(
-    "model_cls", [TaskStateEvent, TaskProgressEvent, TaskLogEvent, TaskCancelEvent]
-)
+@pytest.mark.parametrize("model_cls", [TaskStateEvent, TaskProgressEvent, TaskLogEvent])
 def test_events_models_examples(model_cls):
     examples = model_cls.Config.schema_extra["examples"]

19 changes: 19 additions & 0 deletions packages/dask-task-models-library/tests/container_tasks/test_io.py
@@ -4,6 +4,7 @@
 from typing import Optional

 import pytest
+from cloudpickle import dumps, loads
 from dask_task_models_library.container_tasks.io import (
     FilePortSchema,
     FileUrl,
@@ -167,3 +168,21 @@ def test_create_task_output_from_task_does_not_throw_when_there_are_optional_ent
         output_file_ext=faker.file_name(),
     )
     assert len(task_output_data) == 0
+
+
+@pytest.mark.parametrize(
+    "model_cls",
+    (
+        TaskInputData,
+        TaskOutputDataSchema,
+        TaskOutputData,
+    ),
+)
+def test_objects_are_compatible_with_dask_requirements(model_cls, model_cls_examples):
+    # NOTE: fcts could also be passed through the same test
+    for name, example in model_cls_examples.items():
+        print(name, ":", pformat(example))
+
+        model_instance = model_cls.parse_obj(example)
+        reloaded_instance = loads(dumps(model_instance))
+        assert reloaded_instance == model_instance
@@ -33,7 +33,7 @@
 def init_app(settings: Optional[AppSettings] = None) -> FastAPI:
     if settings is None:
         settings = AppSettings.create_from_envs()

     assert settings  # nosec
-    logging.basicConfig(level=settings.CATALOG_LOG_LEVEL.value)
+    logging.root.setLevel(settings.CATALOG_LOG_LEVEL.value)
     logger.debug(settings.json(indent=2))
@@ -50,7 +50,6 @@ def init_app(settings: Optional[AppSettings] = None) -> FastAPI:
     )
     override_fastapi_openapi_method(app)

-    logger.debug(settings)
     app.state.settings = settings

     setup_function_services(app)
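
A side note on the change above, illustrated with an assumed standalone snippet (not from the repository): logging.basicConfig(level=...) only takes effect while the root logger has no handlers, whereas logging.root.setLevel(...) changes the level unconditionally:

import logging

logging.basicConfig(level=logging.INFO)   # installs a handler, sets root to INFO
logging.basicConfig(level=logging.DEBUG)  # no-op: root already has a handler
print(logging.root.level)                 # 20 (INFO)

logging.root.setLevel(logging.DEBUG)      # always applied
print(logging.root.level)                 # 10 (DEBUG)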
4 changes: 4 additions & 0 deletions services/dask-sidecar/docker/boot.sh
@@ -35,6 +35,10 @@ fi

 if [ ${DASK_START_AS_SCHEDULER+x} ]; then
   scheduler_version=$(dask-scheduler --version)
+  mkdir --parents /home/scu/.config/dask
+  dask_logging=$(printf "logging:\n distributed: %s\n distributed.scheduler: %s" "${LOG_LEVEL:-warning}" "${LOG_LEVEL:-warning}")
+  echo "$dask_logging" >> /home/scu/.config/dask/distributed.yaml
+
   echo "$INFO" "Starting as dask-scheduler:${scheduler_version}..."
   if [ "${SC_BOOT_MODE}" = "debug-ptvsd" ]; then
     exec watchmedo auto-restart \
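
The snippet above writes a dask config file so the scheduler starts with the requested log level. A rough Python equivalent, as a sketch only (assuming it runs before distributed is started and that LOG_LEVEL falls back to warning):

import dask

# mirrors the distributed.yaml written by boot.sh above
dask.config.set(
    {"logging": {"distributed": "warning", "distributed.scheduler": "warning"}}
)
print(dask.config.get("logging.distributed"))  # warning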
2 changes: 2 additions & 0 deletions services/dask-sidecar/requirements/_base.in
@@ -29,6 +29,8 @@ pydantic[email,dotenv]
 blosc
 lz4

+
+
 # Cython implementation of Toolz: A set of utility functions for iterators, functions, and dictionaries.
 cytoolz

9 changes: 5 additions & 4 deletions services/dask-sidecar/requirements/_base.txt
@@ -39,7 +39,7 @@ attrs==20.2.0
     #   jsonschema
 bleach==3.3.0
     # via nbconvert
-blosc==1.10.4
+blosc==1.10.6
     # via -r requirements/_base.in
 bokeh==2.4.2
     # via dask
@@ -59,7 +59,7 @@ cloudpickle==2.0.0
     #   distributed
 cytoolz==0.11.0
     # via -r requirements/_base.in
-dask==2021.12.0
+dask==2022.2.0
     # via
     #   -c requirements/../../../packages/dask-task-models-library/requirements/_base.in
     #   -r requirements/_base.in
@@ -69,7 +69,7 @@ dask-gateway==0.9.0
     # via -r requirements/_base.in
 defusedxml==0.7.1
     # via nbconvert
-distributed==2021.12.0
+distributed==2022.2.0
     # via
     #   dask
     #   dask-gateway
@@ -140,7 +140,7 @@ jupyterlab-pygments==0.1.2
     # via nbconvert
 locket==0.2.1
     # via partd
-lz4==3.1.3
+lz4==4.0.0
     # via -r requirements/_base.in
 markupsafe==2.0.1
     # via jinja2
@@ -174,6 +174,7 @@ packaging==20.4
     #   bleach
     #   bokeh
     #   dask
+    #   distributed
 pandas==1.2.4
     # via
     #   -r requirements/_base.in
4 changes: 4 additions & 0 deletions services/dask-sidecar/requirements/_dask-complete.in
@@ -8,3 +8,7 @@

 # SEE https://github.com/dask/dask/blob/main/setup.py#L12 for extra reqs
 dask[complete]
+
+# compression
+blosc
+lz4
13 changes: 11 additions & 2 deletions services/dask-sidecar/requirements/_dask-complete.txt
@@ -4,6 +4,10 @@
 #
 # pip-compile --output-file=requirements/_dask-complete.txt --strip-extras requirements/_dask-complete.in
 #
+blosc==1.10.6
+    # via
+    #   -c requirements/./_base.txt
+    #   -r requirements/_dask-complete.in
 bokeh==2.4.2
     # via
     #   -c requirements/./_base.txt
@@ -17,12 +21,12 @@ cloudpickle==2.0.0
     #   -c requirements/./_base.txt
     #   dask
     #   distributed
-dask==2021.12.0
+dask==2022.2.0
     # via
     #   -c requirements/./_base.txt
     #   -r requirements/_dask-complete.in
     #   distributed
-distributed==2021.12.0
+distributed==2022.2.0
     # via
     #   -c requirements/./_base.txt
     #   dask
@@ -44,6 +48,10 @@ locket==0.2.1
     # via
     #   -c requirements/./_base.txt
     #   partd
+lz4==4.0.0
+    # via
+    #   -c requirements/./_base.txt
+    #   -r requirements/_dask-complete.in
 markupsafe==2.0.1
     # via
     #   -c requirements/./_base.txt
@@ -63,6 +71,7 @@ packaging==20.4
     #   -c requirements/./_base.txt
     #   bokeh
     #   dask
+    #   distributed
 pandas==1.2.4
     # via
     #   -c requirements/./_base.txt
4 changes: 4 additions & 0 deletions services/dask-sidecar/requirements/_dask-distributed.in
@@ -9,3 +9,7 @@

 # SEE https://github.com/dask/dask/blob/main/setup.py#L12 for extra reqs
 dask[distributed]
+
+# compression
+blosc
+lz4
13 changes: 11 additions & 2 deletions services/dask-sidecar/requirements/_dask-distributed.txt
@@ -4,6 +4,10 @@
 #
 # pip-compile --output-file=requirements/_dask-distributed.txt --strip-extras requirements/_dask-distributed.in
 #
+blosc==1.10.6
+    # via
+    #   -c requirements/./_base.txt
+    #   -r requirements/_dask-distributed.in
 click==8.0.3
     # via
     #   -c requirements/./_base.txt
@@ -13,12 +17,12 @@ cloudpickle==2.0.0
     #   -c requirements/./_base.txt
     #   dask
     #   distributed
-dask==2021.12.0
+dask==2022.2.0
     # via
     #   -c requirements/./_base.txt
     #   -r requirements/_dask-distributed.in
     #   distributed
-distributed==2021.12.0
+distributed==2022.2.0
     # via
     #   -c requirements/./_base.txt
     #   dask
@@ -38,6 +42,10 @@ locket==0.2.1
     # via
     #   -c requirements/./_base.txt
     #   partd
+lz4==4.0.0
+    # via
+    #   -c requirements/./_base.txt
+    #   -r requirements/_dask-distributed.in
 markupsafe==2.0.1
     # via
     #   -c requirements/./_base.txt
@@ -50,6 +58,7 @@ packaging==20.4
     # via
     #   -c requirements/./_base.txt
     #   dask
+    #   distributed
 partd==1.2.0
     # via
     #   -c requirements/./_base.txt
5 changes: 3 additions & 2 deletions services/dask-sidecar/requirements/_packages.txt
@@ -19,12 +19,12 @@ cloudpickle==2.0.0
     #   -c requirements/_base.txt
     #   dask
     #   distributed
-dask==2021.12.0
+dask==2022.2.0
     # via
     #   -c requirements/_base.txt
     #   -r requirements/../../../packages/dask-task-models-library/requirements/_base.in
     #   distributed
-distributed==2021.12.0
+distributed==2022.2.0
     # via
     #   -c requirements/_base.txt
     #   dask
@@ -77,6 +77,7 @@ packaging==20.4
     # via
     #   -c requirements/_base.txt
     #   dask
+    #   distributed
 partd==1.2.0
     # via
     #   -c requirements/_base.txt
@@ -139,9 +139,11 @@ async def _retrieve_output_data(
             logger.info("retrieved outputs data:\n%s", output_data.json(indent=1))
             return output_data

-        except ValidationError as exc:
+        except (ValueError, ValidationError) as exc:
             raise ServiceBadFormattedOutputError(
-                self.service_key, self.service_version
+                service_key=self.service_key,
+                service_version=self.service_version,
+                exc=exc,
             ) from exc

     async def _publish_sidecar_log(self, log: str) -> None:
@@ -225,11 +227,13 @@ async def run(self, command: List[str]) -> TaskOutputData:
                 )

                 raise ServiceRunError(
-                    self.service_key,
-                    self.service_version,
-                    container.id,
-                    container_data["State"]["ExitCode"],
-                    await container.log(stdout=True, stderr=True, tail=20),
+                    service_key=self.service_key,
+                    service_version=self.service_version,
+                    container_id=container.id,
+                    exit_code=container_data["State"]["ExitCode"],
+                    service_logs=await container.log(
+                        stdout=True, stderr=True, tail=20
+                    ),
                 )
             await self._publish_sidecar_log("Container ran successfully.")

@@ -1,29 +1,17 @@
-class ComputationalSidecarException(Exception):
-    """Basic exception"""
+from pydantic.errors import PydanticErrorMixin


-class ServiceRunError(ComputationalSidecarException):
-    """Error in the runned service"""
+class ComputationalSidecarRuntimeError(PydanticErrorMixin, RuntimeError):
+    ...

-    def __init__(
-        self,
-        service_key: str,
-        service_version: str,
-        container_id: str,
-        exit_code: int,
-        service_logs: str,
-    ) -> None:
-        super().__init__(
-            f"The service {service_key}:{service_version} running "
-            f"in container {container_id} failed with exit code {exit_code}\n"
-            f"last logs: {service_logs}"
-        )

+class ServiceRunError(ComputationalSidecarRuntimeError):
+    msg_template = (
+        "The service {service_key}:{service_version} running"
+        "in container {container_id} failed with exit code {exit_code}\n"
+        "last logs: {service_logs}"
+    )

-class ServiceBadFormattedOutputError(ComputationalSidecarException):
-    """Badly formatted output generated by the service run"""
-
-    def __init__(self, service_key: str, service_version: str) -> None:
-        super().__init__(
-            f"The service {service_key}:{service_version} produced badly formatted data"
-        )
+class ServiceBadFormattedOutputError(ComputationalSidecarRuntimeError):
+    msg_template = "The service {service_key}:{service_version} produced badly formatted data: {exc}"