🐛Api-server wrong start/stop/submit timestamps (#4254)
sanderegg authored May 22, 2023
1 parent b13a831 commit 3ff6e30
Showing 42 changed files with 465 additions and 262 deletions.
6 changes: 6 additions & 0 deletions packages/dask-task-models-library/requirements/_base.txt
@@ -4,6 +4,8 @@
#
# pip-compile --output-file=requirements/_base.txt --resolver=backtracking --strip-extras requirements/_base.in
#
arrow==1.2.3
# via -r requirements/../../../packages/models-library/requirements/_base.in
attrs==23.1.0
# via jsonschema
click==8.1.3
@@ -58,11 +60,15 @@ pydantic==1.10.7
# -r requirements/_base.in
pyrsistent==0.19.3
# via jsonschema
python-dateutil==2.8.2
# via arrow
pyyaml==6.0
# via
# -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt
# dask
# distributed
six==1.16.0
# via python-dateutil
sortedcontainers==2.4.0
# via distributed
tblib==1.7.0
8 changes: 6 additions & 2 deletions packages/dask-task-models-library/requirements/_test.txt
@@ -80,11 +80,15 @@ pytest-runner==6.0.0
pytest-sugar==0.9.7
# via -r requirements/_test.in
python-dateutil==2.8.2
# via faker
# via
# -c requirements/_base.txt
# faker
pyyaml==6.0
# via -r requirements/_test.in
six==1.16.0
# via python-dateutil
# via
# -c requirements/_base.txt
# python-dateutil
termcolor==2.3.0
# via pytest-sugar
tomli==2.0.1
1 change: 1 addition & 0 deletions packages/models-library/requirements/_base.in
@@ -3,5 +3,6 @@
#
--constraint ../../../requirements/constraints.txt

arrow
jsonschema
pydantic[email]
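
arrow becomes a direct runtime dependency of models-library; the schema examples further down use it to build timestamps relative to "now". A minimal sketch of that pattern (illustration only, not part of the commit):

    import arrow

    # arrow.utcnow() returns an Arrow instance; .shift() offsets it relative
    # to now, and .datetime exposes the underlying timezone-aware datetime.
    submitted = arrow.utcnow().shift(hours=-1).datetime
    started = arrow.utcnow().shift(minutes=-50).datetime
    assert submitted < started  # submitted one hour ago, started 50 minutes ago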
6 changes: 6 additions & 0 deletions packages/models-library/requirements/_base.txt
@@ -4,6 +4,8 @@
#
# pip-compile --output-file=requirements/_base.txt --resolver=backtracking --strip-extras requirements/_base.in
#
arrow==1.2.3
# via -r requirements/_base.in
attrs==23.1.0
# via jsonschema
dnspython==2.3.0
@@ -18,5 +20,9 @@ pydantic==1.10.7
# via -r requirements/_base.in
pyrsistent==0.19.3
# via jsonschema
python-dateutil==2.8.2
# via arrow
six==1.16.0
# via python-dateutil
typing-extensions==4.5.0
# via pydantic
8 changes: 6 additions & 2 deletions packages/models-library/requirements/_test.txt
@@ -79,11 +79,15 @@ pytest-runner==6.0.0
pytest-sugar==0.9.7
# via -r requirements/_test.in
python-dateutil==2.8.2
# via faker
# via
# -c requirements/_base.txt
# faker
pyyaml==6.0
# via -r requirements/_test.in
six==1.16.0
# via python-dateutil
# via
# -c requirements/_base.txt
# python-dateutil
termcolor==2.3.0
# via pytest-sugar
tomli==2.0.1
20 changes: 20 additions & 0 deletions packages/models-library/src/models_library/projects_pipeline.py
@@ -1,5 +1,7 @@
import datetime
from uuid import UUID

import arrow
from pydantic import BaseModel, Field, PositiveInt

from .clusters import ClusterID
@@ -41,6 +43,18 @@ class ComputationTask(BaseModel):
...,
description="the cluster on which the computaional task runs/ran (none if no task ran yet)",
)
started: datetime.datetime | None = Field(
...,
description="the timestamp when the computation was started or None if not started yet",
)
stopped: datetime.datetime | None = Field(
...,
description="the timestamp when the computation was stopped or None if not started nor stopped yet",
)
submitted: datetime.datetime | None = Field(
...,
description="task last modification timestamp or None if the there is no task",
)

class Config:
schema_extra = {
@@ -73,6 +87,9 @@ class Config:
},
"iteration": None,
"cluster_id": None,
"started": arrow.utcnow().shift(minutes=-50).datetime,
"stopped": None,
"submitted": arrow.utcnow().shift(hours=-1).datetime,
},
{
"id": "f81d7994-9ccc-4c95-8c32-aa70d6bbb1b0",
@@ -102,6 +119,9 @@ class Config:
},
"iteration": 2,
"cluster_id": 0,
"started": arrow.utcnow().shift(minutes=-50).datetime,
"stopped": arrow.utcnow().shift(minutes=-20).datetime,
"submitted": arrow.utcnow().shift(hours=-1).datetime,
},
]
}
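
The three new nullable fields are the timestamps this PR fixes end to end. A minimal sketch of consuming them, assuming the bundled example payloads above (not part of the commit):

    import datetime

    from models_library.projects_pipeline import ComputationTask

    # pydantic parses the example into timezone-aware datetimes (or None).
    task = ComputationTask.parse_obj(
        ComputationTask.Config.schema_extra["examples"][0]
    )

    assert task.submitted is not None  # set on creation in this example
    if task.started and task.stopped:  # stopped stays None while running
        elapsed: datetime.timedelta = task.stopped - task.started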
@@ -21,6 +21,7 @@
)
from models_library.clusters import Cluster
from models_library.generics import Envelope
from models_library.projects_pipeline import ComputationTask
from models_library.projects_state import RunningState
from models_library.utils.fastapi_encoders import jsonable_encoder
from pydantic import AnyUrl, ByteSize, parse_obj_as
@@ -102,46 +103,52 @@ def create_computation_cb(url, **kwargs) -> CallbackResult:
"62237c33-8d6c-4709-aa92-c3cf693dd6d2",
],
}

return CallbackResult(
status=201,
# NOTE: aioresponses uses json.dump which does NOT serialize UUIDs
payload={
"id": str(kwargs["json"]["project_id"]),
returned_computation = ComputationTask.parse_obj(
ComputationTask.Config.schema_extra["examples"][0]
).copy(
update={
"id": f"{kwargs['json']['project_id']}",
"state": state,
"pipeline_details": {
"adjacency_list": pipeline,
"node_states": node_states,
"progress": 0,
},
},
}
)
return CallbackResult(
status=201,
# NOTE: aioresponses uses json.dump which does NOT serialize UUIDs
payload=jsonable_encoder(returned_computation),
)


def get_computation_cb(url, **kwargs) -> CallbackResult:
state = RunningState.NOT_STARTED
pipeline: dict[str, list[str]] = FULL_PROJECT_PIPELINE_ADJACENCY
node_states = FULL_PROJECT_NODE_STATES

return CallbackResult(
status=200,
payload={
returned_computation = ComputationTask.parse_obj(
ComputationTask.Config.schema_extra["examples"][0]
).copy(
update={
"id": Path(url.path).name,
"state": state,
"pipeline_details": {
"adjacency_list": pipeline,
"node_states": node_states,
"progress": 0,
},
"iteration": 2,
"cluster_id": 23,
},
}
)

return CallbackResult(
status=200,
payload=jsonable_encoder(returned_computation),
)
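
Both callbacks now share one pattern: start from the model's own schema_extra example so the mocked payload is guaranteed schema-valid, override only the fields the test asserts on, and let jsonable_encoder turn UUIDs and datetimes into JSON-safe values. A sketch of the pattern as a reusable helper (the helper name is hypothetical, not in the commit):

    from models_library.projects_pipeline import ComputationTask
    from models_library.utils.fastapi_encoders import jsonable_encoder


    def fake_computation_payload(**overrides) -> dict:
        # Hypothetical helper: build a schema-valid ComputationTask from the
        # bundled example, then apply per-test overrides via pydantic's copy().
        task = ComputationTask.parse_obj(
            ComputationTask.Config.schema_extra["examples"][0]
        ).copy(update=overrides)
        # jsonable_encoder converts the UUIDs/datetimes that json.dump alone
        # cannot serialize.
        return jsonable_encoder(task)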


def create_cluster_cb(url, **kwargs) -> CallbackResult:
assert "json" in kwargs, f"missing body in call to {url}"
body = kwargs["json"]
assert url.query.get("user_id")
random_cluster = Cluster.parse_obj(
random.choice(Cluster.Config.schema_extra["examples"])
@@ -187,6 +194,7 @@ def get_cluster_cb(url, **kwargs) -> CallbackResult:
def get_cluster_details_cb(url, **kwargs) -> CallbackResult:
assert url.query.get("user_id")
cluster_id = url.path.split("/")[-1]
assert cluster_id
return CallbackResult(
status=200,
payload={"scheduler": {}, "cluster": {}, "dashboard_link": "some_faked_link"},
@@ -259,9 +267,6 @@ async def director_v2_service_mock(
aioresponses_mocker.patch(projects_networks_pattern, status=204, repeat=True)

# clusters
cluster_route_pattern = re.compile(
r"^http://[a-z\-_]*director-v2:[0-9]+/v2/clusters(/[0-9]+)?\?(\w+(?:=\w+)?\&?){1,}$"
)
aioresponses_mocker.post(
re.compile(
r"^http://[a-z\-_]*director-v2:[0-9]+/v2/clusters\?(\w+(?:=\w+)?\&?){1,}$"
@@ -351,6 +356,7 @@ def get_upload_link_cb(url: URL, **kwargs) -> CallbackResult:
scheme = {LinkType.PRESIGNED: "http", LinkType.S3: "s3"}

if file_size := kwargs["params"].get("file_size") is not None:
assert file_size
upload_schema = FileUploadSchema(
chunk_size=parse_obj_as(ByteSize, "5GiB"),
urls=[parse_obj_as(AnyUrl, f"{scheme[link_type]}://{file_id}")],
6 changes: 6 additions & 0 deletions packages/service-integration/requirements/_base.txt
@@ -4,6 +4,8 @@
#
# pip-compile --output-file=requirements/_base.txt --resolver=backtracking --strip-extras requirements/_base.in
#
arrow==1.2.3
# via -r requirements/../../../packages/models-library/requirements/_base.in
attrs==23.1.0
# via jsonschema
certifi==2023.5.7
@@ -52,6 +54,8 @@ pyrsistent==0.19.3
# via jsonschema
pytest==7.3.1
# via -r requirements/_base.in
python-dateutil==2.8.2
# via arrow
pyyaml==6.0
# via -r requirements/_base.in
requests==2.30.0
@@ -60,6 +64,8 @@ rich==13.3.5
# via typer
shellingham==1.5.0.post1
# via typer
six==1.16.0
# via python-dateutil
tomli==2.0.1
# via pytest
typer==0.9.0
@@ -4,6 +4,7 @@
import socket

import aiopg.sa
import sqlalchemy as sa
import tenacity
from aiopg.sa.engine import Engine
from aiopg.sa.result import RowProxy
@@ -31,7 +32,7 @@ async def _get_node_from_db(
project_id,
)
result = await connection.execute(
comp_tasks.select(
sa.select(comp_tasks).where(
and_(
comp_tasks.c.node_id == node_uuid,
comp_tasks.c.project_id == project_id,
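
The query moves from the legacy table.select(whereclause) form to the explicit 2.0-style sa.select(...).where(...); both emit the same SQL. A self-contained sketch of the difference with a toy table (not from the commit):

    import sqlalchemy as sa

    # Toy stand-in for the real comp_tasks table (illustration only).
    metadata = sa.MetaData()
    comp_tasks = sa.Table(
        "comp_tasks",
        metadata,
        sa.Column("node_id", sa.String),
        sa.Column("project_id", sa.String),
    )

    # Before: comp_tasks.select(<whereclause>) -- passing the whereclause
    # positionally is deprecated since SQLAlchemy 1.4.
    # After: the 2.0-style construct used above, which emits identical SQL:
    query = sa.select(comp_tasks).where(
        sa.and_(
            comp_tasks.c.node_id == "some-node",
            comp_tasks.c.project_id == "some-project",
        )
    )
    print(query)  # SELECT ... FROM comp_tasks WHERE node_id = ... AND project_id = ...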
2 changes: 1 addition & 1 deletion services/agent/requirements/_base.txt
@@ -23,7 +23,7 @@ aiosignal==1.2.0
anyio==3.6.2
# via starlette
arrow==1.2.3
# via -r requirements/../../../packages/service-library/requirements/_base.in
# via -r requirements/../../../packages/models-library/requirements/_base.in
async-timeout==4.0.2
# via
# aiohttp
4 changes: 1 addition & 3 deletions services/api-server/requirements/_base.txt
@@ -40,9 +40,7 @@ anyio==3.6.2
# starlette
# watchgod
arrow==1.2.3
# via
# -c requirements/../../../packages/service-library/requirements/./_base.in
# -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in
# via -r requirements/../../../packages/models-library/requirements/_base.in
asgiref==3.5.2
# via uvicorn
async-timeout==4.0.2
@@ -1,6 +1,5 @@
import hashlib
from datetime import datetime
from enum import Enum
from typing import TypeAlias, Union
from uuid import UUID, uuid4

@@ -15,6 +14,8 @@
validator,
)

from models_library.projects_state import RunningState

from ...models.config import BaseConfig
from ...models.schemas.files import File
from ...models.schemas.solvers import Solver
@@ -217,19 +218,6 @@ def resource_name(self) -> str:
return self.name


# TODO: these need to be in sync with computational task states
class TaskStates(str, Enum):
UNKNOWN = "UNKNOWN"
PUBLISHED = "PUBLISHED"
NOT_STARTED = "NOT_STARTED"
PENDING = "PENDING"
STARTED = "STARTED"
RETRY = "RETRY"
SUCCESS = "SUCCESS"
FAILED = "FAILED"
ABORTED = "ABORTED"


class PercentageInt(ConstrainedInt):
ge = 0
le = 100
@@ -241,12 +229,13 @@ class JobStatus(BaseModel):
# SEE https://english.stackexchange.com/questions/12958/status-vs-state

job_id: UUID
state: TaskStates
state: RunningState
progress: PercentageInt = Field(default=PercentageInt(0))

# Timestamps on states
# TODO: sync state events and timestamps
submitted_at: datetime
submitted_at: datetime = Field(
..., description="Last modification timestamp of the solver job"
)
started_at: datetime | None = Field(
None,
description="Timestamp that indicate the moment the solver starts execution or None if the event did not occur",
@@ -262,14 +251,10 @@ class Config(BaseConfig):
schema_extra = {
"example": {
"job_id": "145beae4-a3a8-4fde-adbb-4e8257c2c083",
"state": TaskStates.STARTED,
"state": RunningState.STARTED,
"progress": 3,
"submitted_at": "2021-04-01 07:15:54.631007",
"started_at": "2021-04-01 07:16:43.670610",
"stopped_at": None,
}
}

def take_snapshot(self, event: str = "submitted"):
setattr(self, f"{event}_at", datetime.utcnow())
return getattr(self, f"{event}_at")
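
Dropping take_snapshot is the heart of the fix: JobStatus no longer stamps datetime.utcnow() itself, so the timestamps must come from director-v2's ComputationTask (the started/stopped/submitted fields added above). A hedged sketch of that mapping with a hypothetical helper (the commit's actual wiring lives in the api-server handlers):

    from models_library.projects_pipeline import ComputationTask


    def job_status_from_task(task: ComputationTask) -> JobStatus:
        # Hypothetical helper: copy the timestamps reported by director-v2
        # instead of snapshotting utcnow() locally, since the local snapshots
        # were exactly the wrong values this PR removes.
        assert task.submitted is not None  # a created computation is always submitted
        return JobStatus(
            job_id=task.id,
            state=task.state,
            submitted_at=task.submitted,
            started_at=task.started,
            stopped_at=task.stopped,
        )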