Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Fix edge case when cancelling observation #3878

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
def _log_pool_status(client: AsyncClient, event_name: str) -> None:
# pylint: disable=protected-access
logger.warning(
"Pool status @ '%s': requests=%s, connections=%s",
"Pool status @ '%s': requests(%s)=%s, connections(%s)=%s",
event_name.upper(),
len(client._transport._pool._requests),
[
(r.request.method, r.request.url, r.request.headers)
(id(r), r.request.method, r.request.url, r.request.headers)
for r in client._transport._pool._requests
],
client._transport._pool.connections,
len(client._transport._pool.connections),
[(id(c), c.__dict__) for c in client._transport._pool.connections],
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,10 @@ def list_services(

@abstractmethod
async def mark_service_for_removal(
self, node_uuid: NodeID, can_save: Optional[bool]
self,
node_uuid: NodeID,
can_save: Optional[bool],
skip_observation_recreation: bool = False,
) -> None:
"""The service will be removed as soon as possible"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ async def _apply_observation_cycle(
await scheduler.mark_service_for_removal(
node_uuid=scheduler_data.node_uuid,
can_save=scheduler_data.dynamic_sidecar.were_containers_created,
skip_observation_recreation=True,
)

for dynamic_scheduler_event in REGISTERED_EVENTS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,10 @@ def _is_scheduled(node_id: NodeID) -> bool:
)

async def mark_service_for_removal(
self, node_uuid: NodeID, can_save: Optional[bool]
self,
node_uuid: NodeID,
can_save: Optional[bool],
skip_observation_recreation: bool = False,
) -> None:
"""Marks service for removal, causing RemoveMarkedService to trigger"""
async with self._lock:
Expand Down Expand Up @@ -268,14 +271,19 @@ async def _await_task(task: asyncio.Task) -> None:
except asyncio.TimeoutError:
pass

if skip_observation_recreation:
return

# recreate new observation
dynamic_sidecar_settings: DynamicSidecarSettings = (
self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SIDECAR
)
dynamic_scheduler: DynamicServicesSchedulerSettings = (
self.app.state.settings.DYNAMIC_SERVICES.DYNAMIC_SCHEDULER
)
self._service_observation_task[service_name] = self.__get_observation_task(
self._service_observation_task[
service_name
] = self.__create_observation_task(
dynamic_sidecar_settings, dynamic_scheduler, service_name
)

Expand Down Expand Up @@ -461,7 +469,7 @@ async def restart_containers(self, node_uuid: NodeID) -> None:
def _enqueue_observation_from_service_name(self, service_name: str) -> None:
self._trigger_observation_queue.put_nowait(service_name)

def __get_observation_task(
def __create_observation_task(
self,
dynamic_sidecar_settings: DynamicSidecarSettings,
dynamic_scheduler: DynamicServicesSchedulerSettings,
Expand Down Expand Up @@ -509,7 +517,7 @@ async def _run_trigger_observation_queue_task(self) -> None:
if self._service_observation_task.get(service_name) is None:
self._service_observation_task[
service_name
] = self.__get_observation_task(
] = self.__create_observation_task(
dynamic_sidecar_settings, dynamic_scheduler, service_name
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,14 @@ def list_services(
return self._scheduler.list_services(user_id=user_id, project_id=project_id)

async def mark_service_for_removal(
self, node_uuid: NodeID, can_save: Optional[bool]
self,
node_uuid: NodeID,
can_save: Optional[bool],
skip_observation_recreation: bool = False,
) -> None:
return await self._scheduler.mark_service_for_removal(node_uuid, can_save)
return await self._scheduler.mark_service_for_removal(
node_uuid, can_save, skip_observation_recreation
)

async def get_stack_status(self, node_uuid: NodeID) -> RunningDynamicServiceDetails:
return await self._scheduler.get_stack_status(node_uuid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ async def raises_request_error(self) -> Response:
connections_message = caplog_info_level.messages[-1]
assert (
connections_message
== "Pool status @ 'POOL TIMEOUT': requests=[], connections=[]"
== "Pool status @ 'POOL TIMEOUT': requests(0)=[], connections(0)=[]"
GitHK marked this conversation as resolved.
Show resolved Hide resolved
)
else:
_assert_messages(caplog_info_level.messages)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# pylint:disable=protected-access
# pylint:disable=redefined-outer-name
# pylint:disable=unused-argument

from typing import Optional
from unittest.mock import AsyncMock

import pytest
from fastapi import FastAPI
from pytest import MonkeyPatch
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.utils_envs import EnvVarsDict, setenvs_from_dict
from simcore_service_director_v2.core.settings import AppSettings
from simcore_service_director_v2.models.schemas.dynamic_services import SchedulerData
from simcore_service_director_v2.modules.dynamic_sidecar.api_client import (
setup,
shutdown,
)
from simcore_service_director_v2.modules.dynamic_sidecar.scheduler import (
DynamicSidecarsScheduler,
setup_scheduler,
shutdown_scheduler,
)
from simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._observer import (
_apply_observation_cycle,
)


@pytest.fixture
def disable_observation(mocker: MockerFixture) -> None:
mocker.patch(
"simcore_service_director_v2.modules.dynamic_sidecar.scheduler._task.DynamicSidecarsScheduler.start",
autospec=True,
)


@pytest.fixture
def mock_are_sidecar_and_proxy_services_present(mocker: MockerFixture) -> None:
mocker.patch(
"simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._observer.are_sidecar_and_proxy_services_present",
autospec=True,
return_value=False,
)


@pytest.fixture
def mock_events(mocker: MockerFixture) -> None:
for event_to_mock in (
"CreateSidecars",
"WaitForSidecarAPI",
"UpdateHealth",
"GetStatus",
"PrepareServicesEnvironment",
"CreateUserServices",
"AttachProjectsNetworks",
"RemoveUserCreatedServices",
):
mocker.patch(
f"simcore_service_director_v2.modules.dynamic_sidecar.scheduler._core._events.{event_to_mock}.action",
autospec=True,
return_value=True,
)


@pytest.fixture
def mock_env(
docker_swarm: None,
mock_env: EnvVarsDict,
monkeypatch: MonkeyPatch,
) -> None:
setenvs_from_dict(
monkeypatch,
{
"SIMCORE_SERVICES_NETWORK_NAME": "test_network",
"DIRECTOR_HOST": "mocked_out",
"DIRECTOR_V2_DYNAMIC_SCHEDULER_ENABLED": "true",
"S3_ENDPOINT": "endpoint",
"S3_ACCESS_KEY": "access_key",
"S3_SECRET_KEY": "secret_key",
"S3_BUCKET_NAME": "bucket_name",
"S3_SECURE": "false",
"DIRECTOR_V2_POSTGRES_ENABLED": "false",
"POSTGRES_HOST": "test",
"POSTGRES_USER": "test",
"POSTGRES_PASSWORD": "test",
"POSTGRES_DB": "test",
},
)


@pytest.fixture
def mocked_app(mock_env: None) -> FastAPI:
app = FastAPI()
app.state.settings = AppSettings()
app.state.rabbitmq_client = AsyncMock()
return app


@pytest.fixture
async def dynamic_sidecar_scheduler(mocked_app: FastAPI) -> DynamicSidecarsScheduler:
await setup_scheduler(mocked_app)
await setup(mocked_app)

yield mocked_app.state.dynamic_sidecar_scheduler

await shutdown_scheduler(mocked_app)
await shutdown(mocked_app)


def _is_observation_task_present(
dynamic_sidecar_scheduler,
scheduler_data_from_http_request,
) -> bool:
return (
scheduler_data_from_http_request.service_name
in dynamic_sidecar_scheduler._scheduler._service_observation_task
)


@pytest.mark.parametrize("can_save", [None, False, False])
async def test_regression_break_endless_loop_cancellation_edge_case(
disable_observation: None,
mock_are_sidecar_and_proxy_services_present: None,
mock_events: None,
dynamic_sidecar_scheduler: DynamicSidecarsScheduler,
scheduler_data_from_http_request: SchedulerData,
can_save: Optional[bool],
):
# in this situation the scheduler would never end loops forever
await dynamic_sidecar_scheduler._scheduler._add_service(
scheduler_data_from_http_request
)

# simulate edge case
scheduler_data_from_http_request.dynamic_sidecar.were_containers_created = True

assert (
_is_observation_task_present(
dynamic_sidecar_scheduler, scheduler_data_from_http_request
)
is False
)

# NOTE: this will create the observation task as well!
# Simulates user action like going back to the dashboard.
await dynamic_sidecar_scheduler.mark_service_for_removal(
scheduler_data_from_http_request.node_uuid, can_save=can_save
)

assert (
_is_observation_task_present(
dynamic_sidecar_scheduler, scheduler_data_from_http_request
)
is True
)

await _apply_observation_cycle(
dynamic_sidecar_scheduler, scheduler_data_from_http_request
)

assert (
_is_observation_task_present(
dynamic_sidecar_scheduler, scheduler_data_from_http_request
)
is False
)