From a6cfd3ea5f310f747452c8c1f5139a3104079451 Mon Sep 17 00:00:00 2001 From: Andrei Neagu <5694077+GitHK@users.noreply.github.com> Date: Thu, 1 Jun 2023 10:14:28 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8Saving=20volume=20stats=20inside=20the?= =?UTF-8?q?=20shared=20store=20volume=20(#4267)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Andrei Neagu --- .../src/models_library/aiodocker_api.py | 13 +-- .../src/models_library/clusters.py | 9 +- .../services/iter_sensitivity.py | 13 ++- .../src/models_library/services.py | 1 + .../src/models_library/sidecar_volumes.py | 55 +++++++++++ .../utils/database_models_factory.py | 13 ++- .../models_library/utils/fastapi_encoders.py | 1 - .../models-library/tests/test_basic_types.py | 1 - packages/models-library/tests/test_emails.py | 1 - packages/models-library/tests/test_errors.py | 7 +- .../tests/test_service_settings_labels.py | 20 ++-- .../models-library/tests/test_services.py | 1 + .../models-library/tests/test_services_io.py | 1 - .../tests/test_sidecar_volumes.py | 18 ++++ .../test_utils_database_models_factory.py | 2 +- .../tests/test_utils_fastapi_encoders.py | 1 - .../tests/test_utils_string_substitution.py | 2 - services/director-v2/openapi.json | 36 ++++---- .../dynamic_sidecar/api_client/_public.py | 13 +++ .../dynamic_sidecar/api_client/_thin.py | 13 +++ .../scheduler/_core/_events_utils.py | 34 +++++++ ...dules_dynamic_sidecar_client_api_public.py | 23 +++++ ...modules_dynamic_sidecar_client_api_thin.py | 26 ++++++ services/dynamic-sidecar/Dockerfile | 2 +- services/dynamic-sidecar/openapi.json | 77 ++++++++++++++++ .../dynamic-sidecar/requirements/_base.in | 1 + .../dynamic-sidecar/requirements/_base.txt | 4 +- .../api/_routing.py | 13 ++- .../api/volumes.py | 28 ++++++ .../models/shared_store.py | 92 +++++++++++++------ .../modules/long_running_tasks.py | 20 ++-- services/dynamic-sidecar/tests/conftest.py | 29 +++--- .../tests/unit/test_api_volumes.py | 61 ++++++++++++ .../tests/unit/test_models_shared_store.py | 80 ++++++++++------ 34 files changed, 561 insertions(+), 150 deletions(-) create mode 100644 packages/models-library/src/models_library/sidecar_volumes.py create mode 100644 packages/models-library/tests/test_sidecar_volumes.py create mode 100644 services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/volumes.py create mode 100644 services/dynamic-sidecar/tests/unit/test_api_volumes.py diff --git a/packages/models-library/src/models_library/aiodocker_api.py b/packages/models-library/src/models_library/aiodocker_api.py index e187419249d..aac732fb5d0 100644 --- a/packages/models-library/src/models_library/aiodocker_api.py +++ b/packages/models-library/src/models_library/aiodocker_api.py @@ -1,5 +1,3 @@ -from typing import Optional - from pydantic import Field, validator from .generated_models.docker_rest_api import ( @@ -13,7 +11,7 @@ class AioDockerContainerSpec(ContainerSpec): - Env: Optional[dict[str, Optional[str]]] = Field( + Env: dict[str, str | None] | None = Field( None, description="aiodocker expects here a dictionary and re-convert it back internally`.\n", ) @@ -35,7 +33,7 @@ def convert_list_to_dict(cls, v): class AioDockerResources1(Resources1): # NOTE: The Docker REST API documentation is wrong!!! # Do not set that back to singular Reservation. - Reservation: Optional[ResourceObject] = Field( + Reservation: ResourceObject | None = Field( None, description="Define resources reservation.", alias="Reservations" ) @@ -44,19 +42,18 @@ class Config(Resources1.Config): class AioDockerTaskSpec(TaskSpec): - ContainerSpec: Optional[AioDockerContainerSpec] = Field( + ContainerSpec: AioDockerContainerSpec | None = Field( None, ) - Resources: Optional[AioDockerResources1] = Field( + Resources: AioDockerResources1 | None = Field( None, description="Resource requirements which apply to each individual container created\nas part of the service.\n", ) class AioDockerServiceSpec(ServiceSpec): - - TaskTemplate: Optional[AioDockerTaskSpec] = None + TaskTemplate: AioDockerTaskSpec | None = None class Config(ServiceSpec.Config): alias_generator = to_snake_case diff --git a/packages/models-library/src/models_library/clusters.py b/packages/models-library/src/models_library/clusters.py index c4387161de0..1a70fe2b51d 100644 --- a/packages/models-library/src/models_library/clusters.py +++ b/packages/models-library/src/models_library/clusters.py @@ -1,4 +1,4 @@ -from typing import Dict, Final, Literal, Optional, Union +from typing import Final, Literal, Union from pydantic import AnyUrl, BaseModel, Extra, Field, HttpUrl, SecretStr, root_validator from pydantic.types import NonNegativeInt @@ -48,6 +48,7 @@ class Config(BaseAuthentication.Config): class KerberosAuthentication(BaseAuthentication): type: Literal["kerberos"] = "kerberos" + # NOTE: the entries here still need to be defined class Config(BaseAuthentication.Config): schema_extra = { @@ -87,10 +88,10 @@ class NoAuthentication(BaseAuthentication): class BaseCluster(BaseModel): name: str = Field(..., description="The human readable name of the cluster") - description: Optional[str] = None + description: str | None = None type: ClusterType owner: GroupID - thumbnail: Optional[HttpUrl] = Field( + thumbnail: HttpUrl | None = Field( None, description="url to the image describing this cluster", examples=["https://placeimg.com/171/96/tech/grayscale/?0.jpg"], @@ -99,7 +100,7 @@ class BaseCluster(BaseModel): authentication: ClusterAuthentication = Field( ..., description="Dask gateway authentication" ) - access_rights: Dict[GroupID, ClusterAccessRights] = Field(default_factory=dict) + access_rights: dict[GroupID, ClusterAccessRights] = Field(default_factory=dict) class Config: extra = Extra.forbid diff --git a/packages/models-library/src/models_library/function_services_catalog/services/iter_sensitivity.py b/packages/models-library/src/models_library/function_services_catalog/services/iter_sensitivity.py index de99306ac60..efeb968fb41 100644 --- a/packages/models-library/src/models_library/function_services_catalog/services/iter_sensitivity.py +++ b/packages/models-library/src/models_library/function_services_catalog/services/iter_sensitivity.py @@ -1,5 +1,5 @@ from copy import deepcopy -from typing import Any, Dict, Iterator, List, Tuple +from typing import Any, Iterator from pydantic import schema_of @@ -8,7 +8,7 @@ from .._key_labels import FUNCTION_SERVICE_KEY_PREFIX from .._utils import EN, OM, FunctionServices, create_fake_thumbnail_url -LIST_NUMBERS_SCHEMA: Dict[str, Any] = schema_of(List[float], title="list[number]") +LIST_NUMBERS_SCHEMA: dict[str, Any] = schema_of(list[float], title="list[number]") META = ServiceDockerData.parse_obj( @@ -66,11 +66,10 @@ def eval_sensitivity( *, - paramrefs: List[float], - paramdiff: List[float], + paramrefs: list[float], + paramdiff: list[float], diff_or_fact: bool, -) -> Iterator[Tuple[int, List[float], List[float]]]: - +) -> Iterator[tuple[int, list[float], list[float]]]: # This code runs in the backend assert len(paramrefs) == len(paramdiff) # nosec @@ -95,7 +94,7 @@ def eval_sensitivity( def _sensitivity_generator( - paramrefs: List[float], paramdiff: List[float], diff_or_fact: bool + paramrefs: list[float], paramdiff: list[float], diff_or_fact: bool ) -> Iterator[OutputsDict]: for i, paramtestplus, paramtestminus in eval_sensitivity( paramrefs=paramrefs, paramdiff=paramdiff, diff_or_fact=diff_or_fact diff --git a/packages/models-library/src/models_library/services.py b/packages/models-library/src/models_library/services.py index 7ca54d3845b..571d2ff60c6 100644 --- a/packages/models-library/src/models_library/services.py +++ b/packages/models-library/src/models_library/services.py @@ -70,6 +70,7 @@ LATEST_INTEGRATION_VERSION = "1.0.0" + # CONSTRAINT TYPES ------------------------------------------- class ServicePortKey(ConstrainedStr): regex = re.compile(PROPERTY_KEY_RE) diff --git a/packages/models-library/src/models_library/sidecar_volumes.py b/packages/models-library/src/models_library/sidecar_volumes.py new file mode 100644 index 00000000000..0e9bcf16c0b --- /dev/null +++ b/packages/models-library/src/models_library/sidecar_volumes.py @@ -0,0 +1,55 @@ +from datetime import datetime +from enum import auto + +import arrow +from pydantic import BaseModel, Field + +from .utils.enums import StrAutoEnum + + +class VolumeCategory(StrAutoEnum): + """ + These uniquely identify volumes which are mounted by + the dynamic-sidecar and user services. + + This is primarily used to keep track of the status of + each individual volume on the volumes. + + The status is ingested by the agent and processed + when the volume is removed. + """ + + # contains data relative to output ports + OUTPUTS = auto() + + # contains data relative to input ports + INPUTS = auto() + + # contains files which represent the state of the service + # usually the user's workspace + STATES = auto() + + # contains dynamic-sidecar data required to maintain state + # between restarts + SHARED_STORE = auto() + + +class VolumeStatus(StrAutoEnum): + """ + Used by the agent to figure out what to do with the data + present on the volume. + """ + + CONTENT_NEEDS_TO_BE_SAVED = auto() + CONTENT_WAS_SAVED = auto() + CONTENT_NO_SAVE_REQUIRED = auto() + + +class VolumeState(BaseModel): + status: VolumeStatus + last_changed: datetime = Field(default_factory=lambda: arrow.utcnow().datetime) + + def __eq__(self, other: object) -> bool: + # only include status for equality last_changed is not important + is_equal: bool = self.status == getattr(other, "status", None) + return is_equal diff --git a/packages/models-library/src/models_library/utils/database_models_factory.py b/packages/models-library/src/models_library/utils/database_models_factory.py index b98395e7139..3cc74cfe2e6 100644 --- a/packages/models-library/src/models_library/utils/database_models_factory.py +++ b/packages/models-library/src/models_library/utils/database_models_factory.py @@ -6,7 +6,7 @@ import json import warnings from datetime import datetime -from typing import Any, Callable, Container, Optional +from typing import Any, Callable, Container from uuid import UUID import sqlalchemy as sa @@ -42,8 +42,8 @@ def _eval_defaults( parsing both the client and the server (if include_server_defaults==True) defaults in the sa model. """ - default: Optional[Any] = None - default_factory: Optional[Callable] = None + default: Any | None = None + default_factory: Callable | None = None if ( column.default is None @@ -104,11 +104,10 @@ def create_pydantic_model_from_sa_table( table: sa.Table, *, config: type = OrmConfig, - exclude: Optional[Container[str]] = None, + exclude: Container[str] | None = None, include_server_defaults: bool = False, - extra_policies: Optional[list[PolicyCallable]] = None, + extra_policies: list[PolicyCallable] | None = None, ) -> type[BaseModel]: - fields = {} exclude = exclude or [] extra_policies = extra_policies or DEFAULT_EXTRA_POLICIES @@ -126,7 +125,7 @@ def create_pydantic_model_from_sa_table( name = f"{table.name.lower()}_{name}" # type --- - pydantic_type: Optional[type] = None + pydantic_type: type | None = None if hasattr(column.type, "impl"): if hasattr(column.type.impl, "python_type"): pydantic_type = column.type.impl.python_type diff --git a/packages/models-library/src/models_library/utils/fastapi_encoders.py b/packages/models-library/src/models_library/utils/fastapi_encoders.py index d42ce159fa9..a85d845e30c 100644 --- a/packages/models-library/src/models_library/utils/fastapi_encoders.py +++ b/packages/models-library/src/models_library/utils/fastapi_encoders.py @@ -10,7 +10,6 @@ from fastapi.encoders import jsonable_encoder except ImportError: # for aiohttp-only services - # Taken 'as is' from https://github.com/tiangolo/fastapi/blob/master/fastapi/encoders.py # to be used in aiohttp-based services w/o having to install fastapi # diff --git a/packages/models-library/tests/test_basic_types.py b/packages/models-library/tests/test_basic_types.py index 5f8317eab73..069a2fbc205 100644 --- a/packages/models-library/tests/test_basic_types.py +++ b/packages/models-library/tests/test_basic_types.py @@ -7,7 +7,6 @@ @pytest.mark.skip(reason="DEV: testing parse_obj_as") def test_parse_uuid_as_a_string(faker: Faker): - expected_uuid = faker.uuid4() got_uuid = parse_obj_as(UUIDStr, expected_uuid) diff --git a/packages/models-library/tests/test_emails.py b/packages/models-library/tests/test_emails.py index 6958dc6ad28..42ae8c84f1f 100644 --- a/packages/models-library/tests/test_emails.py +++ b/packages/models-library/tests/test_emails.py @@ -12,4 +12,3 @@ class Profile(BaseModel): data = Profile(email=email_input) assert data.email == "bla@gmail.com" - diff --git a/packages/models-library/tests/test_errors.py b/packages/models-library/tests/test_errors.py index bbdf47a452c..0a69241ff42 100644 --- a/packages/models-library/tests/test_errors.py +++ b/packages/models-library/tests/test_errors.py @@ -3,15 +3,13 @@ # pylint: disable=unused-variable -from typing import List - import pytest from models_library.errors import ErrorDict from pydantic import BaseModel, ValidationError, conint class B(BaseModel): - y: List[int] + y: list[int] class A(BaseModel): @@ -20,14 +18,13 @@ class A(BaseModel): def test_pydantic_error_dict(): - with pytest.raises(ValidationError) as exc_info: A(x=-1, b={"y": [0, "wrong"]}) assert isinstance(exc_info.value, ValidationError) # demos ValidationError.errors() work - errors: List[ErrorDict] = exc_info.value.errors() + errors: list[ErrorDict] = exc_info.value.errors() assert len(errors) == 2 # checks ErrorDict interface diff --git a/packages/models-library/tests/test_service_settings_labels.py b/packages/models-library/tests/test_service_settings_labels.py index 9e37ce6e286..a91b31bdbc3 100644 --- a/packages/models-library/tests/test_service_settings_labels.py +++ b/packages/models-library/tests/test_service_settings_labels.py @@ -56,9 +56,7 @@ [(x.example, x.items, x.uses_dynamic_sidecar) for x in SIMCORE_SERVICE_EXAMPLES], ids=[x.id for x in SIMCORE_SERVICE_EXAMPLES], ) -def test_simcore_service_labels( - example: dict, items: int, uses_dynamic_sidecar: bool -) -> None: +def test_simcore_service_labels(example: dict, items: int, uses_dynamic_sidecar: bool): simcore_service_labels = SimcoreServiceLabels.parse_obj(example) assert simcore_service_labels @@ -66,7 +64,7 @@ def test_simcore_service_labels( assert simcore_service_labels.needs_dynamic_sidecar == uses_dynamic_sidecar -def test_service_settings() -> None: +def test_service_settings(): simcore_settings_settings_label = SimcoreServiceSettingsLabel.parse_obj( SimcoreServiceSettingLabelEntry.Config.schema_extra["examples"] ) @@ -92,7 +90,7 @@ def test_service_settings() -> None: ) def test_service_settings_model_examples( model_cls: type[BaseModel], model_cls_examples: dict[str, dict[str, Any]] -) -> None: +): for name, example in model_cls_examples.items(): print(name, ":", pformat(example)) model_instance = model_cls(**example) @@ -105,7 +103,7 @@ def test_service_settings_model_examples( ) def test_correctly_detect_dynamic_sidecar_boot( model_cls: type[BaseModel], model_cls_examples: dict[str, dict[str, Any]] -) -> None: +): for name, example in model_cls_examples.items(): print(name, ":", pformat(example)) model_instance = model_cls(**example) @@ -114,7 +112,7 @@ def test_correctly_detect_dynamic_sidecar_boot( ) -def test_raises_error_if_http_entrypoint_is_missing() -> None: +def test_raises_error_if_http_entrypoint_is_missing(): simcore_service_labels: dict[str, Any] = deepcopy( SimcoreServiceLabels.Config.schema_extra["examples"][2] ) @@ -124,21 +122,21 @@ def test_raises_error_if_http_entrypoint_is_missing() -> None: SimcoreServiceLabels(**simcore_service_labels) -def test_path_mappings_none_state_paths() -> None: +def test_path_mappings_none_state_paths(): sample_data = deepcopy(PathMappingsLabel.Config.schema_extra["examples"][0]) sample_data["state_paths"] = None with pytest.raises(ValidationError): PathMappingsLabel(**sample_data) -def test_path_mappings_json_encoding() -> None: +def test_path_mappings_json_encoding(): for example in PathMappingsLabel.Config.schema_extra["examples"]: path_mappings = PathMappingsLabel.parse_obj(example) print(path_mappings) assert PathMappingsLabel.parse_raw(path_mappings.json()) == path_mappings -def test_simcore_services_labels_compose_spec_null_container_http_entry_provided() -> None: +def test_simcore_services_labels_compose_spec_null_container_http_entry_provided(): sample_data = deepcopy(SimcoreServiceLabels.Config.schema_extra["examples"][2]) assert sample_data["simcore.service.container-http-entrypoint"] @@ -147,7 +145,7 @@ def test_simcore_services_labels_compose_spec_null_container_http_entry_provided SimcoreServiceLabels(**sample_data) -def test_raises_error_wrong_restart_policy() -> None: +def test_raises_error_wrong_restart_policy(): simcore_service_labels: dict[str, Any] = deepcopy( SimcoreServiceLabels.Config.schema_extra["examples"][2] ) diff --git a/packages/models-library/tests/test_services.py b/packages/models-library/tests/test_services.py index 2b194440b61..e7052089d54 100644 --- a/packages/models-library/tests/test_services.py +++ b/packages/models-library/tests/test_services.py @@ -198,6 +198,7 @@ def test_same_regex_patterns_in_jsonschema_and_python( ): # read file in json_schema_config = json_schema_dict(json_schema_file_name) + # go to keys def _find_pattern_entry(obj: dict[str, Any], key: str) -> Any: if key in obj: diff --git a/packages/models-library/tests/test_services_io.py b/packages/models-library/tests/test_services_io.py index cdda7808b60..0d9b96e1553 100644 --- a/packages/models-library/tests/test_services_io.py +++ b/packages/models-library/tests/test_services_io.py @@ -32,7 +32,6 @@ def test_service_port_units(project_tests_dir: Path): def test_build_input_ports_from_json_schemas(): - # builds ServiceInput using json-schema port_meta = ServiceInput.from_json_schema( port_schema={ diff --git a/packages/models-library/tests/test_sidecar_volumes.py b/packages/models-library/tests/test_sidecar_volumes.py new file mode 100644 index 00000000000..83046115535 --- /dev/null +++ b/packages/models-library/tests/test_sidecar_volumes.py @@ -0,0 +1,18 @@ +# pylint: disable=redefined-outer-name + +import pytest +from models_library.sidecar_volumes import VolumeState, VolumeStatus +from pytest import FixtureRequest + + +@pytest.fixture(params=VolumeStatus) +def status(request: FixtureRequest) -> VolumeStatus: + return request.param + + +def test_volume_state_equality_does_not_use_last_changed(status: VolumeStatus): + # NOTE: `last_changed` is initialized with the utc datetime + # at the moment of the creation of the object. + assert VolumeState(status=status) == VolumeState(status=status) + schema_property_count = len(VolumeState.schema()["properties"]) + assert len(VolumeState(status=status).dict()) == schema_property_count diff --git a/packages/models-library/tests/test_utils_database_models_factory.py b/packages/models-library/tests/test_utils_database_models_factory.py index 9e7a5a6ed17..39b96ed74b8 100644 --- a/packages/models-library/tests/test_utils_database_models_factory.py +++ b/packages/models-library/tests/test_utils_database_models_factory.py @@ -1,3 +1,4 @@ +# nopycln: file # pylint: disable=redefined-outer-name # pylint: disable=unused-argument # pylint: disable=unused-variable @@ -19,7 +20,6 @@ @pytest.mark.parametrize("table_cls", metadata.tables.values(), ids=lambda t: t.name) def test_table_to_pydantic_models(table_cls): - PydanticOrm = create_pydantic_model_from_sa_table( table=table_cls, include_server_defaults=True ) diff --git a/packages/models-library/tests/test_utils_fastapi_encoders.py b/packages/models-library/tests/test_utils_fastapi_encoders.py index bf0a19bfb47..89b4a891e6b 100644 --- a/packages/models-library/tests/test_utils_fastapi_encoders.py +++ b/packages/models-library/tests/test_utils_fastapi_encoders.py @@ -19,7 +19,6 @@ def servicelib__json_serialization__json_dumps(obj: Any, **kwargs): def test_using_uuids_as_keys(faker: Faker): - uuid_key = uuid4() with pytest.raises(TypeError): diff --git a/packages/models-library/tests/test_utils_string_substitution.py b/packages/models-library/tests/test_utils_string_substitution.py index 7978112cb65..0a4ef51fb28 100644 --- a/packages/models-library/tests/test_utils_string_substitution.py +++ b/packages/models-library/tests/test_utils_string_substitution.py @@ -37,7 +37,6 @@ def test_upgrade_identifiers(legacy: str, expected: str): def test_substitution_with_new_and_legacy_identifiers(): - stringified_config = """ compose_spec: service-one: @@ -162,7 +161,6 @@ def test_substitution_with_new_and_legacy_identifiers(): ids=lambda p: f"{p.parent.name}/{p.name}", ) def test_substitution_against_service_metadata_configs(metadata_path: Path): - meta_str = metadata_path.read_text() meta_str = substitute_all_legacy_identifiers(meta_str) diff --git a/services/director-v2/openapi.json b/services/director-v2/openapi.json index c599be221d3..e9bbcddfd35 100644 --- a/services/director-v2/openapi.json +++ b/services/director-v2/openapi.json @@ -120,7 +120,7 @@ "required": true, "schema": { "title": "Service Key", - "pattern": "^simcore/services/(?P(comp|dynamic|frontend))/(?P[a-z0-9][a-z0-9_.-]*/)*(?P[a-z0-9-_]+[a-z0-9])$", + "pattern": "^simcore/services/((comp|dynamic|frontend))/([a-z0-9][a-z0-9_.-]*/)*([a-z0-9-_]+[a-z0-9])$", "type": "string", "description": "Distinctive name for the node based on the docker registry path" }, @@ -178,7 +178,7 @@ "required": true, "schema": { "title": "Service Key", - "pattern": "^simcore/services/(?P(comp|dynamic|frontend))/(?P[a-z0-9][a-z0-9_.-]*/)*(?P[a-z0-9-_]+[a-z0-9])$", + "pattern": "^simcore/services/((comp|dynamic|frontend))/([a-z0-9][a-z0-9_.-]*/)*([a-z0-9-_]+[a-z0-9])$", "type": "string", "description": "Distinctive name for the node based on the docker registry path" }, @@ -312,7 +312,7 @@ "required": true, "schema": { "title": "Service Key", - "pattern": "^simcore/services/(?P(comp|dynamic|frontend))/(?P[a-z0-9][a-z0-9_.-]*/)*(?P[a-z0-9-_]+[a-z0-9])$", + "pattern": "^simcore/services/((comp|dynamic|frontend))/([a-z0-9][a-z0-9_.-]*/)*([a-z0-9-_]+[a-z0-9])$", "type": "string", "description": "distinctive name for the node based on the docker registry path" }, @@ -1205,7 +1205,7 @@ "required": true, "schema": { "title": "Cluster Id", - "minimum": 0.0, + "minimum": 0, "type": "integer" }, "name": "cluster_id", @@ -1257,7 +1257,7 @@ "required": true, "schema": { "title": "Cluster Id", - "minimum": 0.0, + "minimum": 0, "type": "integer" }, "name": "cluster_id", @@ -1302,7 +1302,7 @@ "required": true, "schema": { "title": "Cluster Id", - "minimum": 0.0, + "minimum": 0, "type": "integer" }, "name": "cluster_id", @@ -1410,7 +1410,7 @@ "required": true, "schema": { "title": "Cluster Id", - "minimum": 0.0, + "minimum": 0, "type": "integer" }, "name": "cluster_id", @@ -1512,7 +1512,7 @@ "required": true, "schema": { "title": "Cluster Id", - "minimum": 0.0, + "minimum": 0, "type": "integer" }, "name": "cluster_id", @@ -2097,7 +2097,7 @@ }, "id": { "title": "Id", - "minimum": 0.0, + "minimum": 0, "type": "integer", "description": "The cluster ID" } @@ -2253,7 +2253,7 @@ }, "cluster_id": { "title": "Cluster Id", - "minimum": 0.0, + "minimum": 0, "type": "integer", "description": "the computation shall use the cluster described by its id, 0 is the default cluster" } @@ -2332,7 +2332,7 @@ }, "cluster_id": { "title": "Cluster Id", - "minimum": 0.0, + "minimum": 0, "type": "integer", "description": "the cluster on which the computaional task runs/ran (none if no task ran yet)" }, @@ -2433,7 +2433,7 @@ "properties": { "service_key": { "title": "Service Key", - "pattern": "^simcore/services/dynamic/(?P[a-z0-9][a-z0-9_.-]*/)*(?P[a-z0-9-_]+[a-z0-9])$", + "pattern": "^simcore/services/dynamic/([a-z0-9][a-z0-9_.-]*/)*([a-z0-9-_]+[a-z0-9])$", "type": "string", "description": "distinctive name for the node based on the docker registry path" }, @@ -2532,7 +2532,7 @@ "properties": { "image": { "title": "Image", - "pattern": "^(?:(?P[a-z0-9-]+(?:\\.[a-z0-9-]+)+(?::\\d+)?|[a-z0-9-]+:\\d+)/)?(?P(?:[a-z0-9][a-z0-9_.-]*/)*[a-z0-9-_]+[a-z0-9])(?::(?P[\\w][\\w.-]{0,127}))?(?P\\@sha256:[a-fA-F0-9]{32,64})?$", + "pattern": "^(?:([a-z0-9-]+(?:\\.[a-z0-9-]+)+(?::\\d+)?|[a-z0-9-]+:\\d+)/)?((?:[a-z0-9][a-z0-9_.-]*/)*[a-z0-9-_]+[a-z0-9])(?::([\\w][\\w.-]{0,127}))?(\\@sha256:[a-fA-F0-9]{32,64})?$", "type": "string", "description": "Used by the frontend to provide a context for the users.Services with a docker-compose spec will have multiple entries.Using the `image:version` instead of the docker-compose spec is more helpful for the end user." }, @@ -2685,7 +2685,7 @@ }, "GPU": { "title": "Gpu", - "minimum": 0.0, + "minimum": 0, "type": "integer", "description": "defines the required (maximum) GPU for running the services" }, @@ -2890,7 +2890,7 @@ "properties": { "service_key": { "title": "Service Key", - "pattern": "^simcore/services/dynamic/(?P[a-z0-9][a-z0-9_.-]*/)*(?P[a-z0-9-_]+[a-z0-9])$", + "pattern": "^simcore/services/dynamic/([a-z0-9][a-z0-9_.-]*/)*([a-z0-9-_]+[a-z0-9])$", "type": "string", "description": "distinctive name for the node based on the docker registry path" }, @@ -3013,7 +3013,7 @@ }, "service_key": { "title": "Service Key", - "pattern": "^simcore/services/(?P(comp|dynamic|frontend))/(?P[a-z0-9][a-z0-9_.-]*/)*(?P[a-z0-9-_]+[a-z0-9])$", + "pattern": "^simcore/services/((comp|dynamic|frontend))/([a-z0-9][a-z0-9_.-]*/)*([a-z0-9-_]+[a-z0-9])$", "type": "string", "description": "distinctive name for the node based on the docker registry path", "example": [ @@ -3208,7 +3208,7 @@ }, "key": { "title": "Key", - "pattern": "^simcore/services/(?P(comp|dynamic|frontend))/(?P[a-z0-9][a-z0-9_.-]*/)*(?P[a-z0-9-_]+[a-z0-9])$", + "pattern": "^simcore/services/((comp|dynamic|frontend))/([a-z0-9][a-z0-9_.-]*/)*([a-z0-9-_]+[a-z0-9])$", "type": "string", "description": "distinctive name for the node based on the docker registry path" }, @@ -3270,7 +3270,7 @@ }, "min-visible-inputs": { "title": "Min-Visible-Inputs", - "minimum": 0.0, + "minimum": 0, "type": "integer", "description": "The number of 'data type inputs' displayed by default in the UI. When None all 'data type inputs' are displayed." } diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py index 4d35b5a4d3c..275529a4831 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_public.py @@ -8,6 +8,7 @@ from models_library.projects import ProjectID from models_library.projects_networks import DockerNetworkAlias from models_library.projects_nodes_io import NodeID +from models_library.sidecar_volumes import VolumeCategory, VolumeStatus from pydantic import AnyHttpUrl, PositiveFloat from servicelib.fastapi.long_running_tasks.client import ( Client, @@ -407,6 +408,18 @@ async def restart_containers(self, dynamic_sidecar_endpoint: AnyHttpUrl) -> None _debug_progress_callback, ) + async def update_volume_state( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + volume_category: VolumeCategory, + volume_status: VolumeStatus, + ) -> None: + await self._thin_client.put_volumes( + dynamic_sidecar_endpoint, + volume_category=volume_category, + volume_status=volume_status, + ) + async def setup(app: FastAPI) -> None: with log_context(logger, logging.DEBUG, "dynamic-sidecar api client setup"): diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_thin.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_thin.py index 364935151dd..1d6a08e4722 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_thin.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_thin.py @@ -4,6 +4,7 @@ from fastapi import FastAPI, status from httpx import Response, Timeout +from models_library.sidecar_volumes import VolumeCategory, VolumeStatus from pydantic import AnyHttpUrl from servicelib.docker_constants import SUFFIX_EGRESS_PROXY_NAME @@ -225,3 +226,15 @@ async def post_containers_tasks_restart( ) -> Response: url = self._get_url(dynamic_sidecar_endpoint, "/containers:restart") return await self.client.post(url) + + @retry_on_errors + @expect_status(status.HTTP_204_NO_CONTENT) + async def put_volumes( + self, + dynamic_sidecar_endpoint: AnyHttpUrl, + volume_category: VolumeCategory, + volume_status: VolumeStatus, + ) -> Response: + url = self._get_url(dynamic_sidecar_endpoint, f"/volumes/{volume_category}") + + return await self.client.put(url, json={"status": volume_status}) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py index 13225739f47..96264ea1d6e 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_events_utils.py @@ -12,6 +12,7 @@ from models_library.rabbitmq_messages import InstrumentationRabbitMessage from models_library.service_settings_labels import SimcoreServiceLabels from models_library.services import ServiceKeyVersion +from models_library.sidecar_volumes import VolumeCategory, VolumeStatus from servicelib.fastapi.long_running_tasks.client import ( ProgressCallback, TaskClientResultError, @@ -141,6 +142,11 @@ async def service_save_state( await dynamic_sidecar_client.save_service_state( scheduler_data.endpoint, progress_callback=progress_callback ) + await dynamic_sidecar_client.update_volume_state( + scheduler_data.endpoint, + volume_category=VolumeCategory.STATES, + volume_status=VolumeStatus.CONTENT_WAS_SAVED, + ) async def service_push_outputs( @@ -153,6 +159,11 @@ async def service_push_outputs( await dynamic_sidecar_client.push_service_output_ports( scheduler_data.endpoint, progress_callback=progress_callback ) + await dynamic_sidecar_client.update_volume_state( + scheduler_data.endpoint, + volume_category=VolumeCategory.OUTPUTS, + volume_status=VolumeStatus.CONTENT_WAS_SAVED, + ) async def service_remove_sidecar_proxy_docker_networks_and_volumes( @@ -415,6 +426,29 @@ async def prepare_services_environment( dynamic_sidecar_client = get_dynamic_sidecar_client(app, scheduler_data.node_uuid) dynamic_sidecar_endpoint = scheduler_data.endpoint + # Before starting, update the volume states. It is not always + # required to save the data from these volumes, eg: when services + # are opened in read only mode. + volume_status: VolumeStatus = ( + VolumeStatus.CONTENT_NEEDS_TO_BE_SAVED + if scheduler_data.dynamic_sidecar.service_removal_state.can_save + else VolumeStatus.CONTENT_NO_SAVE_REQUIRED + ) + await logged_gather( + *( + dynamic_sidecar_client.update_volume_state( + scheduler_data.endpoint, + volume_category=VolumeCategory.STATES, + volume_status=volume_status, + ), + dynamic_sidecar_client.update_volume_state( + scheduler_data.endpoint, + volume_category=VolumeCategory.OUTPUTS, + volume_status=volume_status, + ), + ) + ) + async def _pull_outputs_and_state(): tasks = [ dynamic_sidecar_client.pull_service_output_ports(dynamic_sidecar_endpoint) diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_public.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_public.py index a221c087eb2..2fc7b2aea39 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_public.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_public.py @@ -9,6 +9,7 @@ from faker import Faker from fastapi import FastAPI, status from httpx import HTTPError, Response +from models_library.sidecar_volumes import VolumeCategory, VolumeStatus from pydantic import AnyHttpUrl, parse_obj_as from pytest import LogCaptureFixture, MonkeyPatch from pytest_mock import MockerFixture @@ -331,3 +332,25 @@ async def test_detach_container_from_network( ) is None ) + + +@pytest.mark.parametrize("volume_category", VolumeCategory) +@pytest.mark.parametrize("volume_status", VolumeStatus) +async def test_update_volume_state( + get_patched_client: Callable, + dynamic_sidecar_endpoint: AnyHttpUrl, + volume_category: VolumeCategory, + volume_status: VolumeStatus, +) -> None: + with get_patched_client( + "put_volumes", + return_value=Response(status_code=status.HTTP_204_NO_CONTENT), + ) as client: + assert ( + await client.update_volume_state( + dynamic_sidecar_endpoint, + volume_category=volume_category, + volume_status=volume_status, + ) + is None + ) diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_thin.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_thin.py index 38a768b0b70..2622b6ddc04 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_thin.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_thin.py @@ -7,6 +7,7 @@ import pytest from fastapi import FastAPI, status from httpx import Response +from models_library.sidecar_volumes import VolumeCategory, VolumeStatus from pydantic import AnyHttpUrl, parse_obj_as from pytest import MonkeyPatch from pytest_simcore.helpers.typing_env import EnvVarsDict @@ -242,6 +243,31 @@ async def test_post_containers_networks_detach( assert_responses(mock_response, response) +@pytest.mark.parametrize("volume_category", VolumeCategory) +@pytest.mark.parametrize("volume_status", VolumeStatus) +async def test_put_volumes( + thin_client: ThinDynamicSidecarClient, + dynamic_sidecar_endpoint: AnyHttpUrl, + mock_request: MockRequestType, + volume_category: str, + volume_status: VolumeStatus, +) -> None: + mock_response = Response(status.HTTP_204_NO_CONTENT) + mock_request( + "PUT", + f"{dynamic_sidecar_endpoint}/{thin_client.API_VERSION}/volumes/{volume_category}", + mock_response, + None, + ) + + response = await thin_client.put_volumes( + dynamic_sidecar_endpoint, + volume_category=volume_category, + volume_status=volume_status, + ) + assert_responses(mock_response, response) + + @pytest.mark.parametrize( "handler_name, mock_endpoint, extra_kwargs", [ diff --git a/services/dynamic-sidecar/Dockerfile b/services/dynamic-sidecar/Dockerfile index 0748594da4e..3238f77e415 100644 --- a/services/dynamic-sidecar/Dockerfile +++ b/services/dynamic-sidecar/Dockerfile @@ -60,7 +60,7 @@ ENV DYNAMIC_SIDECAR_DY_VOLUMES_MOUNT_DIR="/dy-volumes" # create direcotry to persist SharedStore data accessiable # between dynamic-sidecar reboots -ENV DYNAMIC_SIDECAR_SHARED_STORE_DIR="/shared-store" +ENV DYNAMIC_SIDECAR_SHARED_STORE_DIR="${DYNAMIC_SIDECAR_DY_VOLUMES_MOUNT_DIR}/shared-store" RUN mkdir -p "${DYNAMIC_SIDECAR_SHARED_STORE_DIR}" && \ chown -R scu:scu "${DYNAMIC_SIDECAR_SHARED_STORE_DIR}" diff --git a/services/dynamic-sidecar/openapi.json b/services/dynamic-sidecar/openapi.json index 8ed16f4a533..e9b5898588e 100644 --- a/services/dynamic-sidecar/openapi.json +++ b/services/dynamic-sidecar/openapi.json @@ -658,6 +658,50 @@ } } } + }, + "/v1/volumes/{id}": { + "put": { + "tags": [ + "volumes" + ], + "summary": "Updates the state of the volume", + "operationId": "put_volume_state_v1_volumes__id__put", + "parameters": [ + { + "required": true, + "schema": { + "$ref": "#/components/schemas/VolumeCategory" + }, + "name": "id", + "in": "path" + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/PutVolumeItem" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "Successful Response" + }, + "422": { + "description": "Validation Error", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/HTTPValidationError" + } + } + } + } + } + } } }, "components": { @@ -768,6 +812,18 @@ } } }, + "PutVolumeItem": { + "title": "PutVolumeItem", + "required": [ + "status" + ], + "type": "object", + "properties": { + "status": { + "$ref": "#/components/schemas/VolumeStatus" + } + } + }, "SelectBox": { "title": "SelectBox", "required": [ @@ -927,6 +983,27 @@ } } }, + "VolumeCategory": { + "title": "VolumeCategory", + "enum": [ + "OUTPUTS", + "INPUTS", + "STATES", + "SHARED_STORE" + ], + "type": "string", + "description": "These uniquely identify volumes which are mounted by\nthe dynamic-sidecar and user services.\n\nThis is primarily used to keep track of the status of\neach individual volume on the volumes.\n\nThe status is ingested by the agent and processed\nwhen the volume is removed." + }, + "VolumeStatus": { + "title": "VolumeStatus", + "enum": [ + "CONTENT_NEEDS_TO_BE_SAVED", + "CONTENT_WAS_SAVED", + "CONTENT_NO_SAVE_REQUIRED" + ], + "type": "string", + "description": "Used by the agent to figure out what to do with the data\npresent on the volume." + }, "Widget": { "title": "Widget", "required": [ diff --git a/services/dynamic-sidecar/requirements/_base.in b/services/dynamic-sidecar/requirements/_base.in index 52dee709022..9daf6ac8a58 100644 --- a/services/dynamic-sidecar/requirements/_base.in +++ b/services/dynamic-sidecar/requirements/_base.in @@ -28,6 +28,7 @@ aio-pika aiodocker aiofiles aioprocessing +arrow docker-compose fastapi httpx diff --git a/services/dynamic-sidecar/requirements/_base.txt b/services/dynamic-sidecar/requirements/_base.txt index b6dd35c3a1d..856537659f9 100644 --- a/services/dynamic-sidecar/requirements/_base.txt +++ b/services/dynamic-sidecar/requirements/_base.txt @@ -41,9 +41,7 @@ anyio==3.6.2 # httpcore # starlette arrow==1.2.3 - # via - # -r requirements/../../../packages/models-library/requirements/_base.in - # -r requirements/../../../packages/simcore-sdk/requirements/../../../packages/models-library/requirements/_base.in + # via -r requirements/../../../packages/models-library/requirements/_base.in asgiref==3.5.2 # via uvicorn async-timeout==4.0.2 diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_routing.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_routing.py index b731b86a466..9f6d04e676a 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_routing.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_routing.py @@ -6,7 +6,13 @@ from fastapi import APIRouter from .._meta import API_VTAG -from . import containers, containers_extension, containers_long_running_tasks, health +from . import ( + containers, + containers_extension, + containers_long_running_tasks, + health, + volumes, +) main_router = APIRouter() main_router.include_router(health.router) @@ -25,5 +31,10 @@ tags=["containers"], prefix=f"/{API_VTAG}", ) +main_router.include_router( + volumes.router, + tags=["volumes"], + prefix=f"/{API_VTAG}", +) __all__: tuple[str, ...] = ("main_router",) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/volumes.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/volumes.py new file mode 100644 index 00000000000..5620f6bc11d --- /dev/null +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/volumes.py @@ -0,0 +1,28 @@ +from fastapi import APIRouter, Depends +from fastapi import Path as PathParam +from fastapi import status +from models_library.sidecar_volumes import VolumeCategory, VolumeState, VolumeStatus +from pydantic import BaseModel + +from ..models.shared_store import SharedStore +from ._dependencies import get_shared_store + +router = APIRouter() + + +class PutVolumeItem(BaseModel): + status: VolumeStatus + + +@router.put( + "/volumes/{id}", + summary="Updates the state of the volume", + status_code=status.HTTP_204_NO_CONTENT, +) +async def put_volume_state( + item: PutVolumeItem, + volume_category: VolumeCategory = PathParam(..., alias="id"), + shared_store: SharedStore = Depends(get_shared_store), +) -> None: + async with shared_store: + shared_store.volume_states[volume_category] = VolumeState(status=item.status) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/models/shared_store.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/models/shared_store.py index b0c8d58f77b..beb52471bb7 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/models/shared_store.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/models/shared_store.py @@ -1,21 +1,57 @@ +from asyncio import Lock from pathlib import Path -from typing import Final, Optional +from typing import Final, TypeAlias import aiofiles from fastapi import FastAPI +from models_library.sidecar_volumes import VolumeCategory, VolumeState, VolumeStatus from pydantic import BaseModel, Field, PrivateAttr from ..core.settings import ApplicationSettings -ContainerNameStr = str +ContainerNameStr: TypeAlias = str STORE_FILE_NAME: Final[str] = "data.json" -class SharedStore(BaseModel): - _shared_store_dir: Path = PrivateAttr() +class _StoreMixin(BaseModel): + _shared_store_dir: Path | None = PrivateAttr() + _persist_lock: Lock = PrivateAttr(default_factory=Lock) - compose_spec: Optional[str] = Field( + async def __aenter__(self) -> None: + await self._persist_lock.acquire() + return None + + async def __aexit__(self, *args) -> None: + await self._persist_to_disk() + self._persist_lock.release() + + async def _persist_to_disk(self) -> None: + assert self._shared_store_dir # nosec + async with aiofiles.open( + self._shared_store_dir / STORE_FILE_NAME, "w" + ) as data_file: + await data_file.write(self.json()) + + def post_init(self, shared_store_dir: Path): + self._shared_store_dir = shared_store_dir + + +class SharedStore(_StoreMixin): + """ + When used as a context manager will persist the state to the disk upon exit. + + NOTE: when updating the contents of the shared store always use a context manger + to avoid concurrency issues. + + Example: + async with shared_store: + copied_list = deepcopy(shared_store.container_names) + copied_list.append("a_container_name") + shared_store.container_names = copied_list + """ + + compose_spec: str | None = Field( default=None, description="stores the stringified compose spec" ) container_names: list[ContainerNameStr] = Field( @@ -23,36 +59,38 @@ class SharedStore(BaseModel): description="stores the container names from the compose_spec", ) - # NOTE: setting up getter and setter does not work. - def set_shared_store_dir(self, shared_store_dir: Path) -> None: - self._shared_store_dir = shared_store_dir + volume_states: dict[VolumeCategory, VolumeState] = Field( + default_factory=dict, description="persist the state of each volume" + ) - async def clear(self): - self.compose_spec = None - self.container_names = [] - await self.persist_to_disk() + async def _setup_initial_volume_states(self) -> None: + async with self: + for category, status in [ + (VolumeCategory.INPUTS, VolumeStatus.CONTENT_NO_SAVE_REQUIRED), + (VolumeCategory.SHARED_STORE, VolumeStatus.CONTENT_NO_SAVE_REQUIRED), + (VolumeCategory.OUTPUTS, VolumeStatus.CONTENT_NEEDS_TO_BE_SAVED), + (VolumeCategory.STATES, VolumeStatus.CONTENT_NEEDS_TO_BE_SAVED), + ]: + self.volume_states[category] = VolumeState(status=status) @classmethod async def init_from_disk(cls, shared_store_dir: Path) -> "SharedStore": data_file_path = shared_store_dir / STORE_FILE_NAME - if data_file_path.exists(): - # if the sidecar is started for a second time (usually the container dies) - # it will load the previous data which was stored - async with aiofiles.open(shared_store_dir / STORE_FILE_NAME) as data_file: - file_content = await data_file.read() - - obj = cls.parse_obj(file_content) - else: + + if not data_file_path.exists(): obj = cls() + obj.post_init(shared_store_dir) + await obj._setup_initial_volume_states() + return obj - obj.set_shared_store_dir(shared_store_dir) - return obj + # if the sidecar is started for a second time (usually the container dies) + # it will load the previous data which was stored + async with aiofiles.open(shared_store_dir / STORE_FILE_NAME) as data_file: + file_content = await data_file.read() - async def persist_to_disk(self) -> None: - async with aiofiles.open( - self._shared_store_dir / STORE_FILE_NAME, "w" - ) as data_file: - await data_file.write(self.json()) + obj = cls.parse_obj(file_content) + obj.post_init(shared_store_dir) + return obj def setup_shared_store(app: FastAPI) -> None: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py index 766e40ce348..23cbaa5dfd7 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py @@ -126,13 +126,15 @@ async def task_create_service_containers( ) -> list[str]: progress.update(message="validating service spec", percent=0) - shared_store.compose_spec = await validate_compose_spec( - settings=settings, - compose_file_content=containers_create.docker_compose_yaml, - mounted_volumes=mounted_volumes, - ) - shared_store.container_names = assemble_container_names(shared_store.compose_spec) - await shared_store.persist_to_disk() + async with shared_store: + shared_store.compose_spec = await validate_compose_spec( + settings=settings, + compose_file_content=containers_create.docker_compose_yaml, + mounted_volumes=mounted_volumes, + ) + shared_store.container_names = assemble_container_names( + shared_store.compose_spec + ) logger.info("Validated compose-spec:\n%s", f"{shared_store.compose_spec}") @@ -207,7 +209,9 @@ async def task_runs_docker_compose_down( _raise_for_errors(result, "rm") # removing compose-file spec - await shared_store.clear() + async with shared_store: + shared_store.compose_spec = None + shared_store.container_names = [] progress.update(message="done", percent=0.99) diff --git a/services/dynamic-sidecar/tests/conftest.py b/services/dynamic-sidecar/tests/conftest.py index 0ae0fde09fe..62a7274efff 100644 --- a/services/dynamic-sidecar/tests/conftest.py +++ b/services/dynamic-sidecar/tests/conftest.py @@ -123,6 +123,20 @@ def run_id(faker: Faker) -> RunID: return faker.uuid4(cast_to=None) +@pytest.fixture +def ensure_shared_store_dir(shared_store_dir: Path) -> Iterator[Path]: + shared_store_dir.mkdir(parents=True, exist_ok=True) + assert shared_store_dir.exists() is True + + yield shared_store_dir + + # remove files and dir + for f in shared_store_dir.glob("*"): + f.unlink() + shared_store_dir.rmdir() + assert shared_store_dir.exists() is False + + @pytest.fixture def mock_environment( monkeypatch: MonkeyPatch, @@ -137,6 +151,7 @@ def mock_environment( project_id: ProjectID, node_id: NodeID, run_id: RunID, + ensure_shared_store_dir: None, ) -> EnvVarsDict: """Main test environment used to build the application @@ -193,20 +208,6 @@ def mock_environment_with_envdevel( return envs -@pytest.fixture -def ensure_shared_store_dir(shared_store_dir: Path) -> Iterator[Path]: - shared_store_dir.mkdir(parents=True, exist_ok=True) - assert shared_store_dir.exists() is True - - yield shared_store_dir - - # remove files and dir - for f in shared_store_dir.glob("*"): - f.unlink() - shared_store_dir.rmdir() - assert shared_store_dir.exists() is False - - @pytest.fixture() def caplog_info_debug(caplog: LogCaptureFixture) -> Iterable[LogCaptureFixture]: with caplog.at_level(logging.DEBUG): diff --git a/services/dynamic-sidecar/tests/unit/test_api_volumes.py b/services/dynamic-sidecar/tests/unit/test_api_volumes.py new file mode 100644 index 00000000000..fe396d002ad --- /dev/null +++ b/services/dynamic-sidecar/tests/unit/test_api_volumes.py @@ -0,0 +1,61 @@ +# pylint: disable=unused-argument + +from pathlib import Path + +import pytest +from async_asgi_testclient import TestClient +from fastapi import status +from models_library.sidecar_volumes import VolumeCategory, VolumeState, VolumeStatus +from simcore_service_dynamic_sidecar._meta import API_VTAG +from simcore_service_dynamic_sidecar.models.shared_store import SharedStore + + +@pytest.mark.parametrize( + "volume_category, initial_expected_status", + [ + (VolumeCategory.STATES, VolumeStatus.CONTENT_NEEDS_TO_BE_SAVED), + (VolumeCategory.OUTPUTS, VolumeStatus.CONTENT_NEEDS_TO_BE_SAVED), + (VolumeCategory.INPUTS, VolumeStatus.CONTENT_NO_SAVE_REQUIRED), + (VolumeCategory.SHARED_STORE, VolumeStatus.CONTENT_NO_SAVE_REQUIRED), + ], +) +async def test_volumes_state_saved_ok( + ensure_shared_store_dir: Path, + test_client: TestClient, + volume_category: VolumeCategory, + initial_expected_status: VolumeStatus, +): + shared_store: SharedStore = test_client.application.state.shared_store + + # check that initial status is as expected + assert shared_store.volume_states[volume_category] == VolumeState( + status=initial_expected_status + ) + + response = await test_client.put( + f"/{API_VTAG}/volumes/{volume_category}", + json={"status": VolumeStatus.CONTENT_WAS_SAVED}, + ) + assert response.status_code == status.HTTP_204_NO_CONTENT, response.text + + # check that + assert shared_store.volume_states[volume_category] == VolumeState( + status=VolumeStatus.CONTENT_WAS_SAVED + ) + + +@pytest.mark.parametrize("invalid_volume_category", ["outputs", "outputS"]) +async def test_volumes_state_saved_error( + ensure_shared_store_dir: Path, + test_client: TestClient, + invalid_volume_category: VolumeCategory, +): + response = await test_client.put( + f"/{API_VTAG}/volumes/{invalid_volume_category}", + json={"status": VolumeStatus.CONTENT_WAS_SAVED}, + ) + assert response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY, response.text + json_response = response.json() + assert ( + invalid_volume_category not in json_response["detail"][0]["ctx"]["enum_values"] + ) diff --git a/services/dynamic-sidecar/tests/unit/test_models_shared_store.py b/services/dynamic-sidecar/tests/unit/test_models_shared_store.py index e17b0e9d9aa..b6ca9b4458a 100644 --- a/services/dynamic-sidecar/tests/unit/test_models_shared_store.py +++ b/services/dynamic-sidecar/tests/unit/test_models_shared_store.py @@ -1,19 +1,20 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument -from dataclasses import dataclass +from copy import deepcopy from pathlib import Path -from typing import Optional +from typing import Any import pytest from async_asgi_testclient import TestClient -from faker import Faker from fastapi import FastAPI +from models_library.sidecar_volumes import VolumeCategory, VolumeState, VolumeStatus +from pydantic import parse_obj_as from pytest_mock.plugin import MockerFixture +from servicelib.utils import logged_gather from simcore_service_dynamic_sidecar.core import application from simcore_service_dynamic_sidecar.models.shared_store import ( STORE_FILE_NAME, - ContainerNameStr, SharedStore, ) @@ -32,52 +33,75 @@ def shared_store(trigger_setup_shutdown_events: None, app: FastAPI) -> SharedSto return app.state.shared_store -@pytest.fixture -def fake_compose_spec(faker: Faker) -> str: - return f"just_a_string_pretending_to_be_a_compose_spec_{faker.uuid4()}" - - # mock docker_compose_down in application @pytest.fixture def mock_docker_compose(mocker: MockerFixture) -> None: mocker.patch.object(application, "docker_compose_down") -@dataclass -class UpdateFields: - compose_spec: Optional[str] - container_names: list[ContainerNameStr] - - @pytest.mark.parametrize( "update_fields", [ - UpdateFields(compose_spec=None, container_names=[]), - UpdateFields(compose_spec="some_random_fake_spec", container_names=[]), - UpdateFields( - compose_spec="some_random_fake_spec", container_names=["a_continaer"] - ), - UpdateFields( - compose_spec="some_random_fake_spec", container_names=["a_ctnr", "b_cont"] - ), + {"compose_spec": None, "container_names": []}, + {"compose_spec": "some_random_fake_spec", "container_names": []}, + {"compose_spec": "some_random_fake_spec", "container_names": ["a_continaer"]}, + { + "compose_spec": "some_random_fake_spec", + "container_names": ["a_ctnr", "b_cont"], + }, + {"volume_states": {}}, + { + "volume_states": { + VolumeCategory.OUTPUTS: parse_obj_as( + VolumeState, {"status": VolumeStatus.CONTENT_NO_SAVE_REQUIRED} + ), + VolumeCategory.INPUTS: parse_obj_as( + VolumeState, {"status": VolumeStatus.CONTENT_NEEDS_TO_BE_SAVED} + ), + VolumeCategory.STATES: parse_obj_as( + VolumeState, {"status": VolumeStatus.CONTENT_WAS_SAVED} + ), + VolumeCategory.SHARED_STORE: parse_obj_as( + VolumeState, {"status": VolumeStatus.CONTENT_NO_SAVE_REQUIRED} + ), + } + }, ], ) async def test_shared_store_updates( mock_docker_compose: None, shared_store: SharedStore, - fake_compose_spec: str, ensure_shared_store_dir: Path, - update_fields: UpdateFields, + update_fields: dict[str, Any], ): # check no file is present on the disk from where the data was created store_file_path = ensure_shared_store_dir / STORE_FILE_NAME + # file already exists since it was initialized, removing it for this test + store_file_path.unlink() assert store_file_path.exists() is False # change some data and trigger a persist - shared_store.compose_spec = update_fields.compose_spec - shared_store.container_names = update_fields.container_names - await shared_store.persist_to_disk() + async with shared_store: + for attr_name, attr_value in update_fields.items(): + setattr(shared_store, attr_name, attr_value) # check the contes of the file should be the same as the shared_store's assert store_file_path.exists() is True assert shared_store == SharedStore.parse_raw(store_file_path.read_text()) + + +async def test_no_concurrency_with_parallel_writes( + mock_docker_compose: None, shared_store: SharedStore, ensure_shared_store_dir: Path +): + PARALLEL_CHANGES: int = 1000 + + async def replace_list_in_shared_store(item: str): + async with shared_store: + new_list = deepcopy(shared_store.container_names) + new_list.append(item) + shared_store.container_names = new_list + + await logged_gather( + *(replace_list_in_shared_store(f"{x}") for x in range(PARALLEL_CHANGES)) + ) + assert len(shared_store.container_names) == PARALLEL_CHANGES