diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py index 102c4c6cc2d..c13ef6b7713 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py @@ -21,13 +21,15 @@ def _log_pool_status(client: AsyncClient, event_name: str) -> None: # pylint: disable=protected-access logger.warning( - "Pool status @ '%s': requests=%s, connections=%s", + "Pool status @ '%s': requests(%s)=%s, connections(%s)=%s", event_name.upper(), + len(client._transport._pool._requests), [ - (r.request.method, r.request.url, r.request.headers) + (id(r), r.request.method, r.request.url, r.request.headers) for r in client._transport._pool._requests ], - client._transport._pool.connections, + len(client._transport._pool.connections), + [(id(c), c.__dict__) for c in client._transport._pool.connections], ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py index 596a8f96a50..7ca0d80c323 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py @@ -98,7 +98,10 @@ def list_services( @abstractmethod async def mark_service_for_removal( - self, node_uuid: NodeID, can_save: Optional[bool] + self, + node_uuid: NodeID, + can_save: Optional[bool], + skip_observation_recreation: bool = False, ) -> None: """The service will be removed as soon as possible""" diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_observer.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_observer.py index 14b957dded5..ffff436862c 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_observer.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_observer.py @@ -59,6 +59,7 @@ async def _apply_observation_cycle( await scheduler.mark_service_for_removal( node_uuid=scheduler_data.node_uuid, can_save=scheduler_data.dynamic_sidecar.were_containers_created, + skip_observation_recreation=True, ) for dynamic_scheduler_event in REGISTERED_EVENTS: diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py index 2b16ef629b2..1043899b15e 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py @@ -234,7 +234,10 @@ def _is_scheduled(node_id: NodeID) -> bool: ) async def mark_service_for_removal( - self, node_uuid: NodeID, can_save: Optional[bool] + self, + node_uuid: NodeID, + can_save: Optional[bool], + skip_observation_recreation: bool = False, ) -> None: """Marks service for removal, causing RemoveMarkedService to trigger""" async with self._lock: @@ -268,6 +271,9 @@ async def _await_task(task: asyncio.Task) -> None: except asyncio.TimeoutError: pass + if skip_observation_recreation: + return + # recreate new observation dynamic_sidecar_settings: DynamicSidecarSettings = ( self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR @@ -275,7 +281,9 @@ async def _await_task(task: asyncio.Task) -> None: dynamic_scheduler: DynamicServicesSchedulerSettings = ( self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER ) - self._service_observation_task[service_name] = self.__get_observation_task( + self._service_observation_task[ + service_name + ] = self.__create_observation_task( dynamic_sidecar_settings, dynamic_scheduler, service_name ) @@ -461,7 +469,7 @@ async def restart_containers(self, node_uuid: NodeID) -> None: def _enqueue_observation_from_service_name(self, service_name: str) -> None: self._trigger_observation_queue.put_nowait(service_name) - def __get_observation_task( + def __create_observation_task( self, dynamic_sidecar_settings: DynamicSidecarSettings, dynamic_scheduler: DynamicServicesSchedulerSettings, @@ -509,7 +517,7 @@ async def _run_trigger_observation_queue_task(self) -> None: if self._service_observation_task.get(service_name) is None: self._service_observation_task[ service_name - ] = self.__get_observation_task( + ] = self.__create_observation_task( dynamic_sidecar_settings, dynamic_scheduler, service_name ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task.py index 6948fafca5b..8ba2858d816 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task.py @@ -87,9 +87,14 @@ def list_services( return self._scheduler.list_services(user_id=user_id, project_id=project_id) async def mark_service_for_removal( - self, node_uuid: NodeID, can_save: Optional[bool] + self, + node_uuid: NodeID, + can_save: Optional[bool], + skip_observation_recreation: bool = False, ) -> None: - return await self._scheduler.mark_service_for_removal(node_uuid, can_save) + return await self._scheduler.mark_service_for_removal( + node_uuid, can_save, skip_observation_recreation + ) async def get_stack_status(self, node_uuid: NodeID) -> RunningDynamicServiceDetails: return await self._scheduler.get_stack_status(node_uuid) diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py index 62f549f6d2a..a6e61a9b5f5 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py @@ -121,7 +121,7 @@ async def raises_request_error(self) -> Response: connections_message = caplog_info_level.messages[-1] assert ( connections_message - == "Pool status @ 'POOL TIMEOUT': requests=[], connections=[]" + == "Pool status @ 'POOL TIMEOUT': requests(0)=[], connections(0)=[]" ) else: _assert_messages(caplog_info_level.messages) diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py new file mode 100644 index 00000000000..35b7576b773 --- /dev/null +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py @@ -0,0 +1,166 @@ +# pylint:disable=protected-access +# pylint:disable=redefined-outer-name +# pylint:disable=unused-argument + +from typing import Optional +from unittest.mock import AsyncMock + +import pytest +from fastapi import FastAPI +from pytest import MonkeyPatch +from pytest_mock.plugin import MockerFixture +from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict +from simcore_service_director_v2.core.settings import AppSettings +from simcore_service_director_v2.models.schemas.dynamic_services import SchedulerData +from simcore_service_director_v2.modules.dynamic_sidecar.api_client import ( + setup, + shutdown, +) +from simcore_service_director_v2.modules.dynamic_sidecar.scheduler import ( + DynamicSidecarsScheduler, + setup_scheduler, + shutdown_scheduler, +) +from simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._observer import ( + _apply_observation_cycle, +) + + +@pytest.fixture +def disable_observation(mocker: MockerFixture) -> None: + mocker.patch( + "simcore_service_director_v2.modules.dynamic_sidecar.scheduler._task.DynamicSidecarsScheduler.start", + autospec=True, + ) + + +@pytest.fixture +def mock_are_sidecar_and_proxy_services_present(mocker: MockerFixture) -> None: + mocker.patch( + "simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._observer.are_sidecar_and_proxy_services_present", + autospec=True, + return_value=False, + ) + + +@pytest.fixture +def mock_events(mocker: MockerFixture) -> None: + for event_to_mock in ( + "CreateSidecars", + "WaitForSidecarAPI", + "UpdateHealth", + "GetStatus", + "PrepareServicesEnvironment", + "CreateUserServices", + "AttachProjectsNetworks", + "RemoveUserCreatedServices", + ): + mocker.patch( + f"simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._events.{event_to_mock}.action", + autospec=True, + return_value=True, + ) + + +@pytest.fixture +def mock_env( + docker_swarm: None, + mock_env: EnvVarsDict, + monkeypatch: MonkeyPatch, +) -> None: + setenvs_from_dict( + monkeypatch, + { + "SIMCORE_SERVICES_NETWORK_NAME": "test_network", + "DIRECTOR_HOST": "mocked_out", + "DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED": "true", + "S3_ENDPOINT": "endpoint", + "S3_ACCESS_KEY": "access_key", + "S3_SECRET_KEY": "secret_key", + "S3_BUCKET_NAME": "bucket_name", + "S3_SECURE": "false", + "DIRECTOR_V2_POSTGRES_ENABLED": "false", + "POSTGRES_HOST": "test", + "POSTGRES_USER": "test", + "POSTGRES_PASSWORD": "test", + "POSTGRES_DB": "test", + }, + ) + + +@pytest.fixture +def mocked_app(mock_env: None) -> FastAPI: + app = FastAPI() + app.state.settings = AppSettings() + app.state.rabbitmq_client = AsyncMock() + return app + + +@pytest.fixture +async def dynamic_sidecar_scheduler(mocked_app: FastAPI) -> DynamicSidecarsScheduler: + await setup_scheduler(mocked_app) + await setup(mocked_app) + + yield mocked_app.state.dynamic_sidecar_scheduler + + await shutdown_scheduler(mocked_app) + await shutdown(mocked_app) + + +def _is_observation_task_present( + dynamic_sidecar_scheduler, + scheduler_data_from_http_request, +) -> bool: + return ( + scheduler_data_from_http_request.service_name + in dynamic_sidecar_scheduler._scheduler._service_observation_task + ) + + +@pytest.mark.parametrize("can_save", [None, False, False]) +async def test_regression_break_endless_loop_cancellation_edge_case( + disable_observation: None, + mock_are_sidecar_and_proxy_services_present: None, + mock_events: None, + dynamic_sidecar_scheduler: DynamicSidecarsScheduler, + scheduler_data_from_http_request: SchedulerData, + can_save: Optional[bool], +): + # in this situation the scheduler would never end loops forever + await dynamic_sidecar_scheduler._scheduler._add_service( + scheduler_data_from_http_request + ) + + # simulate edge case + scheduler_data_from_http_request.dynamic_sidecar.were_containers_created = True + + assert ( + _is_observation_task_present( + dynamic_sidecar_scheduler, scheduler_data_from_http_request + ) + is False + ) + + # NOTE: this will create the observation task as well! + # Simulates user action like going back to the dashboard. + await dynamic_sidecar_scheduler.mark_service_for_removal( + scheduler_data_from_http_request.node_uuid, can_save=can_save + ) + + assert ( + _is_observation_task_present( + dynamic_sidecar_scheduler, scheduler_data_from_http_request + ) + is True + ) + + await _apply_observation_cycle( + dynamic_sidecar_scheduler, scheduler_data_from_http_request + ) + + assert ( + _is_observation_task_present( + dynamic_sidecar_scheduler, scheduler_data_from_http_request + ) + is False + )