diff --git a/services/web/server/tests/unit/with_dbs/03/garbage_collector/test_resource_manager.py b/services/web/server/tests/unit/with_dbs/03/garbage_collector/test_resource_manager.py index 766810030dc2..e8f919d2b49d 100644 --- a/services/web/server/tests/unit/with_dbs/03/garbage_collector/test_resource_manager.py +++ b/services/web/server/tests/unit/with_dbs/03/garbage_collector/test_resource_manager.py @@ -10,7 +10,7 @@ from asyncio import Future from copy import deepcopy from pathlib import Path -from typing import Any, AsyncIterator, Callable +from typing import Any, AsyncIterator, Awaitable, Callable from unittest import mock from unittest.mock import call @@ -45,6 +45,7 @@ from simcore_service_webserver.resource_manager.plugin import setup_resource_manager from simcore_service_webserver.resource_manager.registry import ( RedisResourceRegistry, + RegistryKeyPrefixDict, get_registry, ) from simcore_service_webserver.rest import setup_rest @@ -57,7 +58,8 @@ from simcore_service_webserver.users_api import delete_user from simcore_service_webserver.users_exceptions import UserNotFoundError from tenacity._asyncio import AsyncRetrying -from tenacity.stop import stop_after_attempt, stop_after_delay +from tenacity.retry import retry_if_exception_type +from tenacity.stop import stop_after_delay from tenacity.wait import wait_fixed from yarl import URL @@ -67,18 +69,33 @@ SERVICE_DELETION_DELAY = 1 -async def open_project(client, project_uuid: str, client_session_id: str) -> None: - url = client.app.router["open_project"].url_for(project_id=project_uuid) - resp = await client.post(url, json=client_session_id) - await assert_status(resp, web.HTTPOk) - - async def close_project(client, project_uuid: str, client_session_id: str) -> None: url = client.app.router["close_project"].url_for(project_id=project_uuid) resp = await client.post(url, json=client_session_id) await assert_status(resp, web.HTTPNoContent) +@pytest.fixture +async def open_project() -> AsyncIterator[Callable[..., Awaitable[None]]]: + opened_projects = [] + + async def open_project(client, project_uuid: str, client_session_id: str) -> None: + url = client.app.router["open_project"].url_for(project_id=project_uuid) + resp = await client.post(url, json=client_session_id) + await assert_status(resp, web.HTTPOk) + opened_projects.append((client, project_uuid, client_session_id)) + + yield open_project + # cleanup, if we cannot close that is because the user_role might not allow it + await asyncio.gather( + *( + close_project(client, project_uuid, client_session_id) + for client, project_uuid, client_session_id in opened_projects + ), + return_exceptions=True, + ) + + @pytest.fixture def client( event_loop: asyncio.AbstractEventLoop, @@ -246,11 +263,10 @@ async def test_websocket_resource_management( cur_client_session_id = client_session_id_factory() sio = await socketio_client_factory(cur_client_session_id) sid = sio.get_sid() - resource_key = { - "user_id": str(logged_user["id"]), - "client_session_id": cur_client_session_id, - } - # FIXME: this check fails with python-socketio>=5.0.0 (see requirements/_base.in) + resource_key = RegistryKeyPrefixDict( + user_id=f"{logged_user['id']}", client_session_id=cur_client_session_id + ) + assert await socket_registry.find_keys(("socket_id", sio.get_sid())) == [ resource_key ] @@ -261,13 +277,17 @@ async def test_websocket_resource_management( # NOTE: the socket.io client needs the websockets package in order to upgrade to websocket transport await sio.disconnect() + await sio.wait() assert not sio.get_sid() - # NOTE: let the disconnection propagate - await asyncio.sleep(1) - # now the entries should be removed - assert not await socket_registry.find_keys(("socket_id", sio.get_sid())) - assert not sid in await socket_registry.find_resources(resource_key, "socket_id") - assert not await socket_registry.find_resources(resource_key, "socket_id") + + async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY): + with attempt: + # now the entries should be removed + assert not await socket_registry.find_keys(("socket_id", sio.get_sid())) + assert not sid in await socket_registry.find_resources( + resource_key, "socket_id" + ) + assert not await socket_registry.find_resources(resource_key, "socket_id") @pytest.mark.parametrize( @@ -286,19 +306,20 @@ async def test_websocket_multiple_connections( client_session_id_factory: Callable[[], str], ): NUMBER_OF_SOCKETS = 5 - resource_key = {} + resource_keys: list[RegistryKeyPrefixDict] = [] # connect multiple clients clients = [] for socket_count in range(1, NUMBER_OF_SOCKETS + 1): cur_client_session_id = client_session_id_factory() sio = await socketio_client_factory(cur_client_session_id) - resource_key = { - "user_id": str(logged_user["id"]), - "client_session_id": cur_client_session_id, - } - assert await socket_registry.find_keys(("socket_id", sio.sid)) == [resource_key] - assert [sio.sid] == await socket_registry.find_resources( + resource_key = RegistryKeyPrefixDict( + user_id=f"{logged_user['id']}", client_session_id=cur_client_session_id + ) + assert await socket_registry.find_keys(("socket_id", sio.get_sid())) == [ + resource_key + ] + assert [sio.get_sid()] == await socket_registry.find_resources( resource_key, "socket_id" ) assert ( @@ -311,21 +332,29 @@ async def test_websocket_multiple_connections( == socket_count ) clients.append(sio) + resource_keys.append(resource_key) - for sio in clients: - sid = sio.sid + for sio, resource_key in zip(clients, resource_keys): + sid = sio.get_sid() await sio.disconnect() - # need to attend the disconnect event to pass through the socketio internal queues - await asyncio.sleep( - 0.1 - ) # must be >= 0.01 to work without issues, added some padding + await sio.wait() + assert not sio.sid - assert not await socket_registry.find_keys(("socket_id", sio.sid)) + assert not await socket_registry.find_keys(("socket_id", sio.get_sid())) assert not sid in await socket_registry.find_resources( resource_key, "socket_id" ) - assert not await socket_registry.find_resources(resource_key, "socket_id") + for resource_key in resource_keys: + assert not await socket_registry.find_resources(resource_key, "socket_id") + + +_TENACITY_ASSERT_RETRY = dict( + reraise=True, + retry=retry_if_exception_type(AssertionError), + wait=wait_fixed(0.5), + stop=stop_after_delay(30), +) @pytest.mark.parametrize( @@ -376,21 +405,20 @@ async def test_websocket_disconnected_after_logout( await assert_status(r, expected) # the socket2 should be gone - await asyncio.sleep(1) - assert not sio2.sid - socket_logout_mock_callable2.assert_not_called() + async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY): + with attempt: + assert not sio2.sid + socket_logout_mock_callable2.assert_not_called() - # the others should receive a logout message through their respective sockets - await asyncio.sleep(3) - socket_logout_mock_callable.assert_called_once() - socket_logout_mock_callable2.assert_not_called() # note 2 should be not called ever - socket_logout_mock_callable3.assert_called_once() + # the others should receive a logout message through their respective sockets + socket_logout_mock_callable.assert_called_once() + socket_logout_mock_callable2.assert_not_called() # note 2 should be not called ever + socket_logout_mock_callable3.assert_called_once() - await asyncio.sleep(3) - # first socket should be closed now - assert not sio.sid - # second socket also closed - assert not sio3.sid + # first socket should be closed now + assert not sio.sid + # second socket also closed + assert not sio3.sid @pytest.mark.parametrize( @@ -401,7 +429,6 @@ async def test_websocket_disconnected_after_logout( (UserRole.TESTER, True), ], ) -@pytest.mark.flaky(max_runs=3) # TODO: remove this flaky mark async def test_interactive_services_removed_after_logout( client: TestClient, logged_user: dict[str, Any], @@ -413,6 +440,7 @@ async def test_interactive_services_removed_after_logout( storage_subsystem_mock: MockedStorageSubsystem, # when guest user logs out garbage is collected director_v2_service_mock: aioresponses, expected_save_state: bool, + open_project: Callable, ): assert client.app @@ -440,14 +468,10 @@ async def test_interactive_services_removed_after_logout( await garbage_collector_core.collect_garbage(client.app) # assert dynamic service is removed *this is done in a fire/forget way so give a bit of leeway - async for attempt in AsyncRetrying( - reraise=True, stop=stop_after_attempt(10), wait=wait_fixed(1) - ): + async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY): with attempt: - logger.warning( - "Waiting for stop to have been called service_uuid=%s, save_state=%s", - service["service_uuid"], - expected_save_state, + print( + f"--> Waiting for stop_dynamic_service with: {service['service_uuid']}, {expected_save_state=}", ) mocked_director_v2_api[ "director_v2_core_dynamic_services.stop_dynamic_service" @@ -477,6 +501,7 @@ async def test_interactive_services_remain_after_websocket_reconnection_from_2_t storage_subsystem_mock, # when guest user logs out garbage is collected expected_save_state: bool, mocker: MockerFixture, + open_project: Callable, ): # login - logged_user fixture @@ -505,26 +530,29 @@ async def test_interactive_services_remain_after_websocket_reconnection_from_2_t # We have no mock-up for the heatbeat... await sio.disconnect() assert not sio.sid - await asyncio.sleep(0.5) # let the thread call the method - socket_project_state_update_mock_callable.assert_called_with( - json.dumps( - { - "project_uuid": empty_user_project["uuid"], - "data": { - "locked": { - "value": False, - "owner": { - "user_id": logged_user["id"], - "first_name": logged_user["name"], - "last_name": "", + async for attempt in AsyncRetrying( + **(_TENACITY_ASSERT_RETRY | dict(wait=wait_fixed(0.1))) + ): + with attempt: + socket_project_state_update_mock_callable.assert_called_with( + json.dumps( + { + "project_uuid": empty_user_project["uuid"], + "data": { + "locked": { + "value": False, + "owner": { + "user_id": logged_user["id"], + "first_name": logged_user["name"], + "last_name": "", + }, + "status": "OPENED", + }, + "state": {"value": "NOT_STARTED"}, }, - "status": "OPENED", - }, - "state": {"value": "NOT_STARTED"}, - }, - } - ) - ) + } + ) + ) # open project in second client await open_project(client, empty_user_project["uuid"], client_session_id2) # ensure sufficient time is wasted here @@ -547,13 +575,14 @@ async def test_interactive_services_remain_after_websocket_reconnection_from_2_t mocked_director_v2_api["director_v2_api.stop_dynamic_service"].assert_not_called() # now really disconnect await sio2.disconnect() + await sio2.wait() assert not sio2.sid # run the garbage collector # event after waiting some time await asyncio.sleep(SERVICE_DELETION_DELAY + 1) await garbage_collector_core.collect_garbage(client.app) - await asyncio.sleep(1) + await asyncio.sleep(0) # assert dynamic service is gone calls = [ call( @@ -600,6 +629,7 @@ async def test_interactive_services_removed_per_project( asyncpg_storage_system_mock, storage_subsystem_mock, # when guest user logs out garbage is collected expected_save_state: bool, + open_project: Callable, ): # create server with delay set to DELAY # login - logged_user fixture @@ -701,6 +731,7 @@ async def test_services_remain_after_closing_one_out_of_two_tabs( socketio_client_factory: Callable, client_session_id_factory: Callable[[], str], expected_save_state: bool, + open_project: Callable, ): # create server with delay set to DELAY # login - logged_user fixture @@ -754,6 +785,7 @@ async def test_websocket_disconnected_remove_or_maintain_files_based_on_role( storage_subsystem_mock, # when guest user logs out garbage is collected expect_call: bool, expected_save_state: bool, + open_project: Callable, ): # login - logged_user fixture # create empty study - empty_user_project fixture @@ -789,7 +821,7 @@ async def test_websocket_disconnected_remove_or_maintain_files_based_on_role( ].assert_has_calls(calls) # this call is done async, so wait a bit here to ensure it is correctly done - async for attempt in AsyncRetrying(reraise=True, stop=stop_after_delay(10)): + async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY): with attempt: if expect_call: # make sure `delete_project` is called @@ -837,8 +869,6 @@ async def test_regression_removing_unexisting_user( user_name={"first_name": "my name is", "last_name": "pytest"}, ) # since the call to delete is happening as fire and forget task, let's wait until it is done - async for attempt in AsyncRetrying(reraise=True, stop=stop_after_delay(20)): + async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY): with attempt: mock_storage_delete_data_folders.assert_called() - # wait a bit here so the task is completed to prevent having unclosed loops - await asyncio.sleep(1)