From 6d4847111040fa5a2bb23d146f41aed2c79b414f Mon Sep 17 00:00:00 2001 From: sanderegg <35365065+sanderegg@users.noreply.github.com> Date: Thu, 1 Dec 2022 15:26:52 +0100 Subject: [PATCH] =?UTF-8?q?Revert=20"=E2=AC=86=EF=B8=8F=20Maintenance:=20U?= =?UTF-8?q?pgrade=20python-socketio=20+=20flakyness=20(#3622)"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit db03a28713d14aa64698ba6deffa339e95ed2803. --- services/web/server/requirements/_base.in | 2 +- services/web/server/requirements/_base.txt | 15 +- services/web/server/requirements/_test.txt | 3 +- services/web/server/requirements/_tools.txt | 35 --- .../socketio/server.py | 30 +-- .../integration/01/test_garbage_collection.py | 12 +- .../tests/integration/02/test_rabbit.py | 82 ++++--- .../test_resource_manager.py | 223 +++++++----------- 8 files changed, 137 insertions(+), 265 deletions(-) diff --git a/services/web/server/requirements/_base.in b/services/web/server/requirements/_base.in index f73f7f7a306..b7b5d6d9f2e 100644 --- a/services/web/server/requirements/_base.in +++ b/services/web/server/requirements/_base.in @@ -22,7 +22,7 @@ # From 5.0.0, https://github.com/miguelgrinberg/python-socketio/blob/main/CHANGES.md # test_resource_manager.py::test_websocket_resource_management fails because # socket_id saved in redis does not correspond to client's sio -python-socketio +python-socketio~=4.6.1 diff --git a/services/web/server/requirements/_base.txt b/services/web/server/requirements/_base.txt index 19d8aaf4f5c..6cb58a08db2 100644 --- a/services/web/server/requirements/_base.txt +++ b/services/web/server/requirements/_base.txt @@ -5,11 +5,7 @@ # pip-compile --output-file=requirements/_base.txt --strip-extras requirements/_base.in # aio-pika==8.2.4 - # via - # -c requirements/../../../../packages/service-library/requirements/./_base.in - # -r requirements/../../../../packages/service-library/requirements/_base.in - # -r requirements/../../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/_base.in - # -r requirements/_base.in + # via -r requirements/_base.in aiocache==0.11.1 # via -r requirements/../../../../packages/simcore-sdk/requirements/_base.in aiodebug==2.3.0 @@ -88,8 +84,6 @@ attrs==21.4.0 # aiohttp # jsonschema # openapi-core -bidict==0.22.0 - # via python-socketio certifi==2022.6.15 # via requests cffi==1.15.0 @@ -204,7 +198,6 @@ openapi-spec-validator==0.4.0 # -c requirements/../../../../packages/service-library/requirements/././constraints.txt # -c requirements/../../../../packages/service-library/requirements/./constraints.txt # -c requirements/../../../../packages/simcore-sdk/requirements/../../../packages/service-library/requirements/./constraints.txt - # -c requirements/../../../../packages/simcore-sdk/requirements/./constraints.txt # openapi-core openpyxl==3.0.9 # via -r requirements/_base.in @@ -269,11 +262,11 @@ pyparsing==3.0.9 # via packaging pyrsistent==0.18.1 # via jsonschema -python-engineio==4.3.4 +python-engineio==3.14.2 # via python-socketio python-magic==0.4.25 # via -r requirements/_base.in -python-socketio==5.7.2 +python-socketio==4.6.1 # via -r requirements/_base.in pytz==2022.1 # via twilio @@ -309,6 +302,8 @@ six==1.16.0 # isodate # jsonschema # openapi-core + # python-engineio + # python-socketio sqlalchemy==1.4.37 # via # -c requirements/../../../../packages/models-library/requirements/../../../requirements/constraints.txt diff --git a/services/web/server/requirements/_test.txt b/services/web/server/requirements/_test.txt index 7c50ef89677..3e2e777e99c 100644 --- a/services/web/server/requirements/_test.txt +++ b/services/web/server/requirements/_test.txt @@ -155,7 +155,6 @@ ptvsd==4.3.2 # via -r requirements/_test.in py==1.11.0 # via - # -c requirements/../../../../requirements/constraints.txt # -r requirements/_test.in # pytest-forked pylint==2.15.5 @@ -260,7 +259,7 @@ urllib3==1.26.11 # requests websocket-client==1.4.1 # via docker -websockets==10.4 +websockets==10.3 # via -r requirements/_test.in wrapt==1.14.1 # via diff --git a/services/web/server/requirements/_tools.txt b/services/web/server/requirements/_tools.txt index c2c50f01f8e..71c6cfbb518 100644 --- a/services/web/server/requirements/_tools.txt +++ b/services/web/server/requirements/_tools.txt @@ -4,10 +4,6 @@ # # pip-compile --output-file=requirements/_tools.txt --strip-extras requirements/_tools.in # -astroid==2.12.12 - # via - # -c requirements/_test.txt - # pylint black==22.10.0 # via -r requirements/../../../../requirements/devenv.txt build==0.8.0 @@ -22,10 +18,6 @@ click==8.1.3 # -c requirements/_test.txt # black # pip-tools -dill==0.3.5.1 - # via - # -c requirements/_test.txt - # pylint distlib==0.3.6 # via virtualenv filelock==3.8.0 @@ -38,16 +30,6 @@ isort==5.10.1 # via # -c requirements/_test.txt # -r requirements/../../../../requirements/devenv.txt - # pylint -lazy-object-proxy==1.7.1 - # via - # -c requirements/_base.txt - # -c requirements/_test.txt - # astroid -mccabe==0.7.0 - # via - # -c requirements/_test.txt - # pylint mypy-extensions==0.4.3 # via black nodeenv==1.7.0 @@ -69,14 +51,9 @@ platformdirs==2.5.2 # via # -c requirements/_test.txt # black - # pylint # virtualenv pre-commit==2.20.0 # via -r requirements/../../../../requirements/devenv.txt -pylint==2.15.5 - # via - # -c requirements/_test.txt - # -r requirements/../../../../requirements/devenv.txt pyparsing==3.0.9 # via # -c requirements/_base.txt @@ -96,27 +73,15 @@ tomli==2.0.1 # black # build # pep517 - # pylint -tomlkit==0.11.5 - # via - # -c requirements/_test.txt - # pylint typing-extensions==4.3.0 # via # -c requirements/_base.txt # -c requirements/_test.txt - # astroid # black - # pylint virtualenv==20.16.5 # via pre-commit wheel==0.37.1 # via pip-tools -wrapt==1.14.1 - # via - # -c requirements/_base.txt - # -c requirements/_test.txt - # astroid # The following packages are considered to be unsafe in a requirements file: # pip diff --git a/services/web/server/src/simcore_service_webserver/socketio/server.py b/services/web/server/src/simcore_service_webserver/socketio/server.py index 6ec40a6e365..d82b5a6c676 100644 --- a/services/web/server/src/simcore_service_webserver/socketio/server.py +++ b/services/web/server/src/simcore_service_webserver/socketio/server.py @@ -1,6 +1,4 @@ -import asyncio import logging -from typing import AsyncIterator from aiohttp import web from socketio import AsyncServer @@ -15,40 +13,14 @@ def get_socket_server(app: web.Application) -> AsyncServer: return app[APP_CLIENT_SOCKET_SERVER_KEY] -async def _socketio_server_cleanup_ctx(_app: web.Application) -> AsyncIterator[None]: - yield - # NOTE: this is ugly. It seems though that python-enginio does not - # cleanup its background tasks properly. - # https://github.com/miguelgrinberg/python-socketio/discussions/1092 - current_tasks = asyncio.tasks.all_tasks() - cancelled_tasks = [] - for task in current_tasks: - coro = task.get_coro() - if any( - coro_name in coro.__qualname__ # type: ignore - for coro_name in [ - "AsyncServer._service_task", - "AsyncSocket.schedule_ping", - ] - ): - task.cancel() - cancelled_tasks.append(task) - await asyncio.gather(*cancelled_tasks, return_exceptions=True) - - def setup_socketio_server(app: web.Application): if app.get(APP_CLIENT_SOCKET_SERVER_KEY) is None: # SEE https://github.com/miguelgrinberg/python-socketio/blob/v4.6.1/docs/server.rst#aiohttp # TODO: ujson to speed up? # TODO: client_manager= to socketio.AsyncRedisManager/AsyncAioPikaManager for horizontal scaling (shared sessions) - sio = AsyncServer( - async_mode="aiohttp", - logger=log, # type: ignore - engineio_logger=False, - ) + sio = AsyncServer(async_mode="aiohttp", logger=log, engineio_logger=False) sio.attach(app) app[APP_CLIENT_SOCKET_SERVER_KEY] = sio - app.cleanup_ctx.append(_socketio_server_cleanup_ctx) return get_socket_server(app) diff --git a/services/web/server/tests/integration/01/test_garbage_collection.py b/services/web/server/tests/integration/01/test_garbage_collection.py index 4185a184853..7324b5ca59e 100644 --- a/services/web/server/tests/integration/01/test_garbage_collection.py +++ b/services/web/server/tests/integration/01/test_garbage_collection.py @@ -279,12 +279,8 @@ async def connect_to_socketio(client, user, socketio_client_factory: Callable): "user_id": str(user["id"]), "client_session_id": cur_client_session_id, } - assert await socket_registry.find_keys(("socket_id", sio.get_sid())) == [ - resource_key - ] - assert sio.get_sid() in await socket_registry.find_resources( - resource_key, "socket_id" - ) + assert await socket_registry.find_keys(("socket_id", sio.sid)) == [resource_key] + assert sio.sid in await socket_registry.find_resources(resource_key, "socket_id") assert len(await socket_registry.find_resources(resource_key, "socket_id")) == 1 sio_connection_data = sio, resource_key return sio_connection_data @@ -293,12 +289,12 @@ async def connect_to_socketio(client, user, socketio_client_factory: Callable): async def disconnect_user_from_socketio(client, sio_connection_data): """disconnect a previously connected socket.io connection""" sio, resource_key = sio_connection_data - sid = sio.get_sid() + sid = sio.sid socket_registry = get_registry(client.server.app) await sio.disconnect() assert not sio.sid await asyncio.sleep(0) # just to ensure there is a context switch - assert not await socket_registry.find_keys(("socket_id", sio.get_sid())) + assert not await socket_registry.find_keys(("socket_id", sio.sid)) assert not sid in await socket_registry.find_resources(resource_key, "socket_id") assert not await socket_registry.find_resources(resource_key, "socket_id") diff --git a/services/web/server/tests/integration/02/test_rabbit.py b/services/web/server/tests/integration/02/test_rabbit.py index 0659436f6a1..adfed7d71f7 100644 --- a/services/web/server/tests/integration/02/test_rabbit.py +++ b/services/web/server/tests/integration/02/test_rabbit.py @@ -5,6 +5,7 @@ import asyncio import json import logging +from asyncio import sleep from dataclasses import dataclass from typing import Any, AsyncIterator, Awaitable, Callable from unittest import mock @@ -28,7 +29,6 @@ from models_library.users import UserID from pytest_mock import MockerFixture from pytest_simcore.helpers.utils_login import UserInfoDict -from redis import Redis from servicelib.aiohttp.application import create_safe_application from settings_library.rabbit import RabbitSettings from simcore_postgres_database.models.comp_tasks import NodeClass @@ -185,10 +185,9 @@ async def _publish_in_rabbit( def client( event_loop: asyncio.AbstractEventLoop, aiohttp_client: Callable, - app_config: dict[str, Any], - rabbit_service: RabbitSettings, + app_config: dict[str, Any], ## waits until swarm with *_services are up + rabbit_service: RabbitSettings, ## waits until rabbit is responsive and set env vars postgres_db: sa.engine.Engine, - redis_client: Redis, monkeypatch_setenv_from_app_config: Callable, ): app_config["storage"]["enabled"] = False @@ -258,7 +257,7 @@ async def socketio_subscriber_handlers( socketio_client_factory: Callable, client_session_id: UUIDStr, mocker: MockerFixture, -) -> AsyncIterator[SocketIoHandlers]: +) -> SocketIoHandlers: """socketio SUBSCRIBER @@ -283,7 +282,7 @@ async def socketio_subscriber_handlers( # called on event mock_event_handler = mocker.Mock() sio.on(SOCKET_IO_EVENT, handler=mock_event_handler) - yield SocketIoHandlers( + return SocketIoHandlers( mock_log_handler, mock_node_update_handler, mock_event_handler ) @@ -316,7 +315,7 @@ def user_role() -> UserRole: return UserRole.USER -@pytest.fixture +@pytest.fixture(scope="function") async def rabbit_exchanges( rabbit_settings: RabbitSettings, rabbit_channel: aio_pika.Channel, @@ -373,9 +372,10 @@ async def rabbit_exchanges( # to them # +POLLING_TIME = 0.2 TIMEOUT_S = 10 RETRY_POLICY = dict( - wait=wait_fixed(0.2), + wait=wait_fixed(POLLING_TIME), stop=stop_after_delay(TIMEOUT_S), before_sleep=before_sleep_log(logger, log_level=logging.WARNING), retry=retry_if_exception_type(AssertionError), @@ -389,6 +389,7 @@ async def rabbit_exchanges( ] +@pytest.mark.flaky(max_runs=3) @pytest.mark.parametrize("user_role", USER_ROLES) async def test_publish_to_other_user( not_logged_user_id: UserID, @@ -409,13 +410,14 @@ async def test_publish_to_other_user( not_in_project_node_uuid, NUMBER_OF_MESSAGES, ) + await sleep(TIMEOUT_S) - await asyncio.sleep(TIMEOUT_S) socketio_subscriber_handlers.mock_log.assert_not_called() socketio_subscriber_handlers.mock_node_updated.assert_not_called() socketio_subscriber_handlers.mock_event.assert_not_called() +@pytest.mark.flaky(max_runs=3) @pytest.mark.parametrize("user_role", USER_ROLES) async def test_publish_to_user( logged_user: UserInfoDict, @@ -442,22 +444,21 @@ async def test_publish_to_user( NUMBER_OF_MESSAGES ) - async for attempt in AsyncRetrying(**RETRY_POLICY): - with attempt: - for mock_call, expected_message in zip( - socketio_subscriber_handlers.mock_log.mock_log_handler.call_args_list, - log_messages, - ): - value = mock_call[0] - deserialized_value = json.loads(value[0]) - assert deserialized_value == json.loads( - expected_message.json(exclude={"user_id"}) - ) - socketio_subscriber_handlers.mock_node_updated.assert_not_called() - socketio_subscriber_handlers.mock_event.assert_called_once() + for mock_call, expected_message in zip( + socketio_subscriber_handlers.mock_log.mock_log_handler.call_args_list, + log_messages, + ): + value = mock_call[0] + deserialized_value = json.loads(value[0]) + assert deserialized_value == json.loads( + expected_message.json(exclude={"user_id"}) + ) + socketio_subscriber_handlers.mock_node_updated.assert_not_called() + socketio_subscriber_handlers.mock_event.assert_called_once() @pytest.mark.parametrize("user_role", USER_ROLES) +@pytest.mark.flaky(max_runs=3) async def test_publish_about_users_project( logged_user: UserInfoDict, user_project: dict[str, Any], @@ -483,20 +484,19 @@ async def test_publish_about_users_project( NUMBER_OF_MESSAGES ) - async for attempt in AsyncRetrying(**RETRY_POLICY): - with attempt: - for mock_call, expected_message in zip( - socketio_subscriber_handlers.mock_log.call_args_list, log_messages - ): - value = mock_call[0] - deserialized_value = json.loads(value[0]) - assert deserialized_value == json.loads( - expected_message.json(exclude={"user_id"}) - ) - socketio_subscriber_handlers.mock_node_updated.assert_not_called() - socketio_subscriber_handlers.mock_event.assert_called_once() + for mock_call, expected_message in zip( + socketio_subscriber_handlers.mock_log.call_args_list, log_messages + ): + value = mock_call[0] + deserialized_value = json.loads(value[0]) + assert deserialized_value == json.loads( + expected_message.json(exclude={"user_id"}) + ) + socketio_subscriber_handlers.mock_node_updated.assert_not_called() + socketio_subscriber_handlers.mock_event.assert_called_once() +@pytest.mark.flaky(max_runs=3) @pytest.mark.parametrize("user_role", USER_ROLES) async def test_publish_about_users_projects_node( logged_user: UserInfoDict, @@ -535,11 +535,9 @@ async def test_publish_about_users_projects_node( expected_message.json(exclude={"user_id"}) ) - async for attempt in AsyncRetrying(**RETRY_POLICY): - with attempt: - # mock_log_handler.assert_has_calls(log_calls, any_order=True) - socketio_subscriber_handlers.mock_node_updated.assert_called() - assert socketio_subscriber_handlers.mock_node_updated.call_count == ( - NUMBER_OF_MESSAGES - ) - socketio_subscriber_handlers.mock_event.assert_called_once() + # mock_log_handler.assert_has_calls(log_calls, any_order=True) + socketio_subscriber_handlers.mock_node_updated.assert_called() + assert socketio_subscriber_handlers.mock_node_updated.call_count == ( + NUMBER_OF_MESSAGES + ) + socketio_subscriber_handlers.mock_event.assert_called_once() diff --git a/services/web/server/tests/unit/with_dbs/03/garbage_collector/test_resource_manager.py b/services/web/server/tests/unit/with_dbs/03/garbage_collector/test_resource_manager.py index a376d0e54fb..48f30e70a92 100644 --- a/services/web/server/tests/unit/with_dbs/03/garbage_collector/test_resource_manager.py +++ b/services/web/server/tests/unit/with_dbs/03/garbage_collector/test_resource_manager.py @@ -10,7 +10,7 @@ from asyncio import Future from copy import deepcopy from pathlib import Path -from typing import Any, AsyncIterator, Awaitable, Callable +from typing import Any, AsyncIterator, Callable from unittest import mock from unittest.mock import call @@ -32,6 +32,7 @@ from simcore_service_webserver._meta import API_VTAG from simcore_service_webserver.application_settings import setup_settings from simcore_service_webserver.db import setup_db +from simcore_service_webserver.director.plugin import setup_director from simcore_service_webserver.director_v2 import setup_director_v2 from simcore_service_webserver.login.plugin import setup_login from simcore_service_webserver.products import setup_products @@ -44,7 +45,6 @@ from simcore_service_webserver.resource_manager.plugin import setup_resource_manager from simcore_service_webserver.resource_manager.registry import ( RedisResourceRegistry, - RegistryKeyPrefixDict, get_registry, ) from simcore_service_webserver.rest import setup_rest @@ -57,8 +57,7 @@ from simcore_service_webserver.users_api import delete_user from simcore_service_webserver.users_exceptions import UserNotFoundError from tenacity._asyncio import AsyncRetrying -from tenacity.retry import retry_if_exception_type -from tenacity.stop import stop_after_delay +from tenacity.stop import stop_after_attempt, stop_after_delay from tenacity.wait import wait_fixed from yarl import URL @@ -68,33 +67,18 @@ SERVICE_DELETION_DELAY = 1 +async def open_project(client, project_uuid: str, client_session_id: str) -> None: + url = client.app.router["open_project"].url_for(project_id=project_uuid) + resp = await client.post(url, json=client_session_id) + await assert_status(resp, web.HTTPOk) + + async def close_project(client, project_uuid: str, client_session_id: str) -> None: url = client.app.router["close_project"].url_for(project_id=project_uuid) resp = await client.post(url, json=client_session_id) await assert_status(resp, web.HTTPNoContent) -@pytest.fixture -async def open_project() -> AsyncIterator[Callable[..., Awaitable[None]]]: - opened_projects = [] - - async def open_project(client, project_uuid: str, client_session_id: str) -> None: - url = client.app.router["open_project"].url_for(project_id=project_uuid) - resp = await client.post(url, json=client_session_id) - await assert_status(resp, web.HTTPOk) - opened_projects.append((client, project_uuid, client_session_id)) - - yield open_project - # cleanup, if we cannot close that is because the user_role might not allow it - await asyncio.gather( - *( - close_project(client, project_uuid, client_session_id) - for client, project_uuid, client_session_id in opened_projects - ), - return_exceptions=True, - ) - - @pytest.fixture def client( event_loop: asyncio.AbstractEventLoop, @@ -132,6 +116,7 @@ def client( setup_users(app) setup_socketio(app) setup_projects(app) + setup_director(app) setup_director_v2(app) assert setup_resource_manager(app) setup_products(app) @@ -260,32 +245,25 @@ async def test_websocket_resource_management( ): cur_client_session_id = client_session_id_factory() sio = await socketio_client_factory(cur_client_session_id) - sid = sio.get_sid() - resource_key = RegistryKeyPrefixDict( - user_id=f"{logged_user['id']}", client_session_id=cur_client_session_id - ) - - assert await socket_registry.find_keys(("socket_id", sio.get_sid())) == [ - resource_key - ] - assert sio.get_sid() in await socket_registry.find_resources( - resource_key, "socket_id" - ) + sid = sio.sid + resource_key = { + "user_id": str(logged_user["id"]), + "client_session_id": cur_client_session_id, + } + # FIXME: this check fails with python-socketio>=5.0.0 (see requirements/_base.in) + assert await socket_registry.find_keys(("socket_id", sio.sid)) == [resource_key] + assert sio.sid in await socket_registry.find_resources(resource_key, "socket_id") assert len(await socket_registry.find_resources(resource_key, "socket_id")) == 1 # NOTE: the socket.io client needs the websockets package in order to upgrade to websocket transport await sio.disconnect() - await sio.wait() - assert not sio.get_sid() - - async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY): - with attempt: - # now the entries should be removed - assert not await socket_registry.find_keys(("socket_id", sio.get_sid())) - assert not sid in await socket_registry.find_resources( - resource_key, "socket_id" - ) - assert not await socket_registry.find_resources(resource_key, "socket_id") + assert not sio.sid + # NOTE: let the disconnection propagate + await asyncio.sleep(1) + # now the entries should be removed + assert not await socket_registry.find_keys(("socket_id", sio.sid)) + assert not sid in await socket_registry.find_resources(resource_key, "socket_id") + assert not await socket_registry.find_resources(resource_key, "socket_id") @pytest.mark.parametrize( @@ -304,20 +282,19 @@ async def test_websocket_multiple_connections( client_session_id_factory: Callable[[], str], ): NUMBER_OF_SOCKETS = 5 - resource_keys: list[RegistryKeyPrefixDict] = [] + resource_key = {} # connect multiple clients clients = [] for socket_count in range(1, NUMBER_OF_SOCKETS + 1): cur_client_session_id = client_session_id_factory() sio = await socketio_client_factory(cur_client_session_id) - resource_key = RegistryKeyPrefixDict( - user_id=f"{logged_user['id']}", client_session_id=cur_client_session_id - ) - assert await socket_registry.find_keys(("socket_id", sio.get_sid())) == [ - resource_key - ] - assert [sio.get_sid()] == await socket_registry.find_resources( + resource_key = { + "user_id": str(logged_user["id"]), + "client_session_id": cur_client_session_id, + } + assert await socket_registry.find_keys(("socket_id", sio.sid)) == [resource_key] + assert [sio.sid] == await socket_registry.find_resources( resource_key, "socket_id" ) assert ( @@ -330,50 +307,21 @@ async def test_websocket_multiple_connections( == socket_count ) clients.append(sio) - resource_keys.append(resource_key) - for sio, resource_key in zip(clients, resource_keys): - sid = sio.get_sid() + for sio in clients: + sid = sio.sid await sio.disconnect() - await sio.wait() - + # need to attend the disconnect event to pass through the socketio internal queues + await asyncio.sleep( + 0.1 + ) # must be >= 0.01 to work without issues, added some padding assert not sio.sid - assert not await socket_registry.find_keys(("socket_id", sio.get_sid())) + assert not await socket_registry.find_keys(("socket_id", sio.sid)) assert not sid in await socket_registry.find_resources( resource_key, "socket_id" ) - for resource_key in resource_keys: - assert not await socket_registry.find_resources(resource_key, "socket_id") - - -_TENACITY_ASSERT_RETRY = dict( - reraise=True, - retry=retry_if_exception_type(AssertionError), - wait=wait_fixed(0.5), - stop=stop_after_delay(30), -) - - -@pytest.mark.skip( - reason="this test is here to show warnings when closing " - "the socketio server and could be useful as a proof" - "see https://github.com/miguelgrinberg/python-socketio/discussions/1092" - "and simcore_service_webserver.socketio.server _socketio_server_cleanup_ctx" -) -@pytest.mark.parametrize( - "user_role", - [ - (UserRole.TESTER), - ], -) -async def test_asyncio_task_pending_on_close( - client: TestClient, - logged_user: dict[str, Any], - socketio_client_factory: Callable, -): - sio = await socketio_client_factory() - # this test generates warnings on its own + assert not await socket_registry.find_resources(resource_key, "socket_id") @pytest.mark.parametrize( @@ -424,20 +372,21 @@ async def test_websocket_disconnected_after_logout( await assert_status(r, expected) # the socket2 should be gone - async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY): - with attempt: - assert not sio2.sid - socket_logout_mock_callable2.assert_not_called() + await asyncio.sleep(1) + assert not sio2.sid + socket_logout_mock_callable2.assert_not_called() - # the others should receive a logout message through their respective sockets - socket_logout_mock_callable.assert_called_once() - socket_logout_mock_callable2.assert_not_called() # note 2 should be not called ever - socket_logout_mock_callable3.assert_called_once() + # the others should receive a logout message through their respective sockets + await asyncio.sleep(3) + socket_logout_mock_callable.assert_called_once() + socket_logout_mock_callable2.assert_not_called() # note 2 should be not called ever + socket_logout_mock_callable3.assert_called_once() - # first socket should be closed now - assert not sio.sid - # second socket also closed - assert not sio3.sid + await asyncio.sleep(3) + # first socket should be closed now + assert not sio.sid + # second socket also closed + assert not sio3.sid @pytest.mark.parametrize( @@ -448,6 +397,7 @@ async def test_websocket_disconnected_after_logout( (UserRole.TESTER, True), ], ) +@pytest.mark.flaky(max_runs=3) # TODO: remove this flaky mark async def test_interactive_services_removed_after_logout( client: TestClient, logged_user: dict[str, Any], @@ -459,7 +409,6 @@ async def test_interactive_services_removed_after_logout( storage_subsystem_mock: MockedStorageSubsystem, # when guest user logs out garbage is collected director_v2_service_mock: aioresponses, expected_save_state: bool, - open_project: Callable, ): assert client.app @@ -487,10 +436,14 @@ async def test_interactive_services_removed_after_logout( await garbage_collector_core.collect_garbage(client.app) # assert dynamic service is removed *this is done in a fire/forget way so give a bit of leeway - async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY): + async for attempt in AsyncRetrying( + reraise=True, stop=stop_after_attempt(10), wait=wait_fixed(1) + ): with attempt: - print( - f"--> Waiting for stop_dynamic_service with: {service['service_uuid']}, {expected_save_state=}", + logger.warning( + "Waiting for stop to have been called service_uuid=%s, save_state=%s", + service["service_uuid"], + expected_save_state, ) mocked_director_v2_api[ "director_v2_core_dynamic_services.stop_dynamic_service" @@ -520,7 +473,6 @@ async def test_interactive_services_remain_after_websocket_reconnection_from_2_t storage_subsystem_mock, # when guest user logs out garbage is collected expected_save_state: bool, mocker: MockerFixture, - open_project: Callable, ): # login - logged_user fixture @@ -549,29 +501,26 @@ async def test_interactive_services_remain_after_websocket_reconnection_from_2_t # We have no mock-up for the heatbeat... await sio.disconnect() assert not sio.sid - async for attempt in AsyncRetrying( - **(_TENACITY_ASSERT_RETRY | dict(wait=wait_fixed(0.1))) - ): - with attempt: - socket_project_state_update_mock_callable.assert_called_with( - json.dumps( - { - "project_uuid": empty_user_project["uuid"], - "data": { - "locked": { - "value": False, - "owner": { - "user_id": logged_user["id"], - "first_name": logged_user["name"], - "last_name": "", - }, - "status": "OPENED", - }, - "state": {"value": "NOT_STARTED"}, + await asyncio.sleep(0.5) # let the thread call the method + socket_project_state_update_mock_callable.assert_called_with( + json.dumps( + { + "project_uuid": empty_user_project["uuid"], + "data": { + "locked": { + "value": False, + "owner": { + "user_id": logged_user["id"], + "first_name": logged_user["name"], + "last_name": "", }, - } - ) - ) + "status": "OPENED", + }, + "state": {"value": "NOT_STARTED"}, + }, + } + ) + ) # open project in second client await open_project(client, empty_user_project["uuid"], client_session_id2) # ensure sufficient time is wasted here @@ -594,14 +543,13 @@ async def test_interactive_services_remain_after_websocket_reconnection_from_2_t mocked_director_v2_api["director_v2_api.stop_dynamic_service"].assert_not_called() # now really disconnect await sio2.disconnect() - await sio2.wait() assert not sio2.sid # run the garbage collector # event after waiting some time await asyncio.sleep(SERVICE_DELETION_DELAY + 1) await garbage_collector_core.collect_garbage(client.app) - await asyncio.sleep(0) + await asyncio.sleep(1) # assert dynamic service is gone calls = [ call( @@ -648,7 +596,6 @@ async def test_interactive_services_removed_per_project( asyncpg_storage_system_mock, storage_subsystem_mock, # when guest user logs out garbage is collected expected_save_state: bool, - open_project: Callable, ): # create server with delay set to DELAY # login - logged_user fixture @@ -750,7 +697,6 @@ async def test_services_remain_after_closing_one_out_of_two_tabs( socketio_client_factory: Callable, client_session_id_factory: Callable[[], str], expected_save_state: bool, - open_project: Callable, ): # create server with delay set to DELAY # login - logged_user fixture @@ -804,7 +750,6 @@ async def test_websocket_disconnected_remove_or_maintain_files_based_on_role( storage_subsystem_mock, # when guest user logs out garbage is collected expect_call: bool, expected_save_state: bool, - open_project: Callable, ): # login - logged_user fixture # create empty study - empty_user_project fixture @@ -840,7 +785,7 @@ async def test_websocket_disconnected_remove_or_maintain_files_based_on_role( ].assert_has_calls(calls) # this call is done async, so wait a bit here to ensure it is correctly done - async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY): + async for attempt in AsyncRetrying(reraise=True, stop=stop_after_delay(10)): with attempt: if expect_call: # make sure `delete_project` is called @@ -888,6 +833,8 @@ async def test_regression_removing_unexisting_user( user_name={"first_name": "my name is", "last_name": "pytest"}, ) # since the call to delete is happening as fire and forget task, let's wait until it is done - async for attempt in AsyncRetrying(**_TENACITY_ASSERT_RETRY): + async for attempt in AsyncRetrying(reraise=True, stop=stop_after_delay(20)): with attempt: mock_storage_delete_data_folders.assert_called() + # wait a bit here so the task is completed to prevent having unclosed loops + await asyncio.sleep(1)