From e052991090b5806d3fe472f75ad79370fc5c4a6d Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 14 Feb 2023 11:10:38 +0100 Subject: [PATCH 1/6] add more information about the leaked connections --- .../modules/dynamic_sidecar/api_client/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py index 102c4c6cc2d..c2052da7d97 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py @@ -27,7 +27,7 @@ def _log_pool_status(client: AsyncClient, event_name: str) -> None: (r.request.method, r.request.url, r.request.headers) for r in client._transport._pool._requests ], - client._transport._pool.connections, + [(id(c), c.__dict__) for c in client._transport._pool.connections], ) From 66cdbc3d17ede8bdbf6a0217161e981ffb94c6cd Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 14 Feb 2023 11:14:40 +0100 Subject: [PATCH 2/6] adding more information to log --- .../modules/dynamic_sidecar/api_client/_base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py index c2052da7d97..ee65eb434fa 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py @@ -21,12 +21,14 @@ def _log_pool_status(client: AsyncClient, event_name: str) -> None: # pylint: disable=protected-access logger.warning( - "Pool status @ '%s': requests=%s, connections=%s", + "Pool status @ '%s': requests(%s)=%s, connections(%s)=%s", event_name.upper(), + len(client._transport._pool._requests), [ (r.request.method, r.request.url, r.request.headers) for r in client._transport._pool._requests ], + len(client._transport._pool.connections), [(id(c), c.__dict__) for c in client._transport._pool.connections], ) From c342fc1bf339407be12dff8c4a0d14bbc2e1830e Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 14 Feb 2023 11:18:48 +0100 Subject: [PATCH 3/6] keep track of hanging requests --- .../modules/dynamic_sidecar/api_client/_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py index ee65eb434fa..c13ef6b7713 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/api_client/_base.py @@ -25,7 +25,7 @@ def _log_pool_status(client: AsyncClient, event_name: str) -> None: event_name.upper(), len(client._transport._pool._requests), [ - (r.request.method, r.request.url, r.request.headers) + (id(r), r.request.method, r.request.url, r.request.headers) for r in client._transport._pool._requests ], len(client._transport._pool.connections), From c50b499c5def0e6d9dda4fb0f6842222fab50949 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 14 Feb 2023 16:18:02 +0100 Subject: [PATCH 4/6] breaking infinite loops --- .../modules/dynamic_sidecar/scheduler/_abc.py | 5 +- .../scheduler/_core/_observer.py | 1 + .../scheduler/_core/_scheduler.py | 16 +- .../dynamic_sidecar/scheduler/_task.py | 9 +- .../test_modules_dynamic_sidecar_observer.py | 161 ++++++++++++++++++ 5 files changed, 185 insertions(+), 7 deletions(-) create mode 100644 services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py index 596a8f96a50..7ca0d80c323 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_abc.py @@ -98,7 +98,10 @@ def list_services( @abstractmethod async def mark_service_for_removal( - self, node_uuid: NodeID, can_save: Optional[bool] + self, + node_uuid: NodeID, + can_save: Optional[bool], + skip_observation_recreation: bool = False, ) -> None: """The service will be removed as soon as possible""" diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_observer.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_observer.py index 14b957dded5..ffff436862c 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_observer.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_observer.py @@ -59,6 +59,7 @@ async def _apply_observation_cycle( await scheduler.mark_service_for_removal( node_uuid=scheduler_data.node_uuid, can_save=scheduler_data.dynamic_sidecar.were_containers_created, + skip_observation_recreation=True, ) for dynamic_scheduler_event in REGISTERED_EVENTS: diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py index 2b16ef629b2..1043899b15e 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_core/_scheduler.py @@ -234,7 +234,10 @@ def _is_scheduled(node_id: NodeID) -> bool: ) async def mark_service_for_removal( - self, node_uuid: NodeID, can_save: Optional[bool] + self, + node_uuid: NodeID, + can_save: Optional[bool], + skip_observation_recreation: bool = False, ) -> None: """Marks service for removal, causing RemoveMarkedService to trigger""" async with self._lock: @@ -268,6 +271,9 @@ async def _await_task(task: asyncio.Task) -> None: except asyncio.TimeoutError: pass + if skip_observation_recreation: + return + # recreate new observation dynamic_sidecar_settings: DynamicSidecarSettings = ( self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR @@ -275,7 +281,9 @@ async def _await_task(task: asyncio.Task) -> None: dynamic_scheduler: DynamicServicesSchedulerSettings = ( self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER ) - self._service_observation_task[service_name] = self.__get_observation_task( + self._service_observation_task[ + service_name + ] = self.__create_observation_task( dynamic_sidecar_settings, dynamic_scheduler, service_name ) @@ -461,7 +469,7 @@ async def restart_containers(self, node_uuid: NodeID) -> None: def _enqueue_observation_from_service_name(self, service_name: str) -> None: self._trigger_observation_queue.put_nowait(service_name) - def __get_observation_task( + def __create_observation_task( self, dynamic_sidecar_settings: DynamicSidecarSettings, dynamic_scheduler: DynamicServicesSchedulerSettings, @@ -509,7 +517,7 @@ async def _run_trigger_observation_queue_task(self) -> None: if self._service_observation_task.get(service_name) is None: self._service_observation_task[ service_name - ] = self.__get_observation_task( + ] = self.__create_observation_task( dynamic_sidecar_settings, dynamic_scheduler, service_name ) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task.py index 6948fafca5b..8ba2858d816 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/scheduler/_task.py @@ -87,9 +87,14 @@ def list_services( return self._scheduler.list_services(user_id=user_id, project_id=project_id) async def mark_service_for_removal( - self, node_uuid: NodeID, can_save: Optional[bool] + self, + node_uuid: NodeID, + can_save: Optional[bool], + skip_observation_recreation: bool = False, ) -> None: - return await self._scheduler.mark_service_for_removal(node_uuid, can_save) + return await self._scheduler.mark_service_for_removal( + node_uuid, can_save, skip_observation_recreation + ) async def get_stack_status(self, node_uuid: NodeID) -> RunningDynamicServiceDetails: return await self._scheduler.get_stack_status(node_uuid) diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py new file mode 100644 index 00000000000..aabd4f8dc40 --- /dev/null +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py @@ -0,0 +1,161 @@ +# pylint:disable=protected-access +# pylint:disable=redefined-outer-name +# pylint:disable=unused-argument + +from typing import Optional +from unittest.mock import AsyncMock + +import pytest +from fastapi import FastAPI +from pytest import MonkeyPatch +from pytest_mock.plugin import MockerFixture +from pytest_simcore.helpers.typing_env import EnvVarsDict +from simcore_service_director_v2.core.settings import AppSettings +from simcore_service_director_v2.models.schemas.dynamic_services import SchedulerData +from simcore_service_director_v2.modules.dynamic_sidecar.api_client import ( + setup, + shutdown, +) +from simcore_service_director_v2.modules.dynamic_sidecar.scheduler import ( + DynamicSidecarsScheduler, + setup_scheduler, + shutdown_scheduler, +) +from simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._observer import ( + _apply_observation_cycle, +) + + +@pytest.fixture +def disable_observation(mocker: MockerFixture) -> None: + mocker.patch( + "simcore_service_director_v2.modules.dynamic_sidecar.scheduler._task.DynamicSidecarsScheduler.start", + autospec=True, + ) + + +@pytest.fixture +def mock_are_sidecar_and_proxy_services_present(mocker: MockerFixture) -> None: + mocker.patch( + "simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._observer.are_sidecar_and_proxy_services_present", + autospec=True, + return_value=False, + ) + + +@pytest.fixture +def mock_events(mocker: MockerFixture) -> None: + for event_to_mock in ( + "CreateSidecars", + "WaitForSidecarAPI", + "UpdateHealth", + "GetStatus", + "PrepareServicesEnvironment", + "CreateUserServices", + "AttachProjectsNetworks", + "RemoveUserCreatedServices", + ): + mocker.patch( + f"simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._events.{event_to_mock}.action", + autospec=True, + return_value=True, + ) + + +@pytest.fixture +def mock_env( + docker_swarm: None, + mock_env: EnvVarsDict, + monkeypatch: MonkeyPatch, +) -> None: + monkeypatch.setenv("SIMCORE_SERVICES_NETWORK_NAME", "test_network") + monkeypatch.setenv("DIRECTOR_HOST", "mocked_out") + monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED", "true") + monkeypatch.setenv("S3_ENDPOINT", "endpoint") + monkeypatch.setenv("S3_ACCESS_KEY", "access_key") + monkeypatch.setenv("S3_SECRET_KEY", "secret_key") + monkeypatch.setenv("S3_BUCKET_NAME", "bucket_name") + monkeypatch.setenv("S3_SECURE", "false") + monkeypatch.setenv("DIRECTOR_V2_POSTGRES_ENABLED", "false") + monkeypatch.setenv("POSTGRES_HOST", "test") + monkeypatch.setenv("POSTGRES_USER", "test") + monkeypatch.setenv("POSTGRES_PASSWORD", "test") + monkeypatch.setenv("POSTGRES_DB", "test") + + +@pytest.fixture +def mocked_app(mock_env: None) -> FastAPI: + app = FastAPI() + app.state.settings = AppSettings() + app.state.rabbitmq_client = AsyncMock() + return app + + +@pytest.fixture +async def dynamic_sidecar_scheduler(mocked_app: FastAPI) -> DynamicSidecarsScheduler: + await setup_scheduler(mocked_app) + await setup(mocked_app) + + yield mocked_app.state.dynamic_sidecar_scheduler + + await shutdown_scheduler(mocked_app) + await shutdown(mocked_app) + + +def _is_observation_task_present( + dynamic_sidecar_scheduler, + scheduler_data_from_http_request, +) -> bool: + return ( + scheduler_data_from_http_request.service_name + in dynamic_sidecar_scheduler._scheduler._service_observation_task + ) + + +@pytest.mark.parametrize("can_save", [None, False, False]) +async def test_regression_break_endless_loop_cancellation_edge_case( + disable_observation: None, + mock_are_sidecar_and_proxy_services_present: None, + mock_events: None, + dynamic_sidecar_scheduler: DynamicSidecarsScheduler, + scheduler_data_from_http_request: SchedulerData, + can_save: Optional[bool], +): + # in this situation the scheduler would never end loops forever + await dynamic_sidecar_scheduler._scheduler._add_service( + scheduler_data_from_http_request + ) + + # simulate edge case + scheduler_data_from_http_request.dynamic_sidecar.were_containers_created = True + + assert ( + _is_observation_task_present( + dynamic_sidecar_scheduler, scheduler_data_from_http_request + ) + is False + ) + + # NOTE: this will create the observation task as well! + # Simulates user action like going back to the dashboard. + await dynamic_sidecar_scheduler.mark_service_for_removal( + scheduler_data_from_http_request.node_uuid, can_save=can_save + ) + + assert ( + _is_observation_task_present( + dynamic_sidecar_scheduler, scheduler_data_from_http_request + ) + is True + ) + + await _apply_observation_cycle( + dynamic_sidecar_scheduler, scheduler_data_from_http_request + ) + + assert ( + _is_observation_task_present( + dynamic_sidecar_scheduler, scheduler_data_from_http_request + ) + is False + ) From 525a1b49b9a3d9809a5a92bae545eaf360ee2f48 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Tue, 14 Feb 2023 16:29:05 +0100 Subject: [PATCH 5/6] fix broken unittest --- .../tests/unit/test_modules_dynamic_sidecar_client_api_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py index 62f549f6d2a..a6e61a9b5f5 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_client_api_base.py @@ -121,7 +121,7 @@ async def raises_request_error(self) -> Response: connections_message = caplog_info_level.messages[-1] assert ( connections_message - == "Pool status @ 'POOL TIMEOUT': requests=[], connections=[]" + == "Pool status @ 'POOL TIMEOUT': requests(0)=[], connections(0)=[]" ) else: _assert_messages(caplog_info_level.messages) From 9b47d87d4bd3aab738a83619ba678d8eee7100c2 Mon Sep 17 00:00:00 2001 From: Andrei Neagu Date: Wed, 15 Feb 2023 08:36:08 +0100 Subject: [PATCH 6/6] refactor --- .../test_modules_dynamic_sidecar_observer.py | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py index aabd4f8dc40..35b7576b773 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_observer.py @@ -9,7 +9,7 @@ from fastapi import FastAPI from pytest import MonkeyPatch from pytest_mock.plugin import MockerFixture -from pytest_simcore.helpers.typing_env import EnvVarsDict +from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict from simcore_service_director_v2.core.settings import AppSettings from simcore_service_director_v2.models.schemas.dynamic_services import SchedulerData from simcore_service_director_v2.modules.dynamic_sidecar.api_client import ( @@ -68,19 +68,24 @@ def mock_env( mock_env: EnvVarsDict, monkeypatch: MonkeyPatch, ) -> None: - monkeypatch.setenv("SIMCORE_SERVICES_NETWORK_NAME", "test_network") - monkeypatch.setenv("DIRECTOR_HOST", "mocked_out") - monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED", "true") - monkeypatch.setenv("S3_ENDPOINT", "endpoint") - monkeypatch.setenv("S3_ACCESS_KEY", "access_key") - monkeypatch.setenv("S3_SECRET_KEY", "secret_key") - monkeypatch.setenv("S3_BUCKET_NAME", "bucket_name") - monkeypatch.setenv("S3_SECURE", "false") - monkeypatch.setenv("DIRECTOR_V2_POSTGRES_ENABLED", "false") - monkeypatch.setenv("POSTGRES_HOST", "test") - monkeypatch.setenv("POSTGRES_USER", "test") - monkeypatch.setenv("POSTGRES_PASSWORD", "test") - monkeypatch.setenv("POSTGRES_DB", "test") + setenvs_from_dict( + monkeypatch, + { + "SIMCORE_SERVICES_NETWORK_NAME": "test_network", + "DIRECTOR_HOST": "mocked_out", + "DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED": "true", + "S3_ENDPOINT": "endpoint", + "S3_ACCESS_KEY": "access_key", + "S3_SECRET_KEY": "secret_key", + "S3_BUCKET_NAME": "bucket_name", + "S3_SECURE": "false", + "DIRECTOR_V2_POSTGRES_ENABLED": "false", + "POSTGRES_HOST": "test", + "POSTGRES_USER": "test", + "POSTGRES_PASSWORD": "test", + "POSTGRES_DB": "test", + }, + ) @pytest.fixture