diff --git a/packages/models-library/src/models_library/services_resources.py b/packages/models-library/src/models_library/services_resources.py index 8722d158b7e..ac7514cf659 100644 --- a/packages/models-library/src/models_library/services_resources.py +++ b/packages/models-library/src/models_library/services_resources.py @@ -1,7 +1,9 @@ import logging -from typing import Any, Final, Union +from enum import auto +from typing import Any, Final, Optional, Union from models_library.docker import DockerGenericTag +from models_library.utils.enums import StrAutoEnum from pydantic import ( BaseModel, ByteSize, @@ -58,6 +60,12 @@ class Config: ResourcesDict = dict[ResourceName, ResourceValue] +class BootMode(StrAutoEnum): + CPU = auto() + GPU = auto() + MPI = auto() + + class ImageResources(BaseModel): image: DockerGenericTag = Field( ..., @@ -69,6 +77,10 @@ class ImageResources(BaseModel): ), ) resources: ResourcesDict + boot_modes: list[BootMode] = Field( + default=[BootMode.CPU], + description="describe how a service shall be booted, using CPU, MPI, openMP or GPU", + ) class Config: schema_extra = { @@ -96,13 +108,17 @@ class ServiceResourcesDictHelpers: def create_from_single_service( image: DockerGenericTag, resources: ResourcesDict, + boot_modes: Optional[list[BootMode]] = None, ) -> ServiceResourcesDict: + if boot_modes is None: + boot_modes = [BootMode.CPU] return parse_obj_as( ServiceResourcesDict, { DEFAULT_SINGLE_SERVICE_NAME: { "image": image, "resources": resources, + "boot_modes": boot_modes, } }, ) @@ -127,6 +143,7 @@ class Config: "reservation": parse_obj_as(ByteSize, "2Gib"), }, }, + "boot_modes": [BootMode.CPU], }, }, # service with a compose spec @@ -137,6 +154,7 @@ class Config: "CPU": {"limit": 0.3, "reservation": 0.3}, "RAM": {"limit": 53687091232, "reservation": 53687091232}, }, + "boot_modes": [BootMode.CPU], }, "s4l-core": { "image": "simcore/services/dynamic/s4l-core-dy:3.0.0", @@ -145,6 +163,7 @@ class Config: "RAM": {"limit": 17179869184, "reservation": 536870912}, "VRAM": {"limit": 1, "reservation": 1}, }, + "boot_modes": [BootMode.GPU], }, "sym-server": { "image": "simcore/services/dynamic/sym-server:3.0.0", @@ -155,6 +174,7 @@ class Config: "reservation": parse_obj_as(ByteSize, "2Gib"), }, }, + "boot_modes": [BootMode.CPU], }, }, # compose spec with image outside the platform @@ -168,6 +188,7 @@ class Config: "reservation": parse_obj_as(ByteSize, "2Gib"), }, }, + "boot_modes": [BootMode.CPU], }, "proxy": { "image": "traefik:v2.6.6", @@ -178,6 +199,7 @@ class Config: "reservation": parse_obj_as(ByteSize, "2Gib"), }, }, + "boot_modes": [BootMode.CPU], }, }, ] diff --git a/packages/pytest-simcore/src/pytest_simcore/docker_compose.py b/packages/pytest-simcore/src/pytest_simcore/docker_compose.py index 25535c3b20c..820216a3cab 100644 --- a/packages/pytest-simcore/src/pytest_simcore/docker_compose.py +++ b/packages/pytest-simcore/src/pytest_simcore/docker_compose.py @@ -132,6 +132,7 @@ def env_file_for_testing( @pytest.fixture(scope="module") def simcore_docker_compose( osparc_simcore_root_dir: Path, + osparc_simcore_scripts_dir: Path, env_file_for_testing: Path, temp_folder: Path, ) -> dict[str, Any]: @@ -155,6 +156,7 @@ def simcore_docker_compose( compose_specs = run_docker_compose_config( project_dir=osparc_simcore_root_dir / "services", + scripts_dir=osparc_simcore_scripts_dir, docker_compose_paths=docker_compose_paths, env_file_path=env_file_for_testing, destination_path=temp_folder / "simcore_docker_compose.yml", @@ -205,6 +207,7 @@ def inject_filestash_config_path( 
@pytest.fixture(scope="module") def ops_docker_compose( osparc_simcore_root_dir: Path, + osparc_simcore_scripts_dir: Path, env_file_for_testing: Path, temp_folder: Path, inject_filestash_config_path: None, @@ -224,6 +227,7 @@ def ops_docker_compose( compose_specs = run_docker_compose_config( project_dir=osparc_simcore_root_dir / "services", + scripts_dir=osparc_simcore_scripts_dir, docker_compose_paths=docker_compose_path, env_file_path=env_file_for_testing, destination_path=temp_folder / "ops_docker_compose.yml", diff --git a/packages/pytest-simcore/src/pytest_simcore/helpers/utils_docker.py b/packages/pytest-simcore/src/pytest_simcore/helpers/utils_docker.py index 26bf5df6851..c985ea258cd 100644 --- a/packages/pytest-simcore/src/pytest_simcore/helpers/utils_docker.py +++ b/packages/pytest-simcore/src/pytest_simcore/helpers/utils_docker.py @@ -98,6 +98,7 @@ def get_service_published_port( def run_docker_compose_config( docker_compose_paths: Union[list[Path], Path], + scripts_dir: Path, project_dir: Path, env_file_path: Path, destination_path: Optional[Path] = None, @@ -131,26 +132,26 @@ def run_docker_compose_config( # SEE https://docs.docker.com/compose/reference/ global_options = [ - "--project-directory", + "-p", str(project_dir), # Specify an alternate working directory ] + # https://docs.docker.com/compose/environment-variables/#using-the---env-file--option + global_options += [ + "-e", + str(env_file_path), # Custom environment variables + ] # Specify an alternate compose files # - When you use multiple Compose files, all paths in the files are relative to the first configuration file specified with -f. # You can use the --project-directory option to override this base path. for docker_compose_path in docker_compose_paths: - global_options += ["--file", os.path.relpath(docker_compose_path, project_dir)] - - # https://docs.docker.com/compose/environment-variables/#using-the---env-file--option - global_options += [ - "--env-file", - str(env_file_path), # Custom environment variables - ] + global_options += [os.path.relpath(docker_compose_path, project_dir)] # SEE https://docs.docker.com/compose/reference/config/ - cmd_options = [] + docker_compose_path = scripts_dir / "docker" / "docker-compose-config.bash" + assert docker_compose_path.exists() - cmd = ["docker-compose"] + global_options + ["config"] + cmd_options + cmd = [f"{docker_compose_path}"] + global_options print(" ".join(cmd)) process = subprocess.run( diff --git a/scripts/docker/docker-compose-config.bash b/scripts/docker/docker-compose-config.bash index 1e8b5d5e261..a9327b18745 100755 --- a/scripts/docker/docker-compose-config.bash +++ b/scripts/docker/docker-compose-config.bash @@ -17,12 +17,16 @@ show_error() { env_file=".env" +project_directory="" # Parse command line arguments -while getopts ":e:" opt; do +while getopts ":e:p:" opt; do case $opt in e) env_file="$OPTARG" ;; + p) + project_directory="$OPTARG" + ;; \?) 
show_error "Invalid option: -$OPTARG" exit 1 @@ -64,6 +68,10 @@ docker \ compose \ --env-file ${env_file}" + if [ "$project_directory" ]; then + docker_command+=" --project-directory ${project_directory}" + fi + for compose_file_path in "$@" do docker_command+=" --file=${compose_file_path}" @@ -91,6 +99,9 @@ docker-compose \ do docker_command+=" --file=${compose_file_path} " done + if [ "$project_directory" ]; then + docker_command+=" --project-directory ${project_directory}" + fi docker_command+=" \ config \ | sed --regexp-extended 's/cpus: ([0-9\\.]+)/cpus: \"\\1\"/'" diff --git a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs.py b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs.py index d9bb3d609ee..8930d3a6789 100644 --- a/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs.py +++ b/services/api-server/src/simcore_service_api_server/api/routes/solvers_jobs.py @@ -145,7 +145,9 @@ async def create_job( # -> director2: ComputationTaskOut = JobStatus # consistency check - task: ComputationTaskGet = await director2_api.create_computation(job.id, user_id) + task: ComputationTaskGet = await director2_api.create_computation( + job.id, user_id, product_name + ) assert task.id == job.id # nosec job_status: JobStatus = create_jobstatus_from_task(task) diff --git a/services/api-server/src/simcore_service_api_server/modules/director_v2.py b/services/api-server/src/simcore_service_api_server/modules/director_v2.py index 636c0a067b9..21426b4be53 100644 --- a/services/api-server/src/simcore_service_api_server/modules/director_v2.py +++ b/services/api-server/src/simcore_service_api_server/modules/director_v2.py @@ -102,7 +102,10 @@ class DirectorV2Api(BaseServiceClientApi): # ServiceUnabalabe: 503 async def create_computation( - self, project_id: UUID, user_id: PositiveInt + self, + project_id: UUID, + user_id: PositiveInt, + product_name: str, ) -> ComputationTaskGet: resp = await self.client.post( "/v2/computations", @@ -110,6 +113,7 @@ async def create_computation( "user_id": user_id, "project_id": str(project_id), "start_pipeline": False, + "product_name": product_name, }, ) diff --git a/services/catalog/src/simcore_service_catalog/api/routes/services_resources.py b/services/catalog/src/simcore_service_catalog/api/routes/services_resources.py index 93b5b7f5762..8f5c1051bf2 100644 --- a/services/catalog/src/simcore_service_catalog/api/routes/services_resources.py +++ b/services/catalog/src/simcore_service_catalog/api/routes/services_resources.py @@ -12,6 +12,7 @@ ) from models_library.services import ServiceKey, ServiceVersion from models_library.services_resources import ( + BootMode, ImageResources, ResourcesDict, ServiceResourcesDict, @@ -41,6 +42,50 @@ logger = logging.getLogger(__name__) SIMCORE_SERVICE_COMPOSE_SPEC_LABEL: Final[str] = "simcore.service.compose-spec" +_DEPRECATED_RESOURCES: Final[list[str]] = ["MPI"] +_BOOT_MODE_TO_RESOURCE_NAME_MAP: Final[dict[str, str]] = {"MPI": "MPI", "GPU": "VRAM"} + + +def _compute_service_available_boot_modes( + settings: list[SimcoreServiceSettingLabelEntry], + service_key: ServiceKey, + service_version: ServiceVersion, +) -> list[BootMode]: + """returns the service boot-modes. + currently this uses the simcore.service.settings labels if available for backwards compatiblity. + if MPI is found, then boot mode is set to MPI, if GPU is found then boot mode is set to GPU, else to CPU. + In the future a dedicated label might be used, to add openMP for example. 
and to not abuse the resources of a service. + Also these will be used in a project to allow the user to choose among different boot modes + """ + + resource_entries = filter(lambda entry: entry.name.lower() == "resources", settings) + generic_resources = {} + for entry in resource_entries: + if not isinstance(entry.value, dict): + logger.warning( + "resource %s for %s got invalid type", + f"{entry.dict()!r}", + f"{service_key}:{service_version}", + ) + continue + generic_resources |= parse_generic_resource( + entry.value.get("Reservations", {}).get("GenericResources", []), + ) + # currently these are unique boot modes + for mode in BootMode: + if ( + _BOOT_MODE_TO_RESOURCE_NAME_MAP.get(mode.value, mode.value) + in generic_resources + ): + return [mode] + + return [BootMode.CPU] + + +def _remove_deprecated_resources(resources: ResourcesDict) -> ResourcesDict: + for res_name in _DEPRECATED_RESOURCES: + resources.pop(res_name, None) + return resources def _resources_from_settings( @@ -82,7 +127,7 @@ def _resources_from_settings( entry.value.get("Reservations", {}).get("GenericResources", []), ) - return service_resources + return _remove_deprecated_resources(service_resources) async def _get_service_labels( @@ -116,7 +161,7 @@ def _get_service_settings( ) -> list[SimcoreServiceSettingLabelEntry]: service_settings = parse_raw_as( list[SimcoreServiceSettingLabelEntry], - labels.get(SIMCORE_SERVICE_SETTINGS_LABELS, ""), + labels.get(SIMCORE_SERVICE_SETTINGS_LABELS, "[]"), ) logger.debug("received %s", f"{service_settings=}") return service_settings @@ -162,6 +207,10 @@ async def get_service_resources( service_resources = _resources_from_settings( service_settings, default_service_resources, service_key, service_version ) + service_boot_modes = _compute_service_available_boot_modes( + service_settings, service_key, service_version + ) + user_specific_service_specs = await services_repo.get_service_specifications( service_key, service_version, @@ -174,7 +223,7 @@ async def get_service_resources( ) return ServiceResourcesDictHelpers.create_from_single_service( - image_version, service_resources + image_version, service_resources, service_boot_modes ) # compose specifications available, potentially multiple services @@ -199,15 +248,19 @@ async def get_service_resources( ) if not spec_service_labels: - spec_service_resources = default_service_resources + spec_service_resources: ResourcesDict = default_service_resources + service_boot_modes = [BootMode.CPU] else: spec_service_settings = _get_service_settings(spec_service_labels) - spec_service_resources = _resources_from_settings( + spec_service_resources: ResourcesDict = _resources_from_settings( spec_service_settings, default_service_resources, service_key, service_version, ) + service_boot_modes = _compute_service_available_boot_modes( + spec_service_settings, service_key, service_version + ) user_specific_service_specs = ( await services_repo.get_service_specifications( key, @@ -225,6 +278,7 @@ async def get_service_resources( { "image": image, "resources": spec_service_resources, + "boot_modes": service_boot_modes, } ) diff --git a/services/catalog/tests/unit/with_dbs/test_api_routes_services_resources.py b/services/catalog/tests/unit/with_dbs/test_api_routes_services_resources.py index b9bb66c27a0..7e324b10f84 100644 --- a/services/catalog/tests/unit/with_dbs/test_api_routes_services_resources.py +++ b/services/catalog/tests/unit/with_dbs/test_api_routes_services_resources.py @@ -4,6 +4,7 @@ import urllib.parse from copy import deepcopy +from 
dataclasses import dataclass from random import choice, randint from typing import Any, Callable @@ -12,7 +13,9 @@ import respx from faker import Faker from fastapi import FastAPI +from models_library.docker import DockerGenericTag from models_library.services_resources import ( + BootMode, ResourcesDict, ResourceValue, ServiceResourcesDict, @@ -76,86 +79,104 @@ def _update_copy(dict_data: dict, update: dict) -> dict: return dict_data_copy +@dataclass +class _ServiceResourceParams: + simcore_service_label: dict[str, str] + expected_resources: ResourcesDict + expected_boot_modes: list[BootMode] + + @pytest.mark.parametrize( - "director_labels, expected_resources", + "params", [ pytest.param( - {}, - _DEFAULT_RESOURCES, + _ServiceResourceParams({}, _DEFAULT_RESOURCES, [BootMode.CPU]), id="nothing_defined_returns_default_resources", ), pytest.param( - { - "simcore.service.settings": '[ {"name": "Resources", "type": "Resources", "value": { "Limits": { "NanoCPUs": 4000000000, "MemoryBytes": 17179869184 } } } ]', - }, - _update_copy( - _DEFAULT_RESOURCES, + _ServiceResourceParams( { - "CPU": ResourceValue( - limit=4.0, - reservation=_DEFAULT_RESOURCES["CPU"].reservation, - ), - "RAM": ResourceValue( - limit=ByteSize(17179869184), - reservation=_DEFAULT_RESOURCES["RAM"].reservation, - ), + "simcore.service.settings": '[ {"name": "Resources", "type": "Resources", "value": { "Limits": { "NanoCPUs": 4000000000, "MemoryBytes": 17179869184 } } } ]', }, + _update_copy( + _DEFAULT_RESOURCES, + { + "CPU": ResourceValue( + limit=4.0, + reservation=_DEFAULT_RESOURCES["CPU"].reservation, + ), + "RAM": ResourceValue( + limit=ByteSize(17179869184), + reservation=_DEFAULT_RESOURCES["RAM"].reservation, + ), + }, + ), + [BootMode.CPU], ), id="only_limits_defined_returns_default_reservations", ), pytest.param( - { - "simcore.service.settings": '[ {"name": "constraints", "type": "string", "value": [ "node.platform.os == linux" ]}, {"name": "Resources", "type": "Resources", "value": { "Limits": { "NanoCPUs": 4000000000, "MemoryBytes": 17179869184 }, "Reservations": { "NanoCPUs": 100000000, "MemoryBytes": 536870912, "GenericResources": [ { "DiscreteResourceSpec": { "Kind": "VRAM", "Value": 1 } }, { "NamedResourceSpec": { "Kind": "AIRAM", "Value": "some_string" } } ] } } } ]' - }, - _update_copy( - _DEFAULT_RESOURCES, + _ServiceResourceParams( { - "CPU": ResourceValue(limit=4.0, reservation=0.1), - "RAM": ResourceValue( - limit=ByteSize(17179869184), reservation=ByteSize(536870912) - ), - "VRAM": ResourceValue(limit=1, reservation=1), - "AIRAM": ResourceValue(limit=0, reservation="some_string"), + "simcore.service.settings": '[ {"name": "constraints", "type": "string", "value": [ "node.platform.os == linux" ]}, {"name": "Resources", "type": "Resources", "value": { "Limits": { "NanoCPUs": 4000000000, "MemoryBytes": 17179869184 }, "Reservations": { "NanoCPUs": 100000000, "MemoryBytes": 536870912, "GenericResources": [ { "DiscreteResourceSpec": { "Kind": "VRAM", "Value": 1 } }, { "NamedResourceSpec": { "Kind": "AIRAM", "Value": "some_string" } } ] } } } ]' }, + _update_copy( + _DEFAULT_RESOURCES, + { + "CPU": ResourceValue(limit=4.0, reservation=0.1), + "RAM": ResourceValue( + limit=ByteSize(17179869184), reservation=ByteSize(536870912) + ), + "VRAM": ResourceValue(limit=1, reservation=1), + "AIRAM": ResourceValue(limit=0, reservation="some_string"), + }, + ), + [BootMode.GPU], ), id="everything_rightly_defined", ), pytest.param( - { - "simcore.service.settings": '[ {"name": "Resources", "type": "Resources", "value": { 
"Reservations": { "NanoCPUs": 100000000, "MemoryBytes": 536870912, "GenericResources": [ ] } } } ]' - }, - _update_copy( - _DEFAULT_RESOURCES, + _ServiceResourceParams( { - "CPU": ResourceValue( - limit=_DEFAULT_RESOURCES["CPU"].limit, - reservation=0.1, - ), - "RAM": ResourceValue( - limit=_DEFAULT_RESOURCES["RAM"].limit, - reservation=ByteSize(536870912), - ), + "simcore.service.settings": '[ {"name": "Resources", "type": "Resources", "value": { "Reservations": { "NanoCPUs": 100000000, "MemoryBytes": 536870912, "GenericResources": [ ] } } } ]' }, + _update_copy( + _DEFAULT_RESOURCES, + { + "CPU": ResourceValue( + limit=_DEFAULT_RESOURCES["CPU"].limit, + reservation=0.1, + ), + "RAM": ResourceValue( + limit=_DEFAULT_RESOURCES["RAM"].limit, + reservation=ByteSize(536870912), + ), + }, + ), + [BootMode.CPU], ), id="no_limits_defined_returns_default_limits", ), pytest.param( - { - "simcore.service.settings": '[ {"name": "Resources", "type": "Resources", "value": { "Reservations": { "NanoCPUs": 10000000000, "MemoryBytes": 53687091232, "GenericResources": [ { "DiscreteResourceSpec": { "Kind": "VRAM", "Value": 1 } } ] } } } ]' - }, - _update_copy( - _DEFAULT_RESOURCES, + _ServiceResourceParams( { - "CPU": ResourceValue( - limit=10.0, - reservation=10.0, - ), - "RAM": ResourceValue( - limit=ByteSize(53687091232), - reservation=ByteSize(53687091232), - ), - "VRAM": ResourceValue(limit=1, reservation=1), + "simcore.service.settings": '[ {"name": "Resources", "type": "Resources", "value": { "Reservations": { "NanoCPUs": 10000000000, "MemoryBytes": 53687091232, "GenericResources": [ { "DiscreteResourceSpec": { "Kind": "VRAM", "Value": 1 } } ] } } } ]' }, + _update_copy( + _DEFAULT_RESOURCES, + { + "CPU": ResourceValue( + limit=10.0, + reservation=10.0, + ), + "RAM": ResourceValue( + limit=ByteSize(53687091232), + reservation=ByteSize(53687091232), + ), + "VRAM": ResourceValue(limit=1, reservation=1), + }, + ), + [BootMode.GPU], ), id="no_limits_with_reservations_above_default_returns_same_as_reservation", ), @@ -165,25 +186,24 @@ async def test_get_service_resources( mock_catalog_background_task, mock_director_service_labels: Route, client: TestClient, - director_labels: dict[str, Any], - expected_resources: ResourcesDict, + params: _ServiceResourceParams, ) -> None: service_key = f"simcore/services/{choice(['comp', 'dynamic'])}/jupyter-math" service_version = f"{randint(0,100)}.{randint(0,100)}.{randint(0,100)}" - mock_director_service_labels.respond(json={"data": director_labels}) + mock_director_service_labels.respond(json={"data": params.simcore_service_label}) url = URL(f"/v0/services/{service_key}/{service_version}/resources") response = client.get(f"{url}") assert response.status_code == 200, f"{response.text}" data = response.json() received_resources: ServiceResourcesDict = parse_obj_as(ServiceResourcesDict, data) - assert type(received_resources) == dict + assert isinstance(received_resources, dict) expected_service_resources = ServiceResourcesDictHelpers.create_from_single_service( - f"{service_key}:{service_version}", - expected_resources, + parse_obj_as(DockerGenericTag, f"{service_key}:{service_version}"), + params.expected_resources, + boot_modes=params.expected_boot_modes, ) - assert type(expected_service_resources) == dict - + assert isinstance(expected_service_resources, dict) assert received_resources == expected_service_resources diff --git a/services/dask-sidecar/docker/boot.sh b/services/dask-sidecar/docker/boot.sh index 95f7374c648..913c944ffd7 100755 --- 
a/services/dask-sidecar/docker/boot.sh +++ b/services/dask-sidecar/docker/boot.sh @@ -62,7 +62,7 @@ else # CPU: number of CPUs available (= num of processing units - DASK_SIDECAR_NUM_NON_USABLE_CPUS) # GPU: number GPUs available (= num of GPUs if a nvidia-smi can be run inside a docker container) # RAM: amount of RAM available (= CPU/nproc * total virtual memory given by python psutil - DASK_SIDECAR_NON_USABLE_RAM) - # MPI: backwards-compatibility (deprecated if core_number = TARGET_MPI_NODE_CPU_COUNT set to 1) + # VRAM: amount of VRAM available (in bytes) # CPUs num_cpus=$(($(nproc) - ${DASK_SIDECAR_NUM_NON_USABLE_CPUS:-2})) @@ -83,15 +83,10 @@ else # add the GPUs if there are any if [ "$num_gpus" -gt 0 ]; then - resources="$resources,GPU=$num_gpus" + total_vram=$(python -c "from simcore_service_dask_sidecar.utils import video_memory; print(video_memory());") + resources="$resources,GPU=$num_gpus,VRAM=$total_vram" fi - # add the MPI if possible - if [ ${TARGET_MPI_NODE_CPU_COUNT+x} ]; then - if [ "$(nproc)" -eq "${TARGET_MPI_NODE_CPU_COUNT}" ]; then - resources="$resources,MPI=1" - fi - fi # # DASK RESOURCES DEFINITION --------------------------------- END diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/boot_mode.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/boot_mode.py deleted file mode 100644 index 449810c2a52..00000000000 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/boot_mode.py +++ /dev/null @@ -1,7 +0,0 @@ -from enum import Enum - - -class BootMode(Enum): - CPU = "CPU" - GPU = "GPU" - MPI = "MPI" diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py index b0b6658e110..b83895c0db6 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/core.py @@ -6,7 +6,7 @@ from pathlib import Path from pprint import pformat from types import TracebackType -from typing import Any, Coroutine, Dict, List, Optional, Type, cast +from typing import Coroutine, Optional, cast from uuid import uuid4 from aiodocker import Docker @@ -19,13 +19,13 @@ TaskOutputDataSchema, ) from models_library.projects_state import RunningState +from models_library.services_resources import BootMode from packaging import version from pydantic import ValidationError from pydantic.networks import AnyUrl from settings_library.s3 import S3Settings from yarl import URL -from ..boot_mode import BootMode from ..dask_utils import TaskPublisher, create_dask_worker_logger, publish_event from ..file_utils import pull_file_from_remote, push_file_to_remote from ..settings import Settings @@ -54,7 +54,7 @@ class ComputationalSidecar: # pylint: disable=too-many-instance-attributes output_data_keys: TaskOutputDataSchema log_file_url: AnyUrl boot_mode: BootMode - task_max_resources: Dict[str, Any] + task_max_resources: dict[str, float] task_publishers: TaskPublisher s3_settings: Optional[S3Settings] @@ -170,7 +170,7 @@ async def _publish_sidecar_state( TaskStateEvent.from_dask_worker(state=state, msg=msg), ) - async def run(self, command: List[str]) -> TaskOutputData: + async def run(self, command: list[str]) -> TaskOutputData: await self._publish_sidecar_state(RunningState.STARTED) await self._publish_sidecar_log( f"Starting task for {self.service_key}:{self.service_version} on {socket.gethostname()}..." 
@@ -262,7 +262,7 @@ async def __aenter__(self) -> "ComputationalSidecar": async def __aexit__( self, - exc_type: Optional[Type[BaseException]], + exc_type: Optional[type[BaseException]], exc: Optional[BaseException], tb: Optional[TracebackType], ) -> None: diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py index 049183a80d6..08ab96f3532 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/computational_sidecar/docker_utils.py @@ -1,6 +1,7 @@ import asyncio import contextlib import json +import logging import re import socket from pathlib import Path @@ -24,13 +25,14 @@ from aiodocker.volumes import DockerVolume from dask_task_models_library.container_tasks.docker import DockerBasicAuth from distributed.pubsub import Pub +from models_library.services_resources import BootMode from packaging import version from pydantic import ByteSize from pydantic.networks import AnyUrl from servicelib.docker_utils import to_datetime +from servicelib.logging_utils import log_catch, log_context from settings_library.s3 import S3Settings -from ..boot_mode import BootMode from ..dask_utils import LogType, create_dask_worker_logger, publish_task_logs from ..file_utils import push_file_to_remote from ..settings import Settings @@ -94,22 +96,28 @@ async def managed_container( ) -> AsyncIterator[DockerContainer]: container = None try: - logger.debug("Creating container...") - container = await docker_client.containers.create( - config.dict(by_alias=True), name=name - ) - logger.debug("container %s created", container.id) - yield container + with log_context( + logger, logging.DEBUG, msg=f"managing container {name} for {config.image}" + ): + container = await docker_client.containers.create( + config.dict(by_alias=True), name=name + ) + yield container except asyncio.CancelledError: if container: - logger.warning("Stopping container %s", container.id) + logger.warning( + "Cancelling run of container %s, for %s", container.id, config.image + ) raise finally: try: if container: - logger.debug("Removing container %s...", container.id) - await container.delete(remove=True, v=True, force=True) - logger.debug("container removed") + with log_context( + logger, + logging.DEBUG, + msg=f"Removing container {name}:{container.id} for {config.image}", + ): + await container.delete(remove=True, v=True, force=True) logger.info("Completed run of %s", config.image) except DockerError: logger.exception( @@ -123,7 +131,7 @@ async def managed_container( r"^(?P\d+-\d+-\d+T\d+:\d+:\d+\.\d+[^\s]+) (?P.+)$" ) _PROGRESS_REGEXP: re.Pattern[str] = re.compile( - r"\[?progress[\]:]?\s*([0-1]?\.\d+|\d+(%)|\d+\s*(percent)|(\d+\/\d+))" + r"\[?progress\]?:?\s*([0-1]?\.\d+|\d+(%)|\d+\s*(percent)|(\d+\/\d+))" ) DEFAULT_TIME_STAMP = "2000-01-01T00:00:00.000000000Z" @@ -368,7 +376,7 @@ async def monitor_container_logs( Services above are not creating a file and use the usual docker logging. 
These logs are retrieved using the usual cli 'docker logs CONTAINERID' """ - try: + with log_catch(logger, reraise=False): container_info = await container.show() container_name = container_info.get("Name", "undefined") logger.info( @@ -412,14 +420,6 @@ async def monitor_container_logs( container.id, container_name, ) - except DockerError as exc: - logger.exception( - "log monitoring of [%s:%s - %s] stopped with unexpected error:\n%s", - service_key, - service_version, - container.id, - exc, - ) @contextlib.asynccontextmanager diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py index 8085873de87..4c68ac40eba 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/dask_utils.py @@ -3,7 +3,7 @@ import logging from dataclasses import dataclass, field from enum import Enum -from typing import Any, AsyncIterator, Optional, cast +from typing import AsyncIterator, Final, Optional import distributed from dask_task_models_library.container_tasks.errors import TaskCancelledError @@ -17,8 +17,6 @@ from distributed.worker import get_worker from distributed.worker_state_machine import TaskState -from .boot_mode import BootMode - def create_dask_worker_logger(name: str) -> logging.Logger: return logging.getLogger(f"distributed.worker.{name}") @@ -52,21 +50,15 @@ def is_current_task_aborted() -> bool: return False -def get_current_task_boot_mode() -> BootMode: - task: Optional[TaskState] = _get_current_task_state() - if task and task.resource_restrictions: - if task.resource_restrictions.get("MPI", 0) > 0: - return BootMode.MPI - if task.resource_restrictions.get("GPU", 0) > 0: - return BootMode.GPU - return BootMode.CPU +_DEFAULT_MAX_RESOURCES: Final[dict[str, float]] = {"CPU": 1, "RAM": 1024**3} -def get_current_task_resources() -> dict[str, Any]: +def get_current_task_resources() -> dict[str, float]: + current_task_resources = _DEFAULT_MAX_RESOURCES if task := _get_current_task_state(): if task_resources := task.resource_restrictions: - return cast(dict[str, Any], task_resources) - return {} + current_task_resources.update(task_resources) + return current_task_resources @dataclass() diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/settings.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/settings.py index b76a73d8dac..65c32469d40 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/settings.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/settings.py @@ -22,11 +22,6 @@ class Settings(BaseCustomSettings, MixinLoggingSettings): SIDECAR_INTERVAL_TO_CHECK_TASK_ABORTED_S: Optional[int] = 5 - TARGET_MPI_NODE_CPU_COUNT: Optional[int] = Field( - None, - description="If a node has this amount of CPUs it will be a candidate an MPI candidate", - ) - # dask config ---- DASK_START_AS_SCHEDULER: Optional[bool] = Field( diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/tasks.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/tasks.py index fbb689403fb..5c958e852bb 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/tasks.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/tasks.py @@ -13,6 +13,7 @@ TaskOutputDataSchema, ) from distributed.worker import logger +from models_library.services_resources import BootMode from pydantic.networks import AnyUrl from settings_library.s3 import S3Settings @@ -20,7 +21,6 @@ from 
.dask_utils import ( TaskPublisher, create_dask_worker_logger, - get_current_task_boot_mode, get_current_task_resources, monitor_task_abortion, ) @@ -92,8 +92,8 @@ async def _run_computational_sidecar_async( log_file_url: AnyUrl, command: list[str], s3_settings: Optional[S3Settings], + boot_mode: BootMode, ) -> TaskOutputData: - task_publishers = TaskPublisher() log.debug( @@ -105,7 +105,6 @@ async def _run_computational_sidecar_async( async with monitor_task_abortion( task_name=current_task.get_name(), log_publisher=task_publishers.logs ): - sidecar_bootmode = get_current_task_boot_mode() task_max_resources = get_current_task_resources() async with ComputationalSidecar( service_key=service_key, @@ -114,7 +113,7 @@ async def _run_computational_sidecar_async( output_data_keys=output_data_keys, log_file_url=log_file_url, docker_auth=docker_auth, - boot_mode=sidecar_bootmode, + boot_mode=boot_mode, task_max_resources=task_max_resources, task_publishers=task_publishers, s3_settings=s3_settings, @@ -133,6 +132,7 @@ def run_computational_sidecar( log_file_url: AnyUrl, command: list[str], s3_settings: Optional[S3Settings], + boot_mode: BootMode = BootMode.CPU, ) -> TaskOutputData: # NOTE: The event loop MUST BE created in the main thread prior to this # Dask creates threads to run these calls, and the loop shall be created before @@ -155,6 +155,7 @@ def run_computational_sidecar( log_file_url, command, s3_settings, + boot_mode, ) ) return result diff --git a/services/dask-sidecar/src/simcore_service_dask_sidecar/utils.py b/services/dask-sidecar/src/simcore_service_dask_sidecar/utils.py index c6eff061615..4115ccfa169 100644 --- a/services/dask-sidecar/src/simcore_service_dask_sidecar/utils.py +++ b/services/dask-sidecar/src/simcore_service_dask_sidecar/utils.py @@ -1,11 +1,11 @@ import asyncio import logging import uuid -from pprint import pformat from typing import Any, Awaitable, Coroutine, Optional, cast import aiodocker from aiodocker.containers import DockerContainer +from pydantic import ByteSize, parse_obj_as logger = logging.getLogger(__name__) @@ -14,6 +14,22 @@ def _wrap_async_call(fct: Awaitable[Any]) -> Any: return asyncio.get_event_loop().run_until_complete(fct) +def _nvidia_smi_docker_config(cmd: list[str]) -> dict[str, Any]: + return { + "Cmd": ["nvidia-smi"] + cmd, + "Image": "nvidia/cuda:10.0-base", + "AttachStdin": False, + "AttachStdout": False, + "AttachStderr": False, + "Tty": False, + "OpenStdin": False, + "HostConfig": { + "Init": True, + "AutoRemove": False, + }, # NOTE: The Init parameter shows a weird behavior: no exception thrown when the container fails + } + + def num_available_gpus() -> int: """Returns the number of available GPUs, 0 if not a gpu node""" @@ -21,19 +37,7 @@ async def async_num_available_gpus() -> int: num_gpus = 0 container: Optional[DockerContainer] = None async with aiodocker.Docker() as docker: - spec_config = { - "Cmd": ["nvidia-smi", "--list-gpus"], - "Image": "nvidia/cuda:10.0-base", - "AttachStdin": False, - "AttachStdout": False, - "AttachStderr": False, - "Tty": False, - "OpenStdin": False, - "HostConfig": { - "Init": True, - "AutoRemove": False, - }, # NOTE: The Init parameter shows a weird behavior: no exception thrown when the container fails - } + spec_config = _nvidia_smi_docker_config(["--list-gpus"]) try: container = await docker.containers.run( config=spec_config, name=f"sidecar_{uuid.uuid4()}_test_gpu" @@ -51,14 +55,6 @@ async def async_num_available_gpus() -> int: if container_data.setdefault("StatusCode", 127) == 0 else 0 ) - 
logger.debug( - "testing for GPU presence with docker run %s %s completed with status code %s, found %d gpus:\nlogs:\n%s", - spec_config["Image"], - spec_config["Cmd"], - container_data["StatusCode"], - num_gpus, - pformat(container_logs), - ) except asyncio.TimeoutError as err: logger.warning( "num_gpus timedout while check-run %s: %s", spec_config, err ) @@ -75,3 +71,54 @@ async def async_num_available_gpus() -> int: return num_gpus return cast(int, _wrap_async_call(async_num_available_gpus())) + + +def video_memory() -> int: + """Returns the amount of VRAM available in bytes. 0 if no GPU available""" + + async def async_video_memory() -> int: + video_ram: ByteSize = ByteSize(0) + container: Optional[DockerContainer] = None + async with aiodocker.Docker() as docker: + spec_config = _nvidia_smi_docker_config( + [ + "--query-gpu=memory.total", + "--format=csv,noheader", + ] + ) + + try: + container = await docker.containers.run( + config=spec_config, name=f"sidecar_{uuid.uuid4()}_test_gpu_memory" + ) + if not container: + return 0 + + container_data = await container.wait(timeout=10) + container_logs = await cast( + Coroutine, + container.log(stdout=True, stderr=True, follow=False), + ) + video_ram = parse_obj_as(ByteSize, 0) + if container_data.setdefault("StatusCode", 127) == 0: + for line in container_logs: + video_ram = parse_obj_as( + ByteSize, video_ram + parse_obj_as(ByteSize, line) + ) + + except asyncio.TimeoutError as err: + logger.warning( + "video_memory timed out while check-run %s: %s", spec_config, err + ) + except aiodocker.exceptions.DockerError as err: + logger.warning( + "video_memory DockerError while check-run %s: %s", spec_config, err + ) + finally: + if container is not None: + # ensure container is removed + await container.delete() + + return video_ram + + return cast(int, _wrap_async_call(async_video_memory())) diff --git a/services/dask-sidecar/tests/unit/conftest.py b/services/dask-sidecar/tests/unit/conftest.py index 632886b874c..c9f4ede9854 100644 --- a/services/dask-sidecar/tests/unit/conftest.py +++ b/services/dask-sidecar/tests/unit/conftest.py @@ -82,7 +82,7 @@ def dask_client(mock_service_envs: None) -> Iterable[distributed.Client]: with distributed.LocalCluster( worker_class=distributed.Worker, **{ - "resources": {"CPU": 10, "GPU": 10, "MPI": 1}, + "resources": {"CPU": 10, "GPU": 10}, "preload": "simcore_service_dask_sidecar.tasks", }, ) as cluster: diff --git a/services/dask-sidecar/tests/unit/test_dask_utils.py b/services/dask-sidecar/tests/unit/test_dask_utils.py index b7a4038b553..b75e5366a50 100644 --- a/services/dask-sidecar/tests/unit/test_dask_utils.py +++ b/services/dask-sidecar/tests/unit/test_dask_utils.py @@ -14,9 +14,8 @@ from dask_task_models_library.container_tasks.errors import TaskCancelledError from dask_task_models_library.container_tasks.events import TaskLogEvent from dask_task_models_library.container_tasks.io import TaskCancelEventName -from simcore_service_dask_sidecar.boot_mode import BootMode from simcore_service_dask_sidecar.dask_utils import ( - get_current_task_boot_mode, + _DEFAULT_MAX_RESOURCES, get_current_task_resources, is_current_task_aborted, monitor_task_abortion, ) @@ -142,29 +141,10 @@ def test_monitor_task_abortion(dask_client: distributed.Client): future.result(timeout=DASK_TESTING_TIMEOUT_S) -@pytest.mark.parametrize( - "resources, expected_boot_mode", - [ - ({"CPU": 2}, BootMode.CPU), - ({"MPI": 1.0}, BootMode.MPI), - ({"GPU": 5.0}, BootMode.GPU), - ], -) -def test_task_boot_mode( - dask_client: distributed.Client, -
resources: dict[str, Any], - expected_boot_mode: BootMode, -): - future = dask_client.submit(get_current_task_boot_mode, resources=resources) - received_boot_mode = future.result(timeout=DASK_TESTING_TIMEOUT_S) - assert received_boot_mode == expected_boot_mode - - @pytest.mark.parametrize( "resources", [ ({"CPU": 2}), - ({"MPI": 1.0}), ({"GPU": 5.0}), ], ) @@ -174,4 +154,6 @@ def test_task_resources( ): future = dask_client.submit(get_current_task_resources, resources=resources) received_resources = future.result(timeout=DASK_TESTING_TIMEOUT_S) - assert received_resources == resources + current_resources = _DEFAULT_MAX_RESOURCES + current_resources.update(resources) + assert received_resources == current_resources diff --git a/services/dask-sidecar/tests/unit/test_docker_utils.py b/services/dask-sidecar/tests/unit/test_docker_utils.py index b545f017a69..69d3d68a0a1 100644 --- a/services/dask-sidecar/tests/unit/test_docker_utils.py +++ b/services/dask-sidecar/tests/unit/test_docker_utils.py @@ -9,8 +9,8 @@ import aiodocker import pytest +from models_library.services_resources import BootMode from pytest_mock.plugin import MockerFixture -from simcore_service_dask_sidecar.boot_mode import BootMode from simcore_service_dask_sidecar.computational_sidecar.docker_utils import ( DEFAULT_TIME_STAMP, LogType, @@ -58,7 +58,6 @@ async def test_create_container_config( boot_mode: BootMode, task_max_resources: dict[str, Any], ): - container_config = await create_container_config( docker_registry, service_key, @@ -187,6 +186,10 @@ async def test_create_container_config( "2021-10-05T09:53:48.873236400Z [PROGRESS]1.000000\n", (LogType.PROGRESS, "2021-10-05T09:53:48.873236400Z", "1.00"), ), + ( + "2021-10-05T09:53:48.873236400Z [PROGRESS]: 1% [ 10 / 624 ] Time Update, estimated remaining time 1 seconds @ 26.43 MCells/s", + (LogType.PROGRESS, "2021-10-05T09:53:48.873236400Z", "0.01"), + ), ], ) async def test_parse_line(log_line: str, expected_parsing: tuple[LogType, str, str]): @@ -250,3 +253,32 @@ async def test_managed_container_always_removes_container( .delete(remove=True, v=True, force=True) ] ) + + +async def test_managed_container_with_broken_container_raises_docker_exception( + docker_registry: str, + service_key: str, + service_version: str, + command: list[str], + comp_volume_mount_point: str, + mocker: MockerFixture, +): + container_config = await create_container_config( + docker_registry, + service_key, + service_version, + command, + comp_volume_mount_point, + boot_mode=BootMode.CPU, + task_max_resources={}, + ) + mocked_aiodocker = mocker.patch("aiodocker.Docker", autospec=True) + mocked_aiodocker.return_value.__aenter__.return_value.containers.create.return_value.delete.side_effect = aiodocker.DockerError( + "bad", {"message": "pytest fake bad message"} + ) + async with aiodocker.Docker() as docker_client: + with pytest.raises(aiodocker.DockerError, match="pytest fake bad message"): + async with managed_container( + docker_client=docker_client, config=container_config + ) as container: + assert container is not None diff --git a/services/dask-sidecar/tests/unit/test_tasks.py b/services/dask-sidecar/tests/unit/test_tasks.py index aea5e51a54d..8309c38cb53 100644 --- a/services/dask-sidecar/tests/unit/test_tasks.py +++ b/services/dask-sidecar/tests/unit/test_tasks.py @@ -34,6 +34,7 @@ from distributed import Client from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID +from models_library.services_resources import BootMode from models_library.users 
import UserID from packaging import version from pydantic import AnyUrl, SecretStr @@ -50,6 +51,7 @@ from simcore_service_dask_sidecar.computational_sidecar.models import ( LEGACY_INTEGRATION_VERSION, ) +from simcore_service_dask_sidecar.dask_utils import _DEFAULT_MAX_RESOURCES from simcore_service_dask_sidecar.file_utils import _s3fs_settings_from_s3_settings from simcore_service_dask_sidecar.tasks import run_computational_sidecar @@ -143,15 +145,39 @@ class ServiceExampleParam: pytest_simcore_ops_services_selection = ["minio"] -@pytest.fixture(params=[f"{LEGACY_INTEGRATION_VERSION}", "1.0.0"]) +def _bash_check_env_exist(variable_name: str, variable_value: str) -> list[str]: + return [ + f"if [ -z ${{{variable_name}+x}} ];then echo {variable_name} does not exist && exit 9;fi", + f'if [ "${{{variable_name}}}" != "{variable_value}" ];then echo expected "{variable_value}" and found "${{{variable_name}}}" && exit 9;fi', + ] + + +@pytest.fixture(params=list(BootMode), ids=str) +def boot_mode(request: FixtureRequest) -> BootMode: + return request.param + + +@pytest.fixture( + # NOTE: legacy version comes second as it is less easy to debug issues with that one + params=[ + "1.0.0", + f"{LEGACY_INTEGRATION_VERSION}", + ], + ids=lambda v: f"integration.version.{v}", +) +def integration_version(request: FixtureRequest) -> version.Version: + print("Using service integration:", request.param) + return version.Version(request.param) + + +@pytest.fixture def ubuntu_task( - request: FixtureRequest, + integration_version: version.Version, file_on_s3_server: Callable[..., AnyUrl], s3_remote_file_url: Callable[..., AnyUrl], + boot_mode: BootMode, ) -> ServiceExampleParam: """Creates a console task in an ubuntu distro that checks for the expected files and error in case they are missing""" - integration_version = version.Version(request.param) # type: ignore - print("Using service integration:", integration_version) # let's have some input files on the file server NUM_FILES = 12 list_of_files = [file_on_s3_server() for _ in range(NUM_FILES)] @@ -186,9 +212,26 @@ def ubuntu_task( "ls -tlah -R ${OUTPUT_FOLDER}", "echo Logs:", "ls -tlah -R ${LOG_FOLDER}", + "echo Envs:", + "printenv", ] + + # check expected ENVS are set + list_of_commands += _bash_check_env_exist( + variable_name="SC_COMP_SERVICES_SCHEDULED_AS", variable_value=boot_mode.value + ) + list_of_commands += _bash_check_env_exist( + variable_name="SIMCORE_NANO_CPUS_LIMIT", + variable_value=f"{int(_DEFAULT_MAX_RESOURCES['CPU']*1e9)}", + ) + list_of_commands += _bash_check_env_exist( + variable_name="SIMCORE_MEMORY_BYTES_LIMIT", + variable_value=f"{_DEFAULT_MAX_RESOURCES['RAM']}", + ) + + # check input files list_of_commands += [ - f"(test -f ${{INPUT_FOLDER}}/{file} || (echo ${{INPUT_FOLDER}}/{file} does not exists && exit 1))" + f"(test -f ${{INPUT_FOLDER}}/{file} || (echo ${{INPUT_FOLDER}}/{file} does not exist && exit 1))" for file in file_names ] + [f"echo $(cat ${{INPUT_FOLDER}}/{file})" for file in file_names] @@ -279,7 +322,8 @@ def ubuntu_task( # NOTE: we use sleeper because it defines a user # that can write in outputs and the # sidecar can remove the outputs dirs - # + # it is based on ubuntu though but the bad part is that now it uses sh instead of bash... 
+ # cause the entrypoint uses sh service_key="itisfoundation/sleeper", service_version="2.1.2", command=[ @@ -333,6 +377,7 @@ def test_run_computational_sidecar_real_fct( ubuntu_task: ServiceExampleParam, mocker: MockerFixture, s3_settings: S3Settings, + boot_mode: BootMode, ): mocked_get_integration_version = mocker.patch( "simcore_service_dask_sidecar.computational_sidecar.core.get_integration_version", @@ -348,6 +393,7 @@ def test_run_computational_sidecar_real_fct( ubuntu_task.log_file_url, ubuntu_task.command, s3_settings, + boot_mode, ) mocked_get_integration_version.assert_called_once_with( mock.ANY, @@ -400,12 +446,16 @@ def test_run_computational_sidecar_real_fct( assert log in saved_logs +@pytest.mark.parametrize( + "integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True +) def test_run_multiple_computational_sidecar_dask( event_loop: asyncio.AbstractEventLoop, dask_client: Client, ubuntu_task: ServiceExampleParam, mocker: MockerFixture, s3_settings: S3Settings, + boot_mode: BootMode, ): NUMBER_OF_TASKS = 50 @@ -426,6 +476,7 @@ def test_run_multiple_computational_sidecar_dask( ubuntu_task.command, s3_settings, resources={}, + boot_mode=boot_mode, ) for _ in range(NUMBER_OF_TASKS) ] @@ -441,11 +492,15 @@ def test_run_multiple_computational_sidecar_dask( assert output_data[k] == v +@pytest.mark.parametrize( + "integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True +) def test_run_computational_sidecar_dask( dask_client: Client, ubuntu_task: ServiceExampleParam, mocker: MockerFixture, s3_settings: S3Settings, + boot_mode: BootMode, ): mocker.patch( "simcore_service_dask_sidecar.computational_sidecar.core.get_integration_version", @@ -463,11 +518,13 @@ def test_run_computational_sidecar_dask( ubuntu_task.command, s3_settings, resources={}, + boot_mode=boot_mode, ) worker_name = next(iter(dask_client.scheduler_info()["workers"])) output_data = future.result() + assert isinstance(output_data, TaskOutputData) # check that the task produces expected logs worker_logs = [log for _, log in dask_client.get_worker_logs()[worker_name]] # type: ignore @@ -497,6 +554,9 @@ def test_run_computational_sidecar_dask( assert fp.details.get("size") > 0 # type: ignore +@pytest.mark.parametrize( + "integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True +) def test_failing_service_raises_exception( caplog_info_level: LogCaptureFixture, event_loop: asyncio.AbstractEventLoop, @@ -518,6 +578,9 @@ def test_failing_service_raises_exception( ) +@pytest.mark.parametrize( + "integration_version, boot_mode", [("1.0.0", BootMode.CPU)], indirect=True +) def test_running_service_that_generates_unexpected_data_raises_exception( caplog_info_level: LogCaptureFixture, event_loop: asyncio.AbstractEventLoop, diff --git a/services/director-v2/openapi.json b/services/director-v2/openapi.json index 91bafb994b9..392b055bbf4 100644 --- a/services/director-v2/openapi.json +++ b/services/director-v2/openapi.json @@ -1862,6 +1862,16 @@ } } }, + "BootMode": { + "title": "BootMode", + "enum": [ + "CPU", + "GPU", + "MPI" + ], + "type": "string", + "description": "An enumeration." 
+ }, "BootOption": { "title": "BootOption", "required": [ @@ -2199,7 +2209,8 @@ "title": "ComputationCreate", "required": [ "user_id", - "project_id" + "project_id", + "product_name" ], "type": "object", "properties": { @@ -2222,8 +2233,7 @@ }, "product_name": { "title": "Product Name", - "type": "string", - "description": "required if computation is started" + "type": "string" }, "subgraph": { "title": "Subgraph", @@ -2462,7 +2472,10 @@ "limit": 2147483648, "reservation": 2147483648 } - } + }, + "boot_modes": [ + "CPU" + ] } } } @@ -2490,7 +2503,7 @@ "properties": { "image": { "title": "Image", - "pattern": "[\\w/-]+:[\\w.@]+", + "pattern": "^(?P(?:(?:(?:[a-zA-Z0-9-]+\\.)+[a-zA-Z0-9-]+(?::\\d+)?)|[a-zA-Z0-9-]+:\\d+))?(?:/)?(?P(?:[a-z0-9][a-z0-9_.-]*/)*[a-z0-9-_]+[a-z0-9])(?::(?P[\\w][\\w.-]{0,126}[\\w]))?(?P\\@sha256:[a-fA-F0-9]{64})?$", "type": "string", "description": "Used by the frontend to provide a context for the users.Services with a docker-compose spec will have multiple entries.Using the `image:version` instead of the docker-compose spec is more helpful for the end user." }, @@ -2500,6 +2513,16 @@ "additionalProperties": { "$ref": "#/components/schemas/ResourceValue" } + }, + "boot_modes": { + "type": "array", + "items": { + "$ref": "#/components/schemas/BootMode" + }, + "description": "describe how a service shall be booted, using CPU, MPI, openMP or GPU", + "default": [ + "CPU" + ] } }, "example": { @@ -2640,15 +2663,12 @@ "RAM": { "title": "Ram", "type": "integer", - "description": "defines the required (maximum) amount of RAM for running the services in bytes" + "description": "defines the required (maximum) amount of RAM for running the services" }, - "MPI": { - "title": "Mpi", - "maximum": 1.0, - "minimum": 0.0, + "VRAM": { + "title": "Vram", "type": "integer", - "description": "defines whether a MPI node is required for running the services", - "deprecated": true + "description": "defines the required (maximum) amount of VRAM for running the services" } } }, @@ -3471,6 +3491,27 @@ }, "additionalProperties": false }, + "TaskCounts": { + "title": "TaskCounts", + "type": "object", + "properties": { + "error": { + "title": "Error", + "type": "integer", + "default": 0 + }, + "memory": { + "title": "Memory", + "type": "integer", + "default": 0 + }, + "executing": { + "title": "Executing", + "type": "integer", + "default": 0 + } + } + }, "TaskLogFileGet": { "title": "TaskLogFileGet", "required": [ @@ -3631,10 +3672,7 @@ "cpu", "memory", "num_fds", - "ready", - "executing", - "in_flight", - "in_memory" + "task_counts" ], "type": "object", "properties": { @@ -3653,29 +3691,14 @@ "type": "integer", "description": "consumed file descriptors" }, - "ready": { - "title": "Ready", - "minimum": 0.0, - "type": "integer", - "description": "# tasks ready to run" - }, - "executing": { - "title": "Executing", - "minimum": 0.0, - "type": "integer", - "description": "# tasks currently executing" - }, - "in_flight": { - "title": "In Flight", - "minimum": 0.0, - "type": "integer", - "description": "# tasks waiting for data" - }, - "in_memory": { - "title": "In Memory", - "minimum": 0.0, - "type": "integer", - "description": "# tasks in worker memory" + "task_counts": { + "title": "Task Counts", + "allOf": [ + { + "$ref": "#/components/schemas/TaskCounts" + } + ], + "description": "task details" } } }, diff --git a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py index 
9c4ee7ae102..ab648b20353 100644 --- a/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py +++ b/services/director-v2/src/simcore_service_director_v2/api/routes/computations.py @@ -157,12 +157,16 @@ async def create_computation( minimal_computational_dag, publish=computation.start_pipeline or False, ) + assert computation.product_name # nosec inserted_comp_tasks = await comp_tasks_repo.upsert_tasks_from_project( project, + catalog_client, director_client, published_nodes=list(minimal_computational_dag.nodes()) if computation.start_pipeline else [], + user_id=computation.user_id, + product_name=computation.product_name, ) if computation.start_pipeline: diff --git a/services/director-v2/src/simcore_service_director_v2/core/settings.py b/services/director-v2/src/simcore_service_director_v2/core/settings.py index b48776e7547..9b1e1e72b71 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/settings.py +++ b/services/director-v2/src/simcore_service_director_v2/core/settings.py @@ -3,6 +3,7 @@ import logging +import re from enum import Enum, auto from functools import cached_property from pathlib import Path @@ -27,10 +28,10 @@ from pydantic import ( AnyHttpUrl, AnyUrl, + ConstrainedStr, Field, PositiveFloat, PositiveInt, - constr, validator, ) from settings_library.base import BaseCustomSettings @@ -68,10 +69,12 @@ SUPPORTED_TRAEFIK_LOG_LEVELS: set[str] = {"info", "debug", "warn", "error"} -PlacementConstraintStr = constr( - strip_whitespace=True, - regex=r"^(?!-)(?![.])(?!.*--)(?!.*[.][.])[a-zA-Z0-9.-]*(? "CatalogClient": async def request(self, method: str, tail_path: str, **kwargs) -> httpx.Response: return await self.client.request(method, tail_path, **kwargs) - @log_decorator(logger=logger) async def get_service( self, user_id: UserID, @@ -67,7 +65,6 @@ async def get_service( service_version: ServiceVersion, product_name: str, ) -> dict[str, Any]: - resp = await self.request( "GET", f"/services/{urllib.parse.quote( service_key, safe='')}/{service_version}", @@ -79,11 +76,22 @@ async def get_service( return resp.json() raise HTTPException(status_code=resp.status_code, detail=resp.content) - @log_decorator(logger=logger) - async def get_service_specifications( + async def get_service_resources( self, user_id: UserID, service_key: ServiceKey, service_version: ServiceVersion ) -> dict[str, Any]: + resp = await self.request( + "GET", + f"/services/{urllib.parse.quote( service_key, safe='')}/{service_version}/resources", + params={"user_id": user_id}, + ) + resp.raise_for_status() + if resp.status_code == status.HTTP_200_OK: + return resp.json() + raise HTTPException(status_code=resp.status_code, detail=resp.content) + async def get_service_specifications( + self, user_id: UserID, service_key: ServiceKey, service_version: ServiceVersion + ) -> dict[str, Any]: resp = await self.request( "GET", f"/services/{urllib.parse.quote( service_key, safe='')}/{service_version}/specifications", diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py index 0fafde35b99..ee8a40fa93b 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dask_client.py @@ -31,6 +31,7 @@ from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState +from models_library.services_resources 
import BootMode from models_library.users import UserID from pydantic import parse_obj_as from pydantic.networks import AnyUrl @@ -47,7 +48,7 @@ ComputationalBackendTaskNotFoundError, ComputationalBackendTaskResultsNotReadyError, ) -from ..core.settings import ComputationalBackendSettings +from ..core.settings import AppSettings, ComputationalBackendSettings from ..models.domains.comp_tasks import Image from ..models.schemas.clusters import ClusterDetails, Scheduler from ..utils.dask import ( @@ -99,6 +100,7 @@ LogFileUploadURL, Commands, Optional[S3Settings], + BootMode, ], TaskOutputData, ] @@ -206,6 +208,7 @@ def _comp_sidecar_fct( log_file_url: AnyUrl, command: list[str], s3_settings: Optional[S3Settings], + boot_mode: BootMode, ) -> TaskOutputData: """This function is serialized by the Dask client and sent over to the Dask sidecar(s) Therefore, (screaming here) DO NOT MOVE THAT IMPORT ANYWHERE ELSE EVER!!""" @@ -220,6 +223,7 @@ def _comp_sidecar_fct( log_file_url, command, s3_settings, + boot_mode, ) if remote_fct is None: @@ -233,6 +237,7 @@ def _comp_sidecar_fct( project_id=project_id, node_id=node_id, ) + assert node_image.node_requirements # nosec dask_resources = from_node_reqs_to_dask_resources( node_image.node_requirements ) @@ -299,12 +304,15 @@ def _comp_sidecar_fct( ) try: + assert self.app.state # nosec + assert self.app.state.settings # nosec + settings: AppSettings = self.app.state.settings task_future = self.backend.client.submit( remote_fct, docker_auth=DockerBasicAuth( - server_address=self.app.state.settings.DIRECTOR_V2_DOCKER_REGISTRY.resolved_registry_url, - username=self.app.state.settings.DIRECTOR_V2_DOCKER_REGISTRY.REGISTRY_USER, - password=self.app.state.settings.DIRECTOR_V2_DOCKER_REGISTRY.REGISTRY_PW, + server_address=settings.DIRECTOR_V2_DOCKER_REGISTRY.resolved_registry_url, + username=settings.DIRECTOR_V2_DOCKER_REGISTRY.REGISTRY_USER, + password=settings.DIRECTOR_V2_DOCKER_REGISTRY.REGISTRY_PW, ), service_key=node_image.name, service_version=node_image.tag, @@ -313,6 +321,7 @@ def _comp_sidecar_fct( log_file_url=log_file_url, command=node_image.command, s3_settings=s3_settings, + boot_mode=node_image.boot_mode, key=job_id, resources=dask_resources, retries=0, diff --git a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks.py b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks.py index d79f616cf11..51b1159744a 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/db/repositories/comp_tasks.py @@ -9,15 +9,18 @@ from models_library.projects_nodes import Node from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState -from models_library.services import ServiceDockerData, ServiceKeyVersion +from models_library.services import ServiceDockerData +from models_library.services_resources import BootMode +from models_library.users import UserID from sqlalchemy import literal_column from sqlalchemy.dialects.postgresql import insert from ....core.errors import ErrorDict from ....models.domains.comp_tasks import CompTaskAtDB, Image, NodeSchema -from ....models.schemas.services import ServiceExtras +from ....models.schemas.services import NodeRequirements, ServiceExtras from ....utils.computations import to_node_class from ....utils.db import RUNNING_STATE_TO_DB +from ...catalog import CatalogClient from ...director_v0 import 
DirectorV0Client from ..tables import NodeClass, StateType, comp_tasks from ._base import BaseRepository @@ -38,45 +41,76 @@ } +async def _get_service_details( + catalog_client: CatalogClient, user_id: UserID, product_name: str, node: Node +) -> ServiceDockerData: + service_details = await catalog_client.get_service( + user_id, + node.key, + node.version, + product_name, + ) + return ServiceDockerData.construct(**service_details) + + +def _compute_node_requirements(node_resources: dict[str, Any]) -> NodeRequirements: + node_defined_resources = {} + + for image_data in node_resources.values(): + for resource_name, resource_value in image_data.get("resources", {}).items(): + node_defined_resources[resource_name] = node_defined_resources.get( + resource_name, 0 + ) + min(resource_value["limit"], resource_value["reservation"]) + return NodeRequirements.parse_obj(node_defined_resources) + + +def _compute_node_boot_mode(node_resources: dict[str, Any]) -> BootMode: + for image_data in node_resources.values(): + return BootMode(image_data.get("boot_modes")[0]) + raise RuntimeError("No BootMode") + + async def _generate_tasks_list_from_project( project: ProjectAtDB, + catalog_client: CatalogClient, director_client: DirectorV0Client, published_nodes: list[NodeID], + user_id: UserID, + product_name: str, ) -> list[CompTaskAtDB]: - list_comp_tasks = [] for internal_id, node_id in enumerate(project.workbench, 1): node: Node = project.workbench[node_id] - service_key_version = ServiceKeyVersion( - key=node.key, - version=node.version, - ) - node_class = to_node_class(service_key_version.key) + # get node infos + node_class = to_node_class(node.key) node_details: Optional[ServiceDockerData] = None + node_resources: Optional[dict[str, Any]] = None node_extras: Optional[ServiceExtras] = None if node_class == NodeClass.FRONTEND: - node_details = _FRONTEND_SERVICES_CATALOG.get(service_key_version.key, None) + node_details = _FRONTEND_SERVICES_CATALOG.get(node.key, None) else: - node_details, node_extras = await asyncio.gather( - director_client.get_service_details(service_key_version), - director_client.get_service_extras(service_key_version), + node_details, node_resources, node_extras = await asyncio.gather( + _get_service_details(catalog_client, user_id, product_name, node), + catalog_client.get_service_resources(user_id, node.key, node.version), + director_client.get_service_extras(node.key, node.version), ) if not node_details: continue - # aggregates node_details amd node_extras into Image + # aggregates node_details and node_extras into Image data: dict[str, Any] = { - "name": service_key_version.key, - "tag": service_key_version.version, + "name": node.key, + "tag": node.version, } - if node_extras: - data.update(node_requirements=node_extras.node_requirements) - if node_extras.container_spec: - data.update(command=node_extras.container_spec.command) + + if node_resources: + data.update(node_requirements=_compute_node_requirements(node_resources)) + data["boot_mode"] = _compute_node_boot_mode(node_resources) + if node_extras and node_extras.container_spec: + data.update(command=node_extras.container_spec.command) image = Image.parse_obj(data) - assert image.command # nosec assert node.state is not None # nosec task_state = node.state.current_status @@ -85,7 +119,7 @@ async def _generate_tasks_list_from_project( task_db = CompTaskAtDB( project_id=project.uuid, - node_id=node_id, + node_id=NodeID(node_id), schema=NodeSchema.parse_obj( node_details.dict( exclude_unset=True, by_alias=True, 
include={"inputs", "outputs"} @@ -151,14 +185,22 @@ async def check_task_exists(self, project_id: ProjectID, node_id: NodeID) -> boo async def upsert_tasks_from_project( self, project: ProjectAtDB, + catalog_client: CatalogClient, director_client: DirectorV0Client, published_nodes: list[NodeID], + user_id: UserID, + product_name: str, ) -> list[CompTaskAtDB]: # NOTE: really do an upsert here because of issue https://github.com/ITISFoundation/osparc-simcore/issues/2125 list_of_comp_tasks_in_project: list[ CompTaskAtDB ] = await _generate_tasks_list_from_project( - project, director_client, published_nodes + project, + catalog_client, + director_client, + published_nodes, + user_id, + product_name, ) async with self.db_engine.acquire() as conn: # get current tasks @@ -186,7 +228,6 @@ async def upsert_tasks_from_project( # NOTE: an exception to this is when a frontend service changes its output since there is no node_ports, the UPDATE must be done here. inserted_comp_tasks_db: list[CompTaskAtDB] = [] for comp_task_db in list_of_comp_tasks_in_project: - insert_stmt = insert(comp_tasks).values(**comp_task_db.to_db_model()) exclusion_rule = ( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/director_v0.py b/services/director-v2/src/simcore_service_director_v2/modules/director_v0.py index 2558cc58af6..9ca20d4208f 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/director_v0.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/director_v0.py @@ -5,7 +5,7 @@ import logging import urllib.parse from dataclasses import dataclass -from typing import List, Optional +from typing import Any, Optional, cast import httpx import yarl @@ -13,7 +13,12 @@ from models_library.projects import ProjectID from models_library.projects_nodes import NodeID from models_library.service_settings_labels import SimcoreServiceLabels -from models_library.services import ServiceDockerData, ServiceKeyVersion +from models_library.services import ( + ServiceDockerData, + ServiceKey, + ServiceKeyVersion, + ServiceVersion, +) from models_library.users import UserID # Module's business logic --------------------------------------------- @@ -105,14 +110,17 @@ async def get_service_details( "GET", f"/services/{urllib.parse.quote_plus(service.key)}/{service.version}" ) if resp.status_code == status.HTTP_200_OK: - return ServiceDockerData.parse_obj(unenvelope_or_raise_error(resp)[0]) + data = cast(list[dict[str, Any]], unenvelope_or_raise_error(resp)) + return ServiceDockerData.parse_obj(data[0]) raise HTTPException(status_code=resp.status_code, detail=resp.content) @log_decorator(logger=logger) - async def get_service_extras(self, service: ServiceKeyVersion) -> ServiceExtras: + async def get_service_extras( + self, service_key: ServiceKey, service_version: ServiceVersion + ) -> ServiceExtras: resp = await self.request( "GET", - f"/service_extras/{urllib.parse.quote_plus(service.key)}/{service.version}", + f"/service_extras/{urllib.parse.quote_plus(service_key)}/{service_version}", ) if resp.status_code == status.HTTP_200_OK: return ServiceExtras.parse_obj(unenvelope_or_raise_error(resp)) @@ -145,7 +153,7 @@ async def get_service_labels( @log_decorator(logger=logger) async def get_running_services( self, user_id: Optional[UserID] = None, project_id: Optional[ProjectID] = None - ) -> List[RunningDynamicServiceDetails]: + ) -> list[RunningDynamicServiceDetails]: query_params = {} if user_id is not None: query_params["user_id"] = f"{user_id}" @@ -159,6 +167,6 @@ async def 
get_running_services( if resp.status_code == status.HTTP_200_OK: return [ RunningDynamicServiceDetails(**x) - for x in unenvelope_or_raise_error(resp) + for x in cast(list[dict[str, Any]], unenvelope_or_raise_error(resp)) ] raise HTTPException(status_code=resp.status_code, detail=resp.content) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/clients.py b/services/director-v2/src/simcore_service_director_v2/utils/clients.py index 8847de64e0f..d01d38a1907 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/clients.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/clients.py @@ -1,5 +1,5 @@ import logging -from typing import Any, Dict +from typing import Any, Union import httpx from fastapi import HTTPException @@ -9,7 +9,7 @@ logger = logging.getLogger(__name__) -def unenvelope_or_raise_error(resp: httpx.Response) -> Dict[str, Any]: +def unenvelope_or_raise_error(resp: httpx.Response) -> Union[list[Any], dict[str, Any]]: """ Director responses are enveloped If successful response, we un-envelop it and return data as a dict diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dags.py b/services/director-v2/src/simcore_service_director_v2/utils/dags.py index 5ff128b4043..b15109347ce 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dags.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dags.py @@ -1,6 +1,6 @@ import logging from copy import deepcopy -from typing import Any, Dict, List, Set +from typing import Any import networkx as nx from models_library.projects import Workbench @@ -24,7 +24,6 @@ def _is_node_computational(node_key: str) -> bool: return False -@log_decorator(logger=logger) def create_complete_dag(workbench: Workbench) -> nx.DiGraph: """creates a complete graph out of the project workbench""" dag_graph = nx.DiGraph() @@ -48,7 +47,7 @@ def create_complete_dag(workbench: Workbench) -> nx.DiGraph: @log_decorator(logger=logger) -def create_complete_dag_from_tasks(tasks: List[CompTaskAtDB]) -> nx.DiGraph: +def create_complete_dag_from_tasks(tasks: list[CompTaskAtDB]) -> nx.DiGraph: dag_graph = nx.DiGraph() for task in tasks: dag_graph.add_node( @@ -86,7 +85,7 @@ async def compute_node_modified_state( return True # maybe our inputs changed? let's compute the node hash and compare with the saved one - async def get_node_io_payload_cb(node_id: NodeID) -> Dict[str, Any]: + async def get_node_io_payload_cb(node_id: NodeID) -> dict[str, Any]: return nodes_data_view[str(node_id)] computed_hash = await compute_node_hash(node_id, get_node_io_payload_cb) @@ -95,10 +94,10 @@ async def get_node_io_payload_cb(node_id: NodeID) -> Dict[str, Any]: return False -async def compute_node_dependencies_state(nodes_data_view, node_id) -> Set[NodeID]: +async def compute_node_dependencies_state(nodes_data_view, node_id) -> set[NodeID]: node = nodes_data_view[str(node_id)] # check if the previous node is outdated or waits for dependencies... 
in which case this one has to wait - non_computed_dependencies: Set[NodeID] = set() + non_computed_dependencies: set[NodeID] = set() for input_port in node.get("inputs", {}).values(): if isinstance(input_port, PortLink): if node_needs_computation(nodes_data_view, input_port.node_uuid): @@ -142,7 +141,7 @@ async def _set_computational_nodes_states(complete_dag: nx.DiGraph) -> None: @log_decorator(logger=logger) async def create_minimal_computational_graph_based_on_selection( - complete_dag: nx.DiGraph, selected_nodes: List[NodeID], force_restart: bool + complete_dag: nx.DiGraph, selected_nodes: list[NodeID], force_restart: bool ) -> nx.DiGraph: nodes_data_view: nx.classes.reportviews.NodeDataView = complete_dag.nodes.data() try: @@ -153,7 +152,7 @@ async def create_minimal_computational_graph_based_on_selection( return nx.DiGraph() # second pass, detect all the nodes that need to be run - minimal_nodes_selection: Set[str] = set() + minimal_nodes_selection: set[str] = set() if not selected_nodes: # fully automatic detection, we want anything that is waiting for dependencies or outdated minimal_nodes_selection.update( @@ -168,12 +167,12 @@ async def create_minimal_computational_graph_based_on_selection( # we want all the outdated nodes that are in the tree leading to the selected nodes for node in selected_nodes: minimal_nodes_selection.update( - set( + { n for n in nx.bfs_tree(complete_dag, f"{node}", reverse=True) if _is_node_computational(nodes_data_view[n]["key"]) and node_needs_computation(nodes_data_view, n) - ) + } ) if force_restart and _is_node_computational( nodes_data_view[f"{node}"]["key"] @@ -185,7 +184,7 @@ async def create_minimal_computational_graph_based_on_selection( @log_decorator(logger=logger) async def compute_pipeline_details( - complete_dag: nx.DiGraph, pipeline_dag: nx.DiGraph, comp_tasks: List[CompTaskAtDB] + complete_dag: nx.DiGraph, pipeline_dag: nx.DiGraph, comp_tasks: list[CompTaskAtDB] ) -> PipelineDetails: try: # FIXME: this problem of cyclic graphs for control loops create all kinds of issues that must be fixed @@ -211,7 +210,7 @@ async def compute_pipeline_details( ) -def find_computational_node_cycles(dag: nx.DiGraph) -> List[List[str]]: +def find_computational_node_cycles(dag: nx.DiGraph) -> list[list[str]]: """returns a list of nodes part of a cycle and computational, which is currently forbidden.""" computational_node_cycles = [] list_potential_cycles = nx.simple_cycles(dag) diff --git a/services/director-v2/src/simcore_service_director_v2/utils/dask.py b/services/director-v2/src/simcore_service_director_v2/utils/dask.py index 5635365f603..8a1816a7a97 100644 --- a/services/director-v2/src/simcore_service_director_v2/utils/dask.py +++ b/services/director-v2/src/simcore_service_director_v2/utils/dask.py @@ -284,7 +284,6 @@ async def compute_service_log_file_upload_link( node_id: NodeID, file_link_type: FileLinkType, ) -> AnyUrl: - value_links = await port_utils.get_upload_links_from_storage( user_id=user_id, project_id=f"{project_id}", @@ -404,7 +403,9 @@ def from_node_reqs_to_dask_resources( ) -> dict[str, Union[int, float]]: """Dask resources are set such as {"CPU": X.X, "GPU": Y.Y, "RAM": INT}""" dask_resources = node_reqs.dict( - exclude_unset=True, by_alias=True, exclude_none=True + exclude_unset=True, + by_alias=True, + exclude_none=True, ) logger.debug("transformed to dask resources: %s", dask_resources) return dask_resources diff --git a/services/director-v2/tests/unit/conftest.py b/services/director-v2/tests/unit/conftest.py index 
79d3566d392..2ae3ee025c0 100644 --- a/services/director-v2/tests/unit/conftest.py +++ b/services/director-v2/tests/unit/conftest.py @@ -182,24 +182,12 @@ async def dask_spec_local_cluster( }, }, }, - "mpi-worker": { + "bigcpu-worker": { "cls": Worker, "options": { "nthreads": 1, "resources": { "CPU": 8, - "MPI": 1, - "RAM": 768e9, - }, - }, - }, - "gpu-mpi-worker": { - "cls": Worker, - "options": { - "nthreads": 1, - "resources": { - "GPU": 1, - "MPI": 1, "RAM": 768e9, }, }, @@ -237,7 +225,7 @@ def local_dask_gateway_server_config( c.ClusterConfig.worker_cmd = [ # type: ignore "dask-worker", "--resources", - f"CPU=12,GPU=1,MPI=1,RAM={16e9}", + f"CPU=12,GPU=1,RAM={16e9}", ] # NOTE: This must be set such that the local unsafe backend creates a worker with enough cores/memory c.ClusterConfig.worker_cores = 12 # type: ignore diff --git a/services/director-v2/tests/unit/test_modules_dask_client.py b/services/director-v2/tests/unit/test_modules_dask_client.py index 9987767ad47..3aac6b498e7 100644 --- a/services/director-v2/tests/unit/test_modules_dask_client.py +++ b/services/director-v2/tests/unit/test_modules_dask_client.py @@ -38,6 +38,7 @@ from models_library.projects import ProjectID from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState +from models_library.services_resources import BootMode from models_library.users import UserID from pydantic import AnyUrl, ByteSize, SecretStr from pydantic.tools import parse_obj_as @@ -260,7 +261,9 @@ def cpu_image(node_id: NodeID) -> ImageParams: name="simcore/services/comp/pytest/cpu_image", tag="1.5.5", node_requirements=NodeRequirements( - CPU=1, RAM=parse_obj_as(ByteSize, "128 MiB"), GPU=None, MPI=None + CPU=1, + RAM=parse_obj_as(ByteSize, "128 MiB"), + GPU=None, ), ) # type: ignore return ImageParams( @@ -285,7 +288,9 @@ def gpu_image(node_id: NodeID) -> ImageParams: name="simcore/services/comp/pytest/gpu_image", tag="1.4.7", node_requirements=NodeRequirements( - CPU=1, GPU=1, RAM=parse_obj_as(ByteSize, "256 MiB"), MPI=None + CPU=1, + GPU=1, + RAM=parse_obj_as(ByteSize, "256 MiB"), ), ) # type: ignore return ImageParams( @@ -306,41 +311,13 @@ def gpu_image(node_id: NodeID) -> ImageParams: ) -@pytest.fixture -def mpi_image(node_id: NodeID) -> ImageParams: - image = Image( - name="simcore/services/comp/pytest/mpi_image", - tag="1.4.5123", - node_requirements=NodeRequirements( - CPU=2, RAM=parse_obj_as(ByteSize, "128 MiB"), MPI=1, GPU=None - ), - ) # type: ignore - return ImageParams( - image=image, - expected_annotations={ - "resources": { - "CPU": 2.0, - "MPI": 1.0, - "RAM": 128 * 1024 * 1024, - }, - }, - expected_used_resources={ - "CPU": 2.0, - "MPI": 1.0, - "RAM": 128 * 1024 * 1024.0, - }, - fake_tasks={node_id: image}, - ) - - -@pytest.fixture(params=[cpu_image.__name__, gpu_image.__name__, mpi_image.__name__]) +@pytest.fixture(params=[cpu_image.__name__, gpu_image.__name__]) def image_params( - cpu_image: ImageParams, gpu_image: ImageParams, mpi_image: ImageParams, request + cpu_image: ImageParams, gpu_image: ImageParams, request ) -> ImageParams: return { "cpu_image": cpu_image, "gpu_image": gpu_image, - "mpi_image": mpi_image, }[request.param] @@ -476,6 +453,7 @@ def fake_sidecar_fct( log_file_url: AnyUrl, command: list[str], s3_settings: Optional[S3Settings], + boot_mode: BootMode, expected_annotations, ) -> TaskOutputData: # get the task data @@ -568,6 +546,7 @@ def fake_sidecar_fct( log_file_url: AnyUrl, command: list[str], s3_settings: Optional[S3Settings], + boot_mode: BootMode = 
BootMode.CPU, ) -> TaskOutputData: # get the task data worker = get_worker() @@ -646,6 +625,7 @@ def fake_remote_fct( log_file_url: AnyUrl, command: list[str], s3_settings: Optional[S3Settings], + boot_mode: BootMode = BootMode.CPU, ) -> TaskOutputData: # get the task data worker = get_worker() @@ -728,6 +708,7 @@ def fake_failing_sidecar_fct( log_file_url: AnyUrl, command: list[str], s3_settings: Optional[S3Settings], + boot_mode: BootMode = BootMode.CPU, ) -> TaskOutputData: raise ValueError( @@ -782,20 +763,21 @@ async def test_missing_resource_send_computation_task( mocked_storage_service_api: respx.MockRouter, ): - # remove the workers that can handle mpi + # remove the workers that can handle gpu scheduler_info = dask_client.backend.client.scheduler_info() assert scheduler_info - # find mpi workers + # find gpu workers workers_to_remove = [ worker_key for worker_key, worker_info in scheduler_info["workers"].items() - if "MPI" in worker_info["resources"] + if "GPU" in worker_info["resources"] ] await dask_client.backend.client.retire_workers(workers=workers_to_remove) # type: ignore await asyncio.sleep(5) # a bit of time is needed so the cluster adapts - # now let's adapt the task so it needs mpi - image_params.image.node_requirements.mpi = 2 + # now let's adapt the task so it needs gpu + assert image_params.image.node_requirements + image_params.image.node_requirements.gpu = 2 with pytest.raises(MissingComputationalResourcesError): await dask_client.send_computation_tasks( @@ -829,7 +811,6 @@ async def test_too_many_resources_send_computation_task( node_requirements=NodeRequirements( CPU=10000000000000000, RAM=parse_obj_as(ByteSize, "128 MiB"), - MPI=None, GPU=None, ), ) # type: ignore @@ -950,6 +931,7 @@ def fake_remote_fct( log_file_url: AnyUrl, command: list[str], s3_settings: Optional[S3Settings], + boot_mode: BootMode = BootMode.CPU, ) -> TaskOutputData: # wait here until the client allows us to continue start_event = Event(_DASK_EVENT_NAME) @@ -1029,6 +1011,7 @@ def fake_remote_fct( log_file_url: AnyUrl, command: list[str], s3_settings: Optional[S3Settings], + boot_mode: BootMode = BootMode.CPU, ) -> TaskOutputData: state_pub = distributed.Pub(TaskStateEvent.topic_name()) @@ -1108,6 +1091,7 @@ def fake_sidecar_fct( log_file_url: AnyUrl, command: list[str], s3_settings: Optional[S3Settings], + boot_mode: BootMode, expected_annotations, ) -> TaskOutputData: # get the task data diff --git a/services/director-v2/tests/unit/test_modules_director_v0.py b/services/director-v2/tests/unit/test_modules_director_v0.py index f44fcef4b42..00b624ed943 100644 --- a/services/director-v2/tests/unit/test_modules_director_v0.py +++ b/services/director-v2/tests/unit/test_modules_director_v0.py @@ -234,7 +234,7 @@ async def test_get_service_extras( ): director_client: DirectorV0Client = minimal_app.state.director_v0_client service_extras: ServiceExtras = await director_client.get_service_extras( - mock_service_key_version + mock_service_key_version.key, mock_service_key_version.version ) assert mocked_director_service_fcts["get_service_extras"].called assert fake_service_extras == service_extras diff --git a/services/director-v2/tests/unit/with_dbs/test_api_route_computations.py b/services/director-v2/tests/unit/with_dbs/test_api_route_computations.py index dcc7900a589..44a4a9d2601 100644 --- a/services/director-v2/tests/unit/with_dbs/test_api_route_computations.py +++ b/services/director-v2/tests/unit/with_dbs/test_api_route_computations.py @@ -22,6 +22,10 @@ from models_library.projects_pipeline 
import PipelineDetails from models_library.projects_state import RunningState from models_library.services import ServiceDockerData +from models_library.services_resources import ( + ServiceResourcesDict, + ServiceResourcesDictHelpers, +) from models_library.utils.fastapi_encoders import jsonable_encoder from pydantic import AnyHttpUrl, parse_obj_as from pytest_mock.plugin import MockerFixture @@ -89,6 +93,15 @@ def fake_service_extras() -> ServiceExtras: return random_extras +@pytest.fixture +def fake_service_resources() -> ServiceResourcesDict: + service_resources = parse_obj_as( + ServiceResourcesDict, + ServiceResourcesDictHelpers.Config.schema_extra["examples"][0], + ) + return service_resources + + @pytest.fixture def mocked_director_service_fcts( minimal_app: FastAPI, @@ -123,6 +136,7 @@ def mocked_catalog_service_fcts( minimal_app: FastAPI, fake_service_details: ServiceDockerData, fake_service_extras: ServiceExtras, + fake_service_resources: ServiceResourcesDict, ): # pylint: disable=not-context-manager with respx.mock( @@ -132,7 +146,14 @@ def mocked_catalog_service_fcts( ) as respx_mock: respx_mock.get( re.compile( - r"services/(simcore)%2F(services)%2F(comp|dynamic|frontend)%2F.+/(.+)" + r"services/(simcore)%2F(services)%2F(comp|dynamic|frontend)%2F[^/]+/[^\.]+.[^\.]+.[^\/]+/resources" + ), + name="get_service_resources", + ).respond(json=jsonable_encoder(fake_service_resources, by_alias=True)) + + respx_mock.get( + re.compile( + r"services/(simcore)%2F(services)%2F(comp|dynamic|frontend)%2F[^/]+/[^\.]+.[^\.]+.[^\/]+" ), name="get_service", ).respond(json=fake_service_details.dict(by_alias=True)) @@ -190,8 +211,7 @@ async def test_create_computation( create_computation_url, json=jsonable_encoder( ComputationCreate( - user_id=user["id"], - project_id=proj.uuid, + user_id=user["id"], project_id=proj.uuid, product_name=product_name ) ), ) diff --git a/services/osparc-gateway-server/tests/unit/test_utils.py b/services/osparc-gateway-server/tests/unit/test_utils.py index b1b17faf443..c8f4b2e77c8 100644 --- a/services/osparc-gateway-server/tests/unit/test_utils.py +++ b/services/osparc-gateway-server/tests/unit/test_utils.py @@ -51,7 +51,10 @@ async def create_docker_service( async def _creator(labels: dict[str, str]) -> dict[str, Any]: service = await async_docker_client.services.create( task_template={ - "ContainerSpec": {"Image": "busybox", "Command": ["sleep", "10000"]} + "ContainerSpec": { + "Image": "busybox:latest", + "Command": ["sleep", "10000"], + } }, name=faker.pystr(), labels=labels, diff --git a/services/web/server/src/simcore_service_webserver/director_v2_core_computations.py b/services/web/server/src/simcore_service_webserver/director_v2_core_computations.py index 66af38f0e2c..346afc7a131 100644 --- a/services/web/server/src/simcore_service_webserver/director_v2_core_computations.py +++ b/services/web/server/src/simcore_service_webserver/director_v2_core_computations.py @@ -97,7 +97,7 @@ def set_client(app: web.Application, obj: ComputationsApi): @log_decorator(logger=log) async def create_or_update_pipeline( - app: web.Application, user_id: UserID, project_id: ProjectID + app: web.Application, user_id: UserID, project_id: ProjectID, product_name: str ) -> Optional[DataType]: settings: DirectorV2Settings = get_plugin_settings(app) @@ -105,6 +105,7 @@ async def create_or_update_pipeline( body = { "user_id": user_id, "project_id": f"{project_id}", + "product_name": product_name, } # request to director-v2 try: @@ -122,7 +123,6 @@ async def create_or_update_pipeline( 
async def is_pipeline_running( app: web.Application, user_id: PositiveInt, project_id: UUID ) -> Optional[bool]: - # TODO: make it cheaper by /computations/{project_id}/state. First trial shows # that the efficiency gain is minimal but should be considered specially if the handler # gets heavier with time diff --git a/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v1.py b/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v1.py index 5fc157048ee..85db265216e 100644 --- a/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v1.py +++ b/services/web/server/src/simcore_service_webserver/exporter/formatters/formatter_v1.py @@ -224,7 +224,7 @@ async def add_new_project( if _project_db["uuid"] != str(project.uuid): raise ExporterException("Project uuid dose nto match after validation") - await create_or_update_pipeline(app, user_id, project.uuid) + await create_or_update_pipeline(app, user_id, project.uuid, product_name) async def _fix_node_run_hashes_based_on_old_project( diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_api.py b/services/web/server/src/simcore_service_webserver/projects/projects_api.py index 8d9c30c5a0e..d73f1b8f53d 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_api.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_api.py @@ -47,6 +47,7 @@ from .. import catalog_client, director_v2_api, storage_api from ..application_settings import get_settings +from ..products import get_product_name from ..resource_manager.websocket_manager import ( PROJECT_ID_KEY, UserSessionID, @@ -276,7 +277,7 @@ async def add_project_node( # also ensure the project is updated by director-v2 since services # are due to access comp_tasks at some point see [https://github.com/ITISFoundation/osparc-simcore/issues/3216] await director_v2_api.create_or_update_pipeline( - request.app, user_id, project["uuid"] + request.app, user_id, project["uuid"], product_name ) if _is_node_dynamic(service_key): @@ -352,7 +353,10 @@ async def delete_project_node( partial_workbench_data, user_id, f"{project_uuid}" ) # also ensure the project is updated by director-v2 since services - await director_v2_api.create_or_update_pipeline(request.app, user_id, project_uuid) + product_name = get_product_name(request) + await director_v2_api.create_or_update_pipeline( + request.app, user_id, project_uuid, product_name + ) async def update_project_linked_product( @@ -591,7 +595,6 @@ async def try_open_project_for_user( await get_user_name(app, user_id), notify_users=False, ): - with managed_resource(user_id, client_session_id, app) as rt: # NOTE: if max_number_of_studies_per_user is set, the same # project shall still be openable if the tab was closed diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_handlers_crud.py b/services/web/server/src/simcore_service_webserver/projects/projects_handlers_crud.py index 0adc5f2b52d..cf831e0515a 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_handlers_crud.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_handlers_crud.py @@ -146,6 +146,7 @@ async def create_projects(request: web.Request): request_context=req_ctx, query_params=query_params, predefined_project=predefined_project, + product_name=req_ctx.product_name, fire_and_forget=True, ) @@ -249,7 +250,8 @@ async def _create_projects( query_params: _ProjectCreateParams, 
request_context: RequestContext, predefined_project: Optional[ProjectDict], -): + product_name: str, +) -> None: """ :raises web.HTTPBadRequest @@ -313,7 +315,7 @@ async def _create_projects( # This is a new project and every new graph needs to be reflected in the pipeline tables await director_v2_api.create_or_update_pipeline( - app, request_context.user_id, new_project["uuid"] + app, request_context.user_id, new_project["uuid"], product_name ) # Appends state @@ -636,7 +638,10 @@ async def replace_project(request: web.Request): request.app, path_params.project_id ) await director_v2_api.create_or_update_pipeline( - request.app, req_ctx.user_id, path_params.project_id + request.app, + req_ctx.user_id, + path_params.project_id, + product_name=req_ctx.product_name, ) # Appends state new_project = await projects_api.add_project_states_for_user( diff --git a/services/web/server/src/simcore_service_webserver/studies_dispatcher/_projects.py b/services/web/server/src/simcore_service_webserver/studies_dispatcher/_projects.py index e5490666b0a..04b3776b547 100644 --- a/services/web/server/src/simcore_service_webserver/studies_dispatcher/_projects.py +++ b/services/web/server/src/simcore_service_webserver/studies_dispatcher/_projects.py @@ -179,4 +179,4 @@ async def add_new_project( # # TODO: Ensure this user has access to these services! # - await create_or_update_pipeline(app, user.id, project.uuid) + await create_or_update_pipeline(app, user.id, project.uuid, product_name) diff --git a/services/web/server/tests/unit/with_dbs/01/test_director_v2.py b/services/web/server/tests/unit/with_dbs/01/test_director_v2.py index b9d600352f4..5f271c34a2d 100644 --- a/services/web/server/tests/unit/with_dbs/01/test_director_v2.py +++ b/services/web/server/tests/unit/with_dbs/01/test_director_v2.py @@ -46,10 +46,14 @@ def cluster_id(faker: Faker) -> ClusterID: async def test_create_pipeline( - mocked_director_v2, client, user_id: UserID, project_id: ProjectID + mocked_director_v2, + client, + user_id: UserID, + project_id: ProjectID, + osparc_product_name: str, ): task_out = await director_v2_api.create_or_update_pipeline( - client.app, user_id, project_id + client.app, user_id, project_id, osparc_product_name ) assert task_out assert isinstance(task_out, dict)
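
Note on the aggregation helpers introduced in comp_tasks.py: they fold the catalog's per-image /resources payload into a single set of node requirements (per resource, the smaller of limit and reservation, summed over all images) and take the boot mode of the first listed image. A minimal standalone sketch of that logic, assuming a payload shaped like the ServiceResourcesDict examples; plain dicts and a local enum stand in for the models-library types, and the sample payload is hypothetical:

    from enum import Enum
    from typing import Any


    class BootMode(str, Enum):  # local stand-in for models_library.services_resources.BootMode
        CPU = "CPU"
        GPU = "GPU"
        MPI = "MPI"


    def compute_node_requirements(node_resources: dict[str, Any]) -> dict[str, float]:
        # per resource: sum over all images of the smaller of limit/reservation
        aggregated: dict[str, float] = {}
        for image_data in node_resources.values():
            for name, value in image_data.get("resources", {}).items():
                aggregated[name] = aggregated.get(name, 0) + min(
                    value["limit"], value["reservation"]
                )
        return aggregated


    def compute_node_boot_mode(node_resources: dict[str, Any]) -> BootMode:
        # the boot mode of the first listed image wins
        for image_data in node_resources.values():
            return BootMode(image_data["boot_modes"][0])
        raise RuntimeError("No BootMode")


    sample = {  # hypothetical /resources payload for a two-image dynamic service
        "s4l-core": {
            "resources": {
                "CPU": {"limit": 4.0, "reservation": 0.1},
                "RAM": {"limit": 17179869184, "reservation": 536870912},
            },
            "boot_modes": ["GPU"],
        },
        "sym-server": {
            "resources": {
                "CPU": {"limit": 0.1, "reservation": 0.1},
                "RAM": {"limit": 2147483648, "reservation": 2147483648},
            },
            "boot_modes": ["CPU"],
        },
    }

    assert compute_node_requirements(sample) == {"CPU": 0.2, "RAM": 2684354560}
    assert compute_node_boot_mode(sample) is BootMode.GPU

The real helpers return a NodeRequirements model instead of a plain dict; the sketch only illustrates the aggregation rule.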
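
On the Dask side, the remote sidecar callable now receives the boot mode as its last argument, forwarded from Image.boot_mode at submit time, and the test doubles default it to BootMode.CPU. A minimal sketch of a compatible double, assuming the same positional contract; every parameter type here is a simplified stand-in for the dask-task-models-library types, and the function name is illustrative only:

    from enum import Enum
    from typing import Any, Optional


    class BootMode(str, Enum):  # stand-in, same as in the previous sketch
        CPU = "CPU"
        GPU = "GPU"
        MPI = "MPI"


    def minimal_sidecar_double(
        docker_auth: Any,
        service_key: str,
        service_version: str,
        input_data: dict[str, Any],
        output_data_keys: dict[str, Any],
        log_file_url: str,
        command: list[str],
        s3_settings: Optional[Any],
        boot_mode: BootMode = BootMode.CPU,  # mirrors the defaulting used by the test doubles above
    ) -> dict[str, Any]:
        # a real sidecar would pull and run the service container in the requested mode;
        # here we only echo the values that would drive that decision
        return {"command": command, "boot_mode": boot_mode.value}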