Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛Api-server wrong start/stop/submit timestamps #4254

Merged
merged 26 commits into from
May 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/dask-task-models-library/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#
# pip-compile --output-file=requirements/_base.txt --resolver=backtracking --strip-extras requirements/_base.in
#
arrow==1.2.3
# via -r requirements/../../../packages/models-library/requirements/_base.in
attrs==23.1.0
# via jsonschema
click==8.1.3
Expand Down Expand Up @@ -58,11 +60,15 @@ pydantic==1.10.7
# -r requirements/_base.in
pyrsistent==0.19.3
# via jsonschema
python-dateutil==2.8.2
# via arrow
pyyaml==6.0
# via
# -c requirements/../../../packages/models-library/requirements/../../../requirements/constraints.txt
# dask
# distributed
six==1.16.0
# via python-dateutil
sortedcontainers==2.4.0
# via distributed
tblib==1.7.0
Expand Down
8 changes: 6 additions & 2 deletions packages/dask-task-models-library/requirements/_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,15 @@ pytest-runner==6.0.0
pytest-sugar==0.9.7
# via -r requirements/_test.in
python-dateutil==2.8.2
# via faker
# via
# -c requirements/_base.txt
# faker
pyyaml==6.0
# via -r requirements/_test.in
six==1.16.0
# via python-dateutil
# via
# -c requirements/_base.txt
# python-dateutil
termcolor==2.3.0
# via pytest-sugar
tomli==2.0.1
Expand Down
1 change: 1 addition & 0 deletions packages/models-library/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
#
--constraint ../../../requirements/constraints.txt

arrow
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
jsonschema
pydantic[email]
6 changes: 6 additions & 0 deletions packages/models-library/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#
# pip-compile --output-file=requirements/_base.txt --resolver=backtracking --strip-extras requirements/_base.in
#
arrow==1.2.3
# via -r requirements/_base.in
attrs==23.1.0
# via jsonschema
dnspython==2.3.0
Expand All @@ -18,5 +20,9 @@ pydantic==1.10.7
# via -r requirements/_base.in
pyrsistent==0.19.3
# via jsonschema
python-dateutil==2.8.2
# via arrow
six==1.16.0
# via python-dateutil
typing-extensions==4.5.0
# via pydantic
8 changes: 6 additions & 2 deletions packages/models-library/requirements/_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,15 @@ pytest-runner==6.0.0
pytest-sugar==0.9.7
# via -r requirements/_test.in
python-dateutil==2.8.2
# via faker
# via
# -c requirements/_base.txt
# faker
pyyaml==6.0
# via -r requirements/_test.in
six==1.16.0
# via python-dateutil
# via
# -c requirements/_base.txt
# python-dateutil
termcolor==2.3.0
# via pytest-sugar
tomli==2.0.1
Expand Down
20 changes: 20 additions & 0 deletions packages/models-library/src/models_library/projects_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import datetime
from uuid import UUID

import arrow
from pydantic import BaseModel, Field, PositiveInt

from .clusters import ClusterID
Expand Down Expand Up @@ -41,6 +43,18 @@ class ComputationTask(BaseModel):
...,
description="the cluster on which the computaional task runs/ran (none if no task ran yet)",
)
started: datetime.datetime | None = Field(
...,
description="the timestamp when the computation was started or None if not started yet",
)
stopped: datetime.datetime | None = Field(
...,
description="the timestamp when the computation was stopped or None if not started nor stopped yet",
)
submitted: datetime.datetime | None = Field(
...,
description="task last modification timestamp or None if the there is no task",
)

class Config:
schema_extra = {
Expand Down Expand Up @@ -73,6 +87,9 @@ class Config:
},
"iteration": None,
"cluster_id": None,
"started": arrow.utcnow().shift(minutes=-50).datetime,
"stopped": None,
"submitted": arrow.utcnow().shift(hours=-1).datetime,
},
{
"id": "f81d7994-9ccc-4c95-8c32-aa70d6bbb1b0",
Expand Down Expand Up @@ -102,6 +119,9 @@ class Config:
},
"iteration": 2,
"cluster_id": 0,
"started": arrow.utcnow().shift(minutes=-50).datetime,
"stopped": arrow.utcnow().shift(minutes=-20).datetime,
"submitted": arrow.utcnow().shift(hours=-1).datetime,
},
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)
from models_library.clusters import Cluster
from models_library.generics import Envelope
from models_library.projects_pipeline import ComputationTask
from models_library.projects_state import RunningState
from models_library.utils.fastapi_encoders import jsonable_encoder
from pydantic import AnyUrl, ByteSize, parse_obj_as
Expand Down Expand Up @@ -102,46 +103,52 @@ def create_computation_cb(url, **kwargs) -> CallbackResult:
"62237c33-8d6c-4709-aa92-c3cf693dd6d2",
],
}

return CallbackResult(
status=201,
# NOTE: aioresponses uses json.dump which does NOT encode serialization of UUIDs
payload={
"id": str(kwargs["json"]["project_id"]),
returned_computation = ComputationTask.parse_obj(
ComputationTask.Config.schema_extra["examples"][0]
).copy(
update={
"id": f"{kwargs['json']['project_id']}",
"state": state,
"pipeline_details": {
"adjacency_list": pipeline,
"node_states": node_states,
"progress": 0,
},
},
}
)
return CallbackResult(
status=201,
# NOTE: aioresponses uses json.dump which does NOT encode serialization of UUIDs
payload=jsonable_encoder(returned_computation),
)


def get_computation_cb(url, **kwargs) -> CallbackResult:
state = RunningState.NOT_STARTED
pipeline: dict[str, list[str]] = FULL_PROJECT_PIPELINE_ADJACENCY
node_states = FULL_PROJECT_NODE_STATES

return CallbackResult(
status=200,
payload={
returned_computation = ComputationTask.parse_obj(
ComputationTask.Config.schema_extra["examples"][0]
).copy(
update={
"id": Path(url.path).name,
"state": state,
"pipeline_details": {
"adjacency_list": pipeline,
"node_states": node_states,
"progress": 0,
},
"iteration": 2,
"cluster_id": 23,
},
}
)

return CallbackResult(
status=200,
payload=jsonable_encoder(returned_computation),
)


def create_cluster_cb(url, **kwargs) -> CallbackResult:
assert "json" in kwargs, f"missing body in call to {url}"
body = kwargs["json"]
assert url.query.get("user_id")
random_cluster = Cluster.parse_obj(
random.choice(Cluster.Config.schema_extra["examples"])
Expand Down Expand Up @@ -187,6 +194,7 @@ def get_cluster_cb(url, **kwargs) -> CallbackResult:
def get_cluster_details_cb(url, **kwargs) -> CallbackResult:
assert url.query.get("user_id")
cluster_id = url.path.split("/")[-1]
assert cluster_id
return CallbackResult(
status=200,
payload={"scheduler": {}, "cluster": {}, "dashboard_link": "some_faked_link"},
Expand Down Expand Up @@ -259,9 +267,6 @@ async def director_v2_service_mock(
aioresponses_mocker.patch(projects_networks_pattern, status=204, repeat=True)

# clusters
cluster_route_pattern = re.compile(
r"^http://[a-z\-_]*director-v2:[0-9]+/v2/clusters(/[0-9]+)?\?(\w+(?:=\w+)?\&?){1,}$"
)
aioresponses_mocker.post(
re.compile(
r"^http://[a-z\-_]*director-v2:[0-9]+/v2/clusters\?(\w+(?:=\w+)?\&?){1,}$"
Expand Down Expand Up @@ -351,6 +356,7 @@ def get_upload_link_cb(url: URL, **kwargs) -> CallbackResult:
scheme = {LinkType.PRESIGNED: "http", LinkType.S3: "s3"}

if file_size := kwargs["params"].get("file_size") is not None:
assert file_size
upload_schema = FileUploadSchema(
chunk_size=parse_obj_as(ByteSize, "5GiB"),
urls=[parse_obj_as(AnyUrl, f"{scheme[link_type]}://{file_id}")],
Expand Down
6 changes: 6 additions & 0 deletions packages/service-integration/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#
# pip-compile --output-file=requirements/_base.txt --resolver=backtracking --strip-extras requirements/_base.in
#
arrow==1.2.3
# via -r requirements/../../../packages/models-library/requirements/_base.in
attrs==23.1.0
# via jsonschema
certifi==2023.5.7
Expand Down Expand Up @@ -52,6 +54,8 @@ pyrsistent==0.19.3
# via jsonschema
pytest==7.3.1
# via -r requirements/_base.in
python-dateutil==2.8.2
# via arrow
pyyaml==6.0
# via -r requirements/_base.in
requests==2.30.0
Expand All @@ -60,6 +64,8 @@ rich==13.3.5
# via typer
shellingham==1.5.0.post1
# via typer
six==1.16.0
# via python-dateutil
tomli==2.0.1
# via pytest
typer==0.9.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import socket

import aiopg.sa
import sqlalchemy as sa
import tenacity
from aiopg.sa.engine import Engine
from aiopg.sa.result import RowProxy
Expand Down Expand Up @@ -31,7 +32,7 @@ async def _get_node_from_db(
project_id,
)
result = await connection.execute(
comp_tasks.select(
sa.select(comp_tasks).where(
and_(
comp_tasks.c.node_id == node_uuid,
comp_tasks.c.project_id == project_id,
Expand Down
2 changes: 1 addition & 1 deletion services/agent/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ aiosignal==1.2.0
anyio==3.6.2
# via starlette
arrow==1.2.3
# via -r requirements/../../../packages/service-library/requirements/_base.in
# via -r requirements/../../../packages/models-library/requirements/_base.in
async-timeout==4.0.2
# via
# aiohttp
Expand Down
4 changes: 1 addition & 3 deletions services/api-server/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ anyio==3.6.2
# starlette
# watchgod
arrow==1.2.3
# via
# -c requirements/../../../packages/service-library/requirements/./_base.in
# -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in
# via -r requirements/../../../packages/models-library/requirements/_base.in
asgiref==3.5.2
# via uvicorn
async-timeout==4.0.2
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import hashlib
from datetime import datetime
from enum import Enum
from typing import TypeAlias, Union
from uuid import UUID, uuid4

Expand All @@ -15,6 +14,8 @@
validator,
)

from models_library.projects_state import RunningState

from ...models.config import BaseConfig
from ...models.schemas.files import File
from ...models.schemas.solvers import Solver
Expand Down Expand Up @@ -217,19 +218,6 @@ def resource_name(self) -> str:
return self.name


# TODO: these need to be in sync with computational task states
class TaskStates(str, Enum):
UNKNOWN = "UNKNOWN"
PUBLISHED = "PUBLISHED"
NOT_STARTED = "NOT_STARTED"
PENDING = "PENDING"
STARTED = "STARTED"
RETRY = "RETRY"
SUCCESS = "SUCCESS"
FAILED = "FAILED"
ABORTED = "ABORTED"


class PercentageInt(ConstrainedInt):
ge = 0
le = 100
Expand All @@ -241,12 +229,13 @@ class JobStatus(BaseModel):
# SEE https://english.stackexchange.com/questions/12958/status-vs-state

job_id: UUID
state: TaskStates
state: RunningState
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
progress: PercentageInt = Field(default=PercentageInt(0))

# Timestamps on states
# TODO: sync state events and timestamps
submitted_at: datetime
submitted_at: datetime = Field(
..., description="Last modification timestamp of the solver job"
)
started_at: datetime | None = Field(
None,
description="Timestamp that indicate the moment the solver starts execution or None if the event did not occur",
Expand All @@ -262,14 +251,10 @@ class Config(BaseConfig):
schema_extra = {
"example": {
"job_id": "145beae4-a3a8-4fde-adbb-4e8257c2c083",
"state": TaskStates.STARTED,
"state": RunningState.STARTED,
"progress": 3,
"submitted_at": "2021-04-01 07:15:54.631007",
sanderegg marked this conversation as resolved.
Show resolved Hide resolved
"started_at": "2021-04-01 07:16:43.670610",
"stopped_at": None,
}
}

def take_snapshot(self, event: str = "submitted"):
setattr(self, f"{event}_at", datetime.utcnow())
return getattr(self, f"{event}_at")
Loading