From 5346f612226143bbbab39376044c41e385eec432 Mon Sep 17 00:00:00 2001 From: Andrei Neagu <5694077+GitHK@users.noreply.github.com> Date: Thu, 11 May 2023 09:24:17 +0200 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20fixing=20mypy=20issues=20i?= =?UTF-8?q?n=20`director-v2`=20part3=20(#4211)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cli/_client.py | 2 +- .../core/settings.py | 3 +- .../models/schemas/clusters.py | 5 +- .../schemas/dynamic_services/scheduler.py | 2 +- .../schemas/dynamic_services/service.py | 2 +- .../modules/dynamic_sidecar/scheduler/_abc.py | 2 +- .../scheduler/_core/_scheduler.py | 225 ++++++++---------- .../scheduler/_core/_scheduler_mixin.py | 101 -------- .../scheduler/_core/_scheduler_utils.py | 134 ++++++++++- .../modules/projects_networks.py | 3 +- services/director-v2/tests/unit/conftest.py | 4 +- .../tests/unit/test_models_clusters.py | 5 +- .../test_modules_dynamic_sidecar_scheduler.py | 2 +- .../test_api_route_dynamic_services.py | 2 +- 14 files changed, 244 insertions(+), 248 deletions(-) delete mode 100644 services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler_mixin.py diff --git a/services/director-v2/src/simcore_service_director_v2/cli/_client.py b/services/director-v2/src/simcore_service_director_v2/cli/_client.py index 2c4f0ae9a58a..50e66cdf8258 100644 --- a/services/director-v2/src/simcore_service_director_v2/cli/_client.py +++ b/services/director-v2/src/simcore_service_director_v2/cli/_client.py @@ -13,7 +13,7 @@ class ThinDV2LocalhostClient(BaseThinClient): - BASE_ADDRESS: str = "http://localhost:8000" # NOSONAR + BASE_ADDRESS: str = "http://localhost:8000" # NOSONAR def __init__(self): self.client = AsyncClient(timeout=Timeout(5)) diff --git a/services/director-v2/src/simcore_service_director_v2/core/settings.py b/services/director-v2/src/simcore_service_director_v2/core/settings.py index 1217754ccef0..ee92c2640c0f 100644 --- a/services/director-v2/src/simcore_service_director_v2/core/settings.py +++ b/services/director-v2/src/simcore_service_director_v2/core/settings.py @@ -170,7 +170,8 @@ class DynamicSidecarEgressSettings(BaseCustomSettings): description="envoy image to use", ) DYNAMIC_SIDECAR_ENVOY_LOG_LEVEL: EnvoyLogLevel = Field( - default=EnvoyLogLevel.ERROR, description="log level for envoy proxy service" + default=EnvoyLogLevel.ERROR, # type: ignore + description="log level for envoy proxy service", ) diff --git a/services/director-v2/src/simcore_service_director_v2/models/schemas/clusters.py b/services/director-v2/src/simcore_service_director_v2/models/schemas/clusters.py index e8c7fba04c20..9c3bff363c9a 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/schemas/clusters.py +++ b/services/director-v2/src/simcore_service_director_v2/models/schemas/clusters.py @@ -64,8 +64,7 @@ class Worker(BaseModel): metrics: WorkerMetrics -class WorkersDict(DictModel[AnyUrl, Worker]): - ... +WorkersDict: TypeAlias = dict[AnyUrl, Worker] class Scheduler(BaseModel): @@ -74,7 +73,7 @@ class Scheduler(BaseModel): @validator("workers", pre=True, always=True) @classmethod - def ensure_workers_is_empty_dict(cls, v) -> WorkersDict: + def ensure_workers_is_empty_dict(cls, v): if v is None: return {} return v diff --git a/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/scheduler.py b/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/scheduler.py index 876bd0d6d120..c6f38cd05086 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/scheduler.py @@ -143,7 +143,7 @@ class ServiceRemovalState(BaseModel): description="when True, marks the service as ready to be removed", ) can_save: bool = Field( - ..., + False, description="when True, saves the internal state and upload outputs of the service", ) was_removed: bool = Field( diff --git a/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/service.py b/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/service.py index e87324e34cc2..6368b479f7b7 100644 --- a/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/service.py +++ b/services/director-v2/src/simcore_service_director_v2/models/schemas/dynamic_services/service.py @@ -145,7 +145,7 @@ class RunningDynamicServiceDetails(ServiceDetails): @cached_property def legacy_service_url(self) -> str: - return f"http://{self.host}:{self.internal_port}{self.basepath}" # NOSONAR + return f"http://{self.host}:{self.internal_port}{self.basepath}" # NOSONAR @classmethod def from_scheduler_data( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py index d86ac9b877e9..975021f445bf 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py @@ -79,7 +79,7 @@ async def add_service( request_dns: str, request_scheme: str, request_simcore_user_agent: str, - can_save: str, + can_save: bool, ) -> None: """ Adds a new service. diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py index b85ec6df1877..6b4a13d3c0b6 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py @@ -14,11 +14,13 @@ """ import asyncio +import contextlib import functools import logging -from asyncio import sleep -from dataclasses import dataclass +from asyncio import Lock, Queue, Task, sleep +from dataclasses import dataclass, field +from fastapi import FastAPI from models_library.basic_types import PortInt from models_library.projects import ProjectID from models_library.projects_networks import DockerNetworkAlias @@ -39,24 +41,13 @@ RetrieveDataOutEnveloped, ) from .....models.schemas.dynamic_services import ( - DynamicSidecarStatus, RunningDynamicServiceDetails, SchedulerData, - ServiceState, + ServiceName, ) from ...api_client import DynamicSidecarClient, get_dynamic_sidecar_client -from ...docker_api import ( - get_dynamic_sidecar_state, - get_dynamic_sidecars_to_observe, - remove_pending_volume_removal_services, - update_scheduler_data_label, -) -from ...docker_states import extract_containers_minimum_statuses -from ...errors import ( - DockerServiceNotFoundError, - DynamicSidecarError, - DynamicSidecarNotFoundError, -) +from ...docker_api import update_scheduler_data_label +from ...errors import DynamicSidecarError, DynamicSidecarNotFoundError from .._abc import SchedulerPublicInterface from . import _scheduler_utils from ._events_utils import ( @@ -65,7 +56,6 @@ service_save_state, ) from ._observer import observing_single_service -from ._scheduler_mixin import SchedulerInternalsMixin logger = logging.getLogger(__name__) @@ -74,7 +64,92 @@ @dataclass -class Scheduler(SchedulerInternalsMixin, SchedulerPublicInterface): +class Scheduler( # pylint: disable=too-many-instance-attributes + SchedulerPublicInterface +): + app: FastAPI + + _lock: Lock = field(default_factory=Lock) + _to_observe: dict[ServiceName, SchedulerData] = field(default_factory=dict) + _service_observation_task: dict[ServiceName, asyncio.Task | object | None] = field( + default_factory=dict + ) + _keep_running: bool = False + _inverse_search_mapping: dict[NodeID, ServiceName] = field(default_factory=dict) + _scheduler_task: Task | None = None + _cleanup_volume_removal_services_task: Task | None = None + _trigger_observation_queue_task: Task | None = None + _trigger_observation_queue: Queue = field(default_factory=Queue) + _observation_counter: int = 0 + + async def start(self) -> None: + # run as a background task + logger.info("Starting dynamic-sidecar scheduler") + self._keep_running = True + self._scheduler_task = asyncio.create_task( + self._run_scheduler_task(), name="dynamic-scheduler" + ) + self._trigger_observation_queue_task = asyncio.create_task( + self._run_trigger_observation_queue_task(), + name="dynamic-scheduler-trigger-obs-queue", + ) + + self._cleanup_volume_removal_services_task = asyncio.create_task( + _scheduler_utils.cleanup_volume_removal_services(self.app), + name="dynamic-scheduler-cleanup-volume-removal-services", + ) + await _scheduler_utils.discover_running_services(self) + + async def shutdown(self) -> None: + logger.info("Shutting down dynamic-sidecar scheduler") + self._keep_running = False + self._inverse_search_mapping = {} + self._to_observe = {} + + if self._cleanup_volume_removal_services_task is not None: + self._cleanup_volume_removal_services_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._cleanup_volume_removal_services_task + self._cleanup_volume_removal_services_task = None + + if self._scheduler_task is not None: + self._scheduler_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._scheduler_task + self._scheduler_task = None + + if self._trigger_observation_queue_task is not None: + await self._trigger_observation_queue.put(None) + + self._trigger_observation_queue_task.cancel() + with contextlib.suppress(asyncio.CancelledError): + await self._trigger_observation_queue_task + self._trigger_observation_queue_task = None + self._trigger_observation_queue = Queue() + + # let's properly cleanup remaining observation tasks + running_tasks = [ + x for x in self._service_observation_task.values() if isinstance(x, Task) + ] + for task in running_tasks: + task.cancel() + try: + MAX_WAIT_TIME_SECONDS = 5 + results = await asyncio.wait_for( + asyncio.gather(*running_tasks, return_exceptions=True), + timeout=MAX_WAIT_TIME_SECONDS, + ) + if bad_results := list(filter(lambda r: isinstance(r, Exception), results)): + logger.error( + "Following observation tasks completed with an unexpected error:%s", + f"{bad_results}", + ) + except asyncio.TimeoutError: + logger.error( + "Timed-out waiting for %s to complete. Action: Check why this is blocking", + f"{running_tasks=}", + ) + def toggle_observation(self, node_uuid: NodeID, disable: bool) -> bool: """ returns True if it managed to enable/disable observation of the service @@ -320,7 +395,6 @@ async def remove_service_from_observation(self, node_uuid: NodeID) -> None: logger.debug("Removed service '%s' from scheduler", service_name) async def get_stack_status(self, node_uuid: NodeID) -> RunningDynamicServiceDetails: - # pylint: disable=too-many-return-statements """ raises DynamicSidecarNotFoundError @@ -330,73 +404,8 @@ async def get_stack_status(self, node_uuid: NodeID) -> RunningDynamicServiceDeta service_name = self._inverse_search_mapping[node_uuid] scheduler_data: SchedulerData = self._to_observe[service_name] - - # check if there was an error picked up by the scheduler - # and marked this service as failed - if scheduler_data.dynamic_sidecar.status.current != DynamicSidecarStatus.OK: - return RunningDynamicServiceDetails.from_scheduler_data( - node_uuid=node_uuid, - scheduler_data=scheduler_data, - service_state=ServiceState.FAILED, - service_message=scheduler_data.dynamic_sidecar.status.info, - ) - - # is the service stopping? - if scheduler_data.dynamic_sidecar.service_removal_state.can_remove: - return RunningDynamicServiceDetails.from_scheduler_data( - node_uuid=node_uuid, - scheduler_data=scheduler_data, - service_state=ServiceState.STOPPING, - service_message=scheduler_data.dynamic_sidecar.status.info, - ) - - # the service should be either running or starting - try: - sidecar_state, sidecar_message = await get_dynamic_sidecar_state( - # the service_name is unique and will not collide with other names - # it can be used in place of the service_id here, as the docker API accepts both - service_id=scheduler_data.service_name - ) - except DockerServiceNotFoundError: - # in this case, the service is starting, so state is pending - return RunningDynamicServiceDetails.from_scheduler_data( - node_uuid=node_uuid, - scheduler_data=scheduler_data, - service_state=ServiceState.PENDING, - service_message=scheduler_data.dynamic_sidecar.status.info, - ) - - # while the dynamic-sidecar state is not RUNNING report it's state - if sidecar_state != ServiceState.RUNNING: - return RunningDynamicServiceDetails.from_scheduler_data( - node_uuid=node_uuid, - scheduler_data=scheduler_data, - service_state=sidecar_state, - service_message=sidecar_message, - ) - - # NOTE: This will be repeatedly called until the - # user services are effectively started - - # wait for containers to start - if len(scheduler_data.dynamic_sidecar.containers_inspect) == 0: - # marks status as waiting for containers - return RunningDynamicServiceDetails.from_scheduler_data( - node_uuid=node_uuid, - scheduler_data=scheduler_data, - service_state=ServiceState.STARTING, - service_message="", - ) - - # compute composed containers states - container_state, container_message = extract_containers_minimum_statuses( - scheduler_data.dynamic_sidecar.containers_inspect - ) - return RunningDynamicServiceDetails.from_scheduler_data( - node_uuid=node_uuid, - scheduler_data=scheduler_data, - service_state=container_state, - service_message=container_message, + return await _scheduler_utils.get_stack_status_from_scheduler_data( + scheduler_data ) async def retrieve_service_inputs( @@ -562,47 +571,3 @@ async def _run_scheduler_task(self) -> None: await sleep(settings.DIRECTOR_V2_DYNAMIC_SCHEDULER_INTERVAL_SECONDS) self._observation_counter += 1 - - async def _discover_running_services(self) -> None: - """discover all services which were started before and add them to the scheduler""" - dynamic_sidecar_settings: DynamicSidecarSettings = ( - self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR - ) - services_to_observe: list[ - SchedulerData - ] = await get_dynamic_sidecars_to_observe(dynamic_sidecar_settings) - - logger.info( - "The following services need to be observed: %s", services_to_observe - ) - - for scheduler_data in services_to_observe: - await self._add_service(scheduler_data) - - async def _cleanup_volume_removal_services(self) -> None: - settings: DynamicServicesSchedulerSettings = ( - self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER - ) - dynamic_sidecar_settings: DynamicSidecarSettings = ( - self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR - ) - - logger.debug( - "dynamic-sidecars cleanup pending volume removal services every %s seconds", - settings.DIRECTOR_V2_DYNAMIC_SCHEDULER_PENDING_VOLUME_REMOVAL_INTERVAL_S, - ) - while await asyncio.sleep( - settings.DIRECTOR_V2_DYNAMIC_SCHEDULER_PENDING_VOLUME_REMOVAL_INTERVAL_S, - True, - ): - logger.debug("Removing pending volume removal services...") - - try: - await remove_pending_volume_removal_services(dynamic_sidecar_settings) - except asyncio.CancelledError: - logger.info("Stopped pending volume removal services task") - raise - except Exception: # pylint: disable=broad-except - logger.exception( - "Unexpected error while cleaning up pending volume removal services" - ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler_mixin.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler_mixin.py deleted file mode 100644 index f3bc745d7301..000000000000 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler_mixin.py +++ /dev/null @@ -1,101 +0,0 @@ -import asyncio -import contextlib -import logging -from asyncio import Lock, Queue, Task -from dataclasses import dataclass, field - -from fastapi import FastAPI -from models_library.projects_nodes_io import NodeID - -from .....models.schemas.dynamic_services import SchedulerData, ServiceName -from .._abc import SchedulerInternalsInterface - -logger = logging.getLogger(__name__) - - -@dataclass -class SchedulerInternalsMixin( # pylint: disable=too-many-instance-attributes - SchedulerInternalsInterface -): - app: FastAPI - - _lock: Lock = field(default_factory=Lock) - _to_observe: dict[ServiceName, SchedulerData] = field(default_factory=dict) - _service_observation_task: dict[ServiceName, asyncio.Task | object | None] = field( - default_factory=dict - ) - _keep_running: bool = False - _inverse_search_mapping: dict[NodeID, ServiceName] = field(default_factory=dict) - _scheduler_task: Task | None = None - _cleanup_volume_removal_services_task: Task | None = None - _trigger_observation_queue_task: Task | None = None - _trigger_observation_queue: Queue = field(default_factory=Queue) - _observation_counter: int = 0 - - async def start(self) -> None: - # run as a background task - logger.info("Starting dynamic-sidecar scheduler") - self._keep_running = True - self._scheduler_task = asyncio.create_task( - self._run_scheduler_task(), name="dynamic-scheduler" - ) - self._trigger_observation_queue_task = asyncio.create_task( - self._run_trigger_observation_queue_task(), - name="dynamic-scheduler-trigger-obs-queue", - ) - - self._cleanup_volume_removal_services_task = asyncio.create_task( - self._cleanup_volume_removal_services(), - name="dynamic-scheduler-cleanup-volume-removal-services", - ) - await self._discover_running_services() - - async def shutdown(self) -> None: - logger.info("Shutting down dynamic-sidecar scheduler") - self._keep_running = False - self._inverse_search_mapping = {} - self._to_observe = {} - - if self._cleanup_volume_removal_services_task is not None: - self._cleanup_volume_removal_services_task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await self._cleanup_volume_removal_services_task - self._cleanup_volume_removal_services_task = None - - if self._scheduler_task is not None: - self._scheduler_task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await self._scheduler_task - self._scheduler_task = None - - if self._trigger_observation_queue_task is not None: - await self._trigger_observation_queue.put(None) - - self._trigger_observation_queue_task.cancel() - with contextlib.suppress(asyncio.CancelledError): - await self._trigger_observation_queue_task - self._trigger_observation_queue_task = None - self._trigger_observation_queue = Queue() - - # let's properly cleanup remaining observation tasks - running_tasks = [ - x for x in self._service_observation_task.values() if isinstance(x, Task) - ] - for task in running_tasks: - task.cancel() - try: - MAX_WAIT_TIME_SECONDS = 5 - results = await asyncio.wait_for( - asyncio.gather(*running_tasks, return_exceptions=True), - timeout=MAX_WAIT_TIME_SECONDS, - ) - if bad_results := list(filter(lambda r: isinstance(r, Exception), results)): - logger.error( - "Following observation tasks completed with an unexpected error:%s", - f"{bad_results}", - ) - except asyncio.TimeoutError: - logger.error( - "Timed-out waiting for %s to complete. Action: Check why this is blocking", - f"{running_tasks=}", - ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler_utils.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler_utils.py index d533cf449d6a..14068d1decdb 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler_utils.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler_utils.py @@ -1,3 +1,4 @@ +import asyncio import logging from typing import Final @@ -5,8 +6,21 @@ from models_library.projects_nodes_io import NodeID from servicelib.fastapi.long_running_tasks.client import ProgressCallback -from .....models.schemas.dynamic_services import DynamicSidecarStatus, SchedulerData +from .....core.settings import DynamicServicesSchedulerSettings, DynamicSidecarSettings +from .....models.schemas.dynamic_services import ( + DynamicSidecarStatus, + RunningDynamicServiceDetails, + SchedulerData, + ServiceState, +) from ...api_client import DynamicSidecarClient, get_dynamic_sidecar_client +from ...docker_api import ( + get_dynamic_sidecar_state, + get_dynamic_sidecars_to_observe, + remove_pending_volume_removal_services, +) +from ...docker_states import extract_containers_minimum_statuses +from ...errors import DockerServiceNotFoundError from ._events_utils import service_push_outputs logger = logging.getLogger(__name__) @@ -45,3 +59,121 @@ async def service_awaits_manual_interventions(scheduler_data: SchedulerData) -> scheduler_data.dynamic_sidecar.wait_for_manual_intervention_logged = True logger.warning(" %s %s", LOG_MSG_MANUAL_INTERVENTION, scheduler_data.node_uuid) return service_awaits_intervention + + +async def cleanup_volume_removal_services(app: FastAPI) -> None: + settings: DynamicServicesSchedulerSettings = ( + app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER + ) + dynamic_sidecar_settings: DynamicSidecarSettings = ( + app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR + ) + + logger.debug( + "dynamic-sidecars cleanup pending volume removal services every %s seconds", + settings.DIRECTOR_V2_DYNAMIC_SCHEDULER_PENDING_VOLUME_REMOVAL_INTERVAL_S, + ) + while await asyncio.sleep( + settings.DIRECTOR_V2_DYNAMIC_SCHEDULER_PENDING_VOLUME_REMOVAL_INTERVAL_S, + True, + ): + logger.debug("Removing pending volume removal services...") + + try: + await remove_pending_volume_removal_services(dynamic_sidecar_settings) + except asyncio.CancelledError: + logger.info("Stopped pending volume removal services task") + raise + except Exception: # pylint: disable=broad-except + logger.exception( + "Unexpected error while cleaning up pending volume removal services" + ) + + +async def discover_running_services(schduler: "Scheduler") -> None: # type: ignore + """discover all services which were started before and add them to the scheduler""" + dynamic_sidecar_settings: DynamicSidecarSettings = ( + schduler.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR + ) + services_to_observe: list[SchedulerData] = await get_dynamic_sidecars_to_observe( + dynamic_sidecar_settings + ) + + logger.info("The following services need to be observed: %s", services_to_observe) + + for scheduler_data in services_to_observe: + await schduler._add_service(scheduler_data) # pylint: disable=protected-access + + +async def get_stack_status_from_scheduler_data( + scheduler_data: SchedulerData, +) -> RunningDynamicServiceDetails: + # pylint: disable=too-many-return-statements + + # check if there was an error picked up by the scheduler + # and marked this service as failed + if scheduler_data.dynamic_sidecar.status.current != DynamicSidecarStatus.OK: + return RunningDynamicServiceDetails.from_scheduler_data( + node_uuid=scheduler_data.node_uuid, + scheduler_data=scheduler_data, + service_state=ServiceState.FAILED, + service_message=scheduler_data.dynamic_sidecar.status.info, + ) + + # is the service stopping? + if scheduler_data.dynamic_sidecar.service_removal_state.can_remove: + return RunningDynamicServiceDetails.from_scheduler_data( + node_uuid=scheduler_data.node_uuid, + scheduler_data=scheduler_data, + service_state=ServiceState.STOPPING, + service_message=scheduler_data.dynamic_sidecar.status.info, + ) + + # the service should be either running or starting + try: + sidecar_state, sidecar_message = await get_dynamic_sidecar_state( + # the service_name is unique and will not collide with other names + # it can be used in place of the service_id here, as the docker API accepts both + service_id=scheduler_data.service_name + ) + except DockerServiceNotFoundError: + # in this case, the service is starting, so state is pending + return RunningDynamicServiceDetails.from_scheduler_data( + node_uuid=scheduler_data.node_uuid, + scheduler_data=scheduler_data, + service_state=ServiceState.PENDING, + service_message=scheduler_data.dynamic_sidecar.status.info, + ) + + # while the dynamic-sidecar state is not RUNNING report it's state + if sidecar_state != ServiceState.RUNNING: + return RunningDynamicServiceDetails.from_scheduler_data( + node_uuid=scheduler_data.node_uuid, + scheduler_data=scheduler_data, + service_state=sidecar_state, + service_message=sidecar_message, + ) + + # NOTE: This will be repeatedly called until the + # user services are effectively started + + # wait for containers to start + if len(scheduler_data.dynamic_sidecar.containers_inspect) == 0: + # marks status as waiting for containers + return RunningDynamicServiceDetails.from_scheduler_data( + node_uuid=scheduler_data.node_uuid, + scheduler_data=scheduler_data, + service_state=ServiceState.STARTING, + service_message="", + ) + + # compute composed containers states + container_state, container_message = extract_containers_minimum_statuses( + scheduler_data.dynamic_sidecar.containers_inspect + ) + return RunningDynamicServiceDetails.from_scheduler_data( + node_uuid=scheduler_data.node_uuid, + scheduler_data=scheduler_data, + service_state=container_state, + service_message=container_message, + ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/projects_networks.py b/services/director-v2/src/simcore_service_director_v2/modules/projects_networks.py index fd9d89114a87..df63fb9af606 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/projects_networks.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/projects_networks.py @@ -69,7 +69,8 @@ async def requires_dynamic_sidecar( ) ) ) - return simcore_service_labels.needs_dynamic_sidecar + requires_dynamic_sidecar_: bool = simcore_service_labels.needs_dynamic_sidecar + return requires_dynamic_sidecar_ async def _send_network_configuration_to_dynamic_sidecar( diff --git a/services/director-v2/tests/unit/conftest.py b/services/director-v2/tests/unit/conftest.py index 7e1d14ce0a03..6282927cf10d 100644 --- a/services/director-v2/tests/unit/conftest.py +++ b/services/director-v2/tests/unit/conftest.py @@ -429,7 +429,7 @@ def caplog_debug_level(caplog: LogCaptureFixture) -> Iterable[LogCaptureFixture] def mock_docker_api(mocker: MockerFixture) -> None: module_base = "simcore_service_director_v2.modules.dynamic_sidecar.scheduler" mocker.patch( - f"{module_base}._core._scheduler.get_dynamic_sidecars_to_observe", + f"{module_base}._core._scheduler_utils.get_dynamic_sidecars_to_observe", autospec=True, return_value=[], ) @@ -439,7 +439,7 @@ def mock_docker_api(mocker: MockerFixture) -> None: return_value=True, ) mocker.patch( - f"{module_base}._core._scheduler.get_dynamic_sidecar_state", + f"{module_base}._core._scheduler_utils.get_dynamic_sidecar_state", return_value=(ServiceState.PENDING, ""), ) diff --git a/services/director-v2/tests/unit/test_models_clusters.py b/services/director-v2/tests/unit/test_models_clusters.py index d7bb73b19f68..76f8801610eb 100644 --- a/services/director-v2/tests/unit/test_models_clusters.py +++ b/services/director-v2/tests/unit/test_models_clusters.py @@ -12,7 +12,6 @@ UsedResources, Worker, WorkerMetrics, - WorkersDict, ) @@ -48,13 +47,13 @@ def test_cluster_creation_brings_default_thumbail( def test_scheduler_constructor_with_default_has_correct_dict(faker: Faker): scheduler = Scheduler(status=faker.text()) - assert isinstance(scheduler.workers, WorkersDict) + assert scheduler.workers is not None assert len(scheduler.workers) == 0 def test_scheduler_constructor_with_no_workers_has_correct_dict(faker: Faker): scheduler = Scheduler(status=faker.text(), workers=None) - assert isinstance(scheduler.workers, WorkersDict) + assert scheduler.workers is not None assert len(scheduler.workers) == 0 diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py index a79e99336e5c..c8c11ed9fa17 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_scheduler.py @@ -209,7 +209,7 @@ def mocked_api_client(scheduler_data: SchedulerData) -> Iterator[MockRouter]: @pytest.fixture def mock_service_running(mock_docker_api, mocker: MockerFixture) -> Iterator[AsyncMock]: mock = mocker.patch( - "simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._scheduler.get_dynamic_sidecar_state", + "simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._scheduler_utils.get_dynamic_sidecar_state", return_value=(ServiceState.RUNNING, ""), ) diff --git a/services/director-v2/tests/unit/with_dbs/test_api_route_dynamic_services.py b/services/director-v2/tests/unit/with_dbs/test_api_route_dynamic_services.py index ce1a8a383f8e..db4047d3fce0 100644 --- a/services/director-v2/tests/unit/with_dbs/test_api_route_dynamic_services.py +++ b/services/director-v2/tests/unit/with_dbs/test_api_route_dynamic_services.py @@ -246,7 +246,7 @@ def remove_service(node_uuid: NodeID, *ars: Any, **kwargs: Any) -> None: ) mocker.patch( - f"{module_base}._core._scheduler.Scheduler._discover_running_services", + f"{module_base}._core._scheduler_utils.discover_running_services", autospec=True, return_value=None, )