diff --git a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py index 6276b69b945..8a4fa23f6ea 100644 --- a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py @@ -142,6 +142,3 @@ def _creator(client_name: str) -> RabbitMQClient: yield _creator # cleanup, properly close the clients await asyncio.gather(*(client.close() for client in created_clients)) - for client in created_clients: - assert client._channel_pool # pylint: disable=protected-access - assert client._channel_pool.is_closed # pylint: disable=protected-access diff --git a/packages/service-library/src/servicelib/rabbitmq.py b/packages/service-library/src/servicelib/rabbitmq.py index 062010ee5d0..daa143a3e10 100644 --- a/packages/service-library/src/servicelib/rabbitmq.py +++ b/packages/service-library/src/servicelib/rabbitmq.py @@ -107,9 +107,11 @@ async def rpc_initialize(self) -> None: await self._rpc.initialize() async def close(self) -> None: - with log_context(_logger, logging.INFO, msg="Closing connection to RabbitMQ"): - assert self._channel_pool # nosec - await self._channel_pool.close() + with log_context( + _logger, + logging.INFO, + msg=f"{self.client_name} closing connection to RabbitMQ", + ): assert self._connection_pool # nosec await self._connection_pool.close() diff --git a/packages/service-library/tests/test_rabbitmq.py b/packages/service-library/tests/test_rabbitmq.py index 93abade8c6d..a5522ff40b6 100644 --- a/packages/service-library/tests/test_rabbitmq.py +++ b/packages/service-library/tests/test_rabbitmq.py @@ -44,8 +44,6 @@ async def test_rabbit_client(rabbit_client_name: str, rabbit_service: RabbitSett await client.close() assert client._connection_pool assert client._connection_pool.is_closed - assert client._channel_pool - assert client._channel_pool.is_closed @pytest.fixture diff --git a/services/docker-compose.yml 
b/services/docker-compose.yml index fd6e6718b72..5f8ae754d45 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -266,6 +266,8 @@ services: - traefik.http.services.${SWARM_STACK_NAME}_webserver.loadbalancer.healthcheck.path=/v0/ - traefik.http.services.${SWARM_STACK_NAME}_webserver.loadbalancer.healthcheck.interval=2000ms - traefik.http.services.${SWARM_STACK_NAME}_webserver.loadbalancer.healthcheck.timeout=1000ms + # NOTE: stickiness must remain until the long running tasks in the webserver are removed + # and also https://github.com/ITISFoundation/osparc-simcore/pull/4180 is resolved. - traefik.http.services.${SWARM_STACK_NAME}_webserver.loadbalancer.sticky.cookie=true - traefik.http.services.${SWARM_STACK_NAME}_webserver.loadbalancer.sticky.cookie.samesite=lax - traefik.http.services.${SWARM_STACK_NAME}_webserver.loadbalancer.sticky.cookie.httponly=true diff --git a/services/web/server/src/simcore_service_webserver/groups_api.py b/services/web/server/src/simcore_service_webserver/groups_api.py index c3a0813677e..83ce98f9603 100644 --- a/services/web/server/src/simcore_service_webserver/groups_api.py +++ b/services/web/server/src/simcore_service_webserver/groups_api.py @@ -1,6 +1,6 @@ import logging import re -from typing import Any, Optional +from typing import Any import sqlalchemy as sa from aiohttp import web @@ -139,7 +139,7 @@ async def get_product_group_for_user( async def create_user_group( app: web.Application, user_id: int, new_group: dict -) -> dict[str, str]: +) -> dict[str, Any]: engine = app[APP_DB_ENGINE_KEY] async with engine.acquire() as conn: result = await conn.execute( @@ -281,9 +281,9 @@ async def add_user_in_group( user_id: int, gid: int, *, - new_user_id: Optional[int] = None, - new_user_email: Optional[str] = None, - access_rights: Optional[AccessRightsDict] = None, + new_user_id: int | None = None, + new_user_email: str | None = None, + access_rights: AccessRightsDict | None = None, ) -> None: """ adds new_user 
(either by id or email) in group (with gid) owned by user_id @@ -323,14 +323,13 @@ async def add_user_in_group( async def _get_user_in_group_permissions( conn: SAConnection, gid: int, the_user_id_in_group: int ) -> RowProxy: - # now get the user result = await conn.execute( sa.select([users, user_to_groups.c.access_rights]) .select_from(users.join(user_to_groups, users.c.id == user_to_groups.c.uid)) .where(and_(user_to_groups.c.gid == gid, users.c.id == the_user_id_in_group)) ) - the_user: RowProxy = await result.fetchone() + the_user: RowProxy | None = await result.fetchone() if not the_user: raise UserInGroupNotFoundError(the_user_id_in_group, gid) return the_user @@ -410,7 +409,7 @@ async def delete_user_in_group( ) -async def get_group_from_gid(app: web.Application, gid: int) -> Optional[RowProxy]: +async def get_group_from_gid(app: web.Application, gid: int) -> RowProxy | None: engine: Engine = app[APP_DB_ENGINE_KEY] async with engine.acquire() as conn: diff --git a/services/web/server/src/simcore_service_webserver/notifications/_constants.py b/services/web/server/src/simcore_service_webserver/notifications/_constants.py new file mode 100644 index 00000000000..7fa1915ef09 --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/notifications/_constants.py @@ -0,0 +1,3 @@ +from typing import Final + +APP_RABBITMQ_CONSUMERS_KEY: Final[str] = f"{__name__}.rabbit_consumers" diff --git a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers.py b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers.py index b298b13210c..8222b404597 100644 --- a/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers.py +++ b/services/web/server/src/simcore_service_webserver/notifications/_rabbitmq_consumers.py @@ -20,7 +20,7 @@ ) from servicelib.json_serialization import json_dumps from servicelib.logging_utils import log_context -from servicelib.rabbitmq import BIND_TO_ALL_TOPICS, 
RabbitMQClient +from servicelib.rabbitmq import RabbitMQClient from servicelib.utils import logged_gather from ..projects import projects_api @@ -35,6 +35,7 @@ SocketMessageDict, send_messages, ) +from ._constants import APP_RABBITMQ_CONSUMERS_KEY _logger = logging.getLogger(__name__) @@ -170,7 +171,7 @@ async def _events_message_parser(app: web.Application, data: bytes) -> bool: ( LoggerRabbitMessage.get_channel_name(), _log_message_parser, - dict(topics=[BIND_TO_ALL_TOPICS]), + dict(topics=[]), ), ( ProgressRabbitMessageNode.get_channel_name(), @@ -201,6 +202,12 @@ async def setup_rabbitmq_consumers(app: web.Application) -> AsyncIterator[None]: for exchange_name, parser_fct, queue_kwargs in EXCHANGE_TO_PARSER_CONFIG ) ) + app[APP_RABBITMQ_CONSUMERS_KEY] = { + exchange_name: queue_name + for (exchange_name, *_), queue_name in zip( + EXCHANGE_TO_PARSER_CONFIG, subscribed_queues + ) + } yield diff --git a/services/web/server/src/simcore_service_webserver/notifications/project_logs.py b/services/web/server/src/simcore_service_webserver/notifications/project_logs.py new file mode 100644 index 00000000000..2064e727c2b --- /dev/null +++ b/services/web/server/src/simcore_service_webserver/notifications/project_logs.py @@ -0,0 +1,31 @@ +from aiohttp import web +from models_library.projects import ProjectID +from models_library.rabbitmq_messages import LoggerRabbitMessage +from servicelib.rabbitmq import RabbitMQClient + +from ..rabbitmq import get_rabbitmq_client +from ._constants import APP_RABBITMQ_CONSUMERS_KEY + + +def _get_queue_name_from_exchange_name(app: web.Application, exchange_name: str) -> str: + exchange_to_queues = app[APP_RABBITMQ_CONSUMERS_KEY] + queue_name = exchange_to_queues[exchange_name] + return queue_name + + +async def subscribe(app: web.Application, project_id: ProjectID) -> None: + rabbit_client: RabbitMQClient = get_rabbitmq_client(app) + exchange_name = LoggerRabbitMessage.get_channel_name() + queue_name = 
_get_queue_name_from_exchange_name(app, exchange_name) + await rabbit_client.add_topics( + exchange_name, queue_name, topics=[f"{project_id}.*"] + ) + + +async def unsubscribe(app: web.Application, project_id: ProjectID) -> None: + rabbit_client: RabbitMQClient = get_rabbitmq_client(app) + exchange_name = LoggerRabbitMessage.get_channel_name() + queue_name = _get_queue_name_from_exchange_name(app, exchange_name) + await rabbit_client.remove_topics( + exchange_name, queue_name, topics=[f"{project_id}.*"] + ) diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_events.py b/services/web/server/src/simcore_service_webserver/projects/projects_events.py index e248efac890..ac5aa8a090e 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_events.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_events.py @@ -5,10 +5,12 @@ import logging from aiohttp import web +from models_library.projects import ProjectID from servicelib.observer import event_registry as _event_registry from servicelib.observer import observe from servicelib.utils import logged_gather +from ..notifications import project_logs from ..resource_manager.websocket_manager import PROJECT_ID_KEY, managed_resource from .projects_api import retrieve_and_notify_project_locked_state @@ -23,6 +25,10 @@ async def _on_user_disconnected( with managed_resource(user_id, client_session_id, app) as rt: list_projects: list[str] = await rt.find(PROJECT_ID_KEY) + await logged_gather( + *[project_logs.unsubscribe(app, ProjectID(prj)) for prj in list_projects] + ) + await logged_gather( *[ retrieve_and_notify_project_locked_state( @@ -33,7 +39,7 @@ async def _on_user_disconnected( ) -def setup_project_events(_app: web.Application): +def setup_project_events(_app: web.Application) -> None: # For the moment, this is only used as a placeholder to import this file # This way the functions above are registered as handlers of a give event # using the 
@observe decorator diff --git a/services/web/server/src/simcore_service_webserver/projects/projects_handlers.py b/services/web/server/src/simcore_service_webserver/projects/projects_handlers.py index 66574b2212c..f9a9acfa8db 100644 --- a/services/web/server/src/simcore_service_webserver/projects/projects_handlers.py +++ b/services/web/server/src/simcore_service_webserver/projects/projects_handlers.py @@ -29,6 +29,7 @@ from .._meta import api_version_prefix as VTAG from ..director_v2.exceptions import DirectorServiceError from ..login.decorators import login_required +from ..notifications import project_logs from ..products.plugin import Product, get_current_product from ..security.decorators import permission_required from . import projects_api @@ -101,6 +102,9 @@ async def open_project(request: web.Request) -> web.Response: request.app, path_params.project_id, req_ctx.product_name ) + # we now need to receive logs for that project + await project_logs.subscribe(request.app, path_params.project_id) + # user id opened project uuid if not query_params.disable_service_auto_start: with contextlib.suppress(ProjectStartsTooManyDynamicNodes): @@ -189,6 +193,7 @@ async def close_project(request: web.Request) -> web.Response: X_SIMCORE_USER_AGENT, UNDEFINED_DEFAULT_SIMCORE_USER_AGENT_VALUE ), ) + await project_logs.unsubscribe(request.app, path_params.project_id) raise web.HTTPNoContent(content_type=MIMETYPE_APPLICATION_JSON) except ProjectNotFoundError as exc: raise web.HTTPNotFound( diff --git a/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py b/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py index 275453b20a0..74f35dbdecf 100644 --- a/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py +++ b/services/web/server/tests/integration/02/notifications/test_rabbitmq_consumers.py @@ -1,6 +1,7 @@ # pylint: disable=redefined-outer-name # pylint: disable=unused-argument # pylint: 
disable=unused-variable +# pylint: disable=too-many-arguments import asyncio import json @@ -45,6 +46,7 @@ from simcore_service_webserver.diagnostics.plugin import setup_diagnostics from simcore_service_webserver.director_v2.plugin import setup_director_v2 from simcore_service_webserver.login.plugin import setup_login +from simcore_service_webserver.notifications import project_logs from simcore_service_webserver.notifications.plugin import setup_notifications from simcore_service_webserver.projects.plugin import setup_projects from simcore_service_webserver.projects.project_models import ProjectDict @@ -73,20 +75,23 @@ pytest_simcore_ops_services_selection = [] +_STABLE_DELAY_S = 3 + async def _assert_no_handler_not_called(handler: mock.Mock) -> None: with pytest.raises(RetryError): async for attempt in AsyncRetrying( retry=retry_always, - stop=stop_after_delay(5), + stop=stop_after_delay(_STABLE_DELAY_S), reraise=True, wait=wait_fixed(1), ): with attempt: print( - f"--> checking no mesage reached webclient... {attempt.retry_state.attempt_number} attempt" + f"--> checking no message reached webclient for {attempt.retry_state.attempt_number}/{_STABLE_DELAY_S}s..." ) handler.assert_not_called() + print(f"no calls received for {_STABLE_DELAY_S}s. 
very good.") async def _assert_handler_called(handler: mock.Mock, expected_call: mock._Call) -> None: @@ -177,6 +182,9 @@ async def rabbitmq_publisher( @pytest.mark.parametrize( "sender_same_user_id", [True, False], ids=lambda id: f"same_sender_id={id}" ) +@pytest.mark.parametrize( + "subscribe_to_logs", [True, False], ids=lambda id: f"subscribed={id}" +) async def test_log_workflow( client: TestClient, rabbitmq_publisher: RabbitMQClient, @@ -190,9 +198,10 @@ async def test_log_workflow( project_hidden: bool, aiopg_engine: aiopg.sa.Engine, sender_same_user_id: bool, + subscribe_to_logs: bool, ): """ - RabbitMQ --> Webserver --> Redis --> webclient (socketio) + RabbitMQ (TOPIC) --> Webserver --> Redis --> webclient (socketio) """ socket_io_conn = await socketio_client_factory(None, client) @@ -208,19 +217,25 @@ async def test_log_workflow( mock_log_handler = mocker.MagicMock() socket_io_conn.on(SOCKET_IO_LOG_EVENT, handler=mock_log_handler) + project_id = ProjectID(user_project["uuid"]) random_node_id_in_project = NodeID(choice(list(user_project["workbench"]))) sender_user_id = UserID(logged_user["id"]) if sender_same_user_id is False: sender_user_id = UserID(faker.pyint(min_value=logged_user["id"] + 1)) + + if subscribe_to_logs: + assert client.app + await project_logs.subscribe(client.app, project_id) + log_message = LoggerRabbitMessage( user_id=sender_user_id, - project_id=ProjectID(user_project["uuid"]), + project_id=project_id, node_id=random_node_id_in_project, messages=[faker.text() for _ in range(10)], ) await rabbitmq_publisher.publish(log_message.channel_name, log_message) - call_expected = not project_hidden and sender_same_user_id + call_expected = not project_hidden and sender_same_user_id and subscribe_to_logs if call_expected: expected_call = jsonable_encoder( log_message, exclude={"user_id", "channel_name"} @@ -230,6 +245,58 @@ async def test_log_workflow( await _assert_no_handler_not_called(mock_log_handler) +@pytest.mark.parametrize("user_role", 
[UserRole.GUEST], ids=str) +async def test_log_workflow_only_receives_messages_if_subscribed( + client: TestClient, + rabbitmq_publisher: RabbitMQClient, + logged_user: UserInfoDict, + user_project: ProjectDict, + faker: Faker, + mocker: MockerFixture, +): + """ + RabbitMQ (TOPIC) --> Webserver + + """ + mocked_send_messages = mocker.patch( + "simcore_service_webserver.notifications._rabbitmq_consumers.send_messages", + autospec=True, + ) + + project_id = ProjectID(user_project["uuid"]) + random_node_id_in_project = NodeID(choice(list(user_project["workbench"]))) + sender_user_id = UserID(logged_user["id"]) + + assert client.app + await project_logs.subscribe(client.app, project_id) + + log_message = LoggerRabbitMessage( + user_id=sender_user_id, + project_id=project_id, + node_id=random_node_id_in_project, + messages=[faker.text() for _ in range(10)], + ) + await rabbitmq_publisher.publish(log_message.channel_name, log_message) + await _assert_handler_called( + mocked_send_messages, + mock.call( + client.app, + f"{log_message.user_id}", + [ + { + "event_type": SOCKET_IO_LOG_EVENT, + "data": log_message.dict(exclude={"user_id", "channel_name"}), + } + ], + ), + ) + mocked_send_messages.reset_mock() + + # when unsubscribed, we do not receive the messages anymore + await project_logs.unsubscribe(client.app, project_id) + await _assert_no_handler_not_called(mocked_send_messages) + + @pytest.mark.parametrize("user_role", [UserRole.GUEST], ids=str) @pytest.mark.parametrize( "progress_type", diff --git a/services/web/server/tests/unit/with_dbs/02/test_projects_handlers__delete.py b/services/web/server/tests/unit/with_dbs/02/test_projects_handlers__delete.py index 8ac6a2a7e03..43776d8d05f 100644 --- a/services/web/server/tests/unit/with_dbs/02/test_projects_handlers__delete.py +++ b/services/web/server/tests/unit/with_dbs/02/test_projects_handlers__delete.py @@ -13,6 +13,7 @@ from aiohttp import web from aiohttp.test_utils import TestClient from faker import Faker +from 
models_library.projects import ProjectID from models_library.projects_state import ProjectStatus from pytest_simcore.helpers.utils_assert import assert_status from pytest_simcore.helpers.utils_webserver_unit_with_db import ( @@ -113,6 +114,7 @@ async def test_delete_project( ], ) async def test_delete_multiple_opened_project_forbidden( + mocked_notifications_plugin: dict[str, mock.Mock], mock_catalog_api: dict[str, mock.Mock], client, logged_user, @@ -128,7 +130,6 @@ async def test_delete_multiple_opened_project_forbidden( ): # service in project service = await create_dynamic_service_mock(logged_user["id"], user_project["uuid"]) - # open project in tab1 client_session_id1 = client_session_id_factory() try: @@ -139,7 +140,13 @@ async def test_delete_multiple_opened_project_forbidden( url = client.app.router["open_project"].url_for(project_id=user_project["uuid"]) resp = await client.post(url, json=client_session_id1) - await assert_status(resp, expected_ok) + data, error = await assert_status(resp, expected_ok) + if data: + mocked_notifications_plugin["subscribe"].assert_called_once_with( + client.app, ProjectID(user_project["uuid"]) + ) + else: + mocked_notifications_plugin["subscribe"].assert_not_called() # delete project in tab2 client_session_id2 = client_session_id_factory() diff --git a/services/web/server/tests/unit/with_dbs/02/test_projects_handlers__open_close.py b/services/web/server/tests/unit/with_dbs/02/test_projects_handlers__open_close.py index a5b859983ae..48de21d8c80 100644 --- a/services/web/server/tests/unit/with_dbs/02/test_projects_handlers__open_close.py +++ b/services/web/server/tests/unit/with_dbs/02/test_projects_handlers__open_close.py @@ -19,7 +19,8 @@ from aiohttp import ClientResponse, web from aiohttp.test_utils import TestClient, TestServer from faker import Faker -from models_library.projects_access import Owner +from models_library.projects import ProjectID +from models_library.projects_access import Owner, 
PositiveIntWithExclusiveMinimumRemoved from models_library.projects_state import ( ProjectLocked, ProjectRunningState, @@ -315,6 +316,7 @@ async def test_open_project( mock_orphaned_services: mock.Mock, mock_catalog_api: dict[str, mock.Mock], osparc_product_name: str, + mocked_notifications_plugin: dict[str, mock.Mock], ): # POST /v0/projects/{project_id}:open # open project @@ -325,6 +327,9 @@ async def test_open_project( await assert_status(resp, expected) if resp.status == web.HTTPOk.status_code: + mocked_notifications_plugin["subscribe"].assert_called_once_with( + client.app, ProjectID(user_project["uuid"]) + ) dynamic_services = { service_uuid: service for service_uuid, service in user_project["workbench"].items() @@ -354,6 +359,8 @@ async def test_open_project( mocked_director_v2_api["director_v2.api.run_dynamic_service"].assert_has_calls( calls ) + else: + mocked_notifications_plugin["subscribe"].assert_not_called() @pytest.mark.parametrize( @@ -376,6 +383,7 @@ async def test_open_template_project_for_edition( mock_orphaned_services: mock.Mock, mock_catalog_api: dict[str, mock.Mock], osparc_product_name: str, + mocked_notifications_plugin: dict[str, mock.Mock], ): # POST /v0/projects/{project_id}:open # open project @@ -390,6 +398,9 @@ async def test_open_template_project_for_edition( resp = await client.post(f"{url}", json=client_session_id_factory()) await assert_status(resp, expected) if resp.status == web.HTTPOk.status_code: + mocked_notifications_plugin["subscribe"].assert_called_once_with( + client.app, ProjectID(template_project["uuid"]) + ) dynamic_services = { service_uuid: service for service_uuid, service in template_project["workbench"].items() @@ -419,6 +430,8 @@ async def test_open_template_project_for_edition( mocked_director_v2_api["director_v2.api.run_dynamic_service"].assert_has_calls( calls ) + else: + mocked_notifications_plugin["subscribe"].assert_not_called() @pytest.mark.parametrize( @@ -472,6 +485,7 @@ async def 
test_open_project_with_small_amount_of_dynamic_services_starts_them_au mock_catalog_api: dict[str, mock.Mock], max_amount_of_auto_started_dyn_services: int, faker: Faker, + mocked_notifications_plugin: dict[str, mock.Mock], ): assert client.app num_of_dyn_services = max_amount_of_auto_started_dyn_services or faker.pyint( @@ -488,6 +502,10 @@ async def test_open_project_with_small_amount_of_dynamic_services_starts_them_au url = client.app.router["open_project"].url_for(project_id=project["uuid"]) resp = await client.post(f"{url}", json=client_session_id_factory()) await assert_status(resp, expected.ok) + mocked_notifications_plugin["subscribe"].assert_called_once_with( + client.app, ProjectID(project["uuid"]) + ) + mocked_notifications_plugin["subscribe"].reset_mock() assert mocked_director_v2_api[ "director_v2.api.run_dynamic_service" ].call_count == (num_of_dyn_services - num_service_already_running) @@ -505,6 +523,7 @@ async def test_open_project_with_disable_service_auto_start_set_overrides_behavi mock_catalog_api: dict[str, mock.Mock], max_amount_of_auto_started_dyn_services: int, faker: Faker, + mocked_notifications_plugin: dict[str, mock.Mock], ): assert client.app num_of_dyn_services = max_amount_of_auto_started_dyn_services or faker.pyint( @@ -526,6 +545,10 @@ async def test_open_project_with_disable_service_auto_start_set_overrides_behavi resp = await client.post(f"{url}", json=client_session_id_factory()) await assert_status(resp, expected.ok) + mocked_notifications_plugin["subscribe"].assert_called_once_with( + client.app, ProjectID(project["uuid"]) + ) + mocked_notifications_plugin["subscribe"].reset_mock() mocked_director_v2_api[ "director_v2.api.run_dynamic_service" ].assert_not_called() @@ -541,7 +564,7 @@ async def test_open_project_with_large_amount_of_dynamic_services_does_not_start mocked_director_v2_api: dict[str, mock.Mock], mock_catalog_api: dict[str, mock.Mock], max_amount_of_auto_started_dyn_services: int, - faker: Faker, + 
mocked_notifications_plugin: dict[str, mock.Mock], ): assert client.app @@ -557,6 +580,10 @@ async def test_open_project_with_large_amount_of_dynamic_services_does_not_start url = client.app.router["open_project"].url_for(project_id=project["uuid"]) resp = await client.post(f"{url}", json=client_session_id_factory()) await assert_status(resp, expected.ok) + mocked_notifications_plugin["subscribe"].assert_called_once_with( + client.app, ProjectID(project["uuid"]) + ) + mocked_notifications_plugin["subscribe"].reset_mock() mocked_director_v2_api[ "director_v2.api.run_dynamic_service" ].assert_not_called() @@ -575,6 +602,7 @@ async def test_open_project_with_large_amount_of_dynamic_services_starts_them_if mock_catalog_api: dict[str, mock.Mock], max_amount_of_auto_started_dyn_services: int, faker: Faker, + mocked_notifications_plugin: dict[str, mock.Mock], ): assert client.app assert max_amount_of_auto_started_dyn_services == 0, "setting not disabled!" @@ -582,7 +610,7 @@ async def test_open_project_with_large_amount_of_dynamic_services_starts_them_if # - services start in a sequence with a lock # - lock is a bit slower to acquire and release then without the non locking version # 20 services ~ 55 second runtime - num_of_dyn_services = faker.pyint(min_value=10, max_value=20) + num_of_dyn_services = 7 project = await user_project_with_num_dynamic_services(num_of_dyn_services + 1) all_service_uuids = list(project["workbench"]) for num_service_already_running in range(num_of_dyn_services): @@ -593,6 +621,10 @@ async def test_open_project_with_large_amount_of_dynamic_services_starts_them_if url = client.app.router["open_project"].url_for(project_id=project["uuid"]) resp = await client.post(f"{url}", json=client_session_id_factory()) await assert_status(resp, expected.ok) + mocked_notifications_plugin["subscribe"].assert_called_once_with( + client.app, ProjectID(project["uuid"]) + ) + mocked_notifications_plugin["subscribe"].reset_mock() 
mocked_director_v2_api["director_v2.api.run_dynamic_service"].assert_called() @@ -607,6 +639,7 @@ async def test_open_project_with_deprecated_services_ok_but_does_not_start_dynam mock_service_resources: ServiceResourcesDict, mock_orphaned_services, mock_catalog_api: dict[str, mock.Mock], + mocked_notifications_plugin: dict[str, mock.Mock], ): mock_catalog_api["get_service"].return_value["deprecated"] = ( datetime.utcnow() - timedelta(days=1) @@ -614,6 +647,9 @@ async def test_open_project_with_deprecated_services_ok_but_does_not_start_dynam url = client.app.router["open_project"].url_for(project_id=user_project["uuid"]) resp = await client.post(url, json=client_session_id_factory()) await assert_status(resp, expected.ok) + mocked_notifications_plugin["subscribe"].assert_called_once_with( + client.app, ProjectID(user_project["uuid"]) + ) mocked_director_v2_api["director_v2.api.run_dynamic_service"].assert_not_called() @@ -654,6 +690,7 @@ async def test_open_project_more_than_limitation_of_max_studies_open_per_user( mocked_director_v2_api: dict[str, mock.Mock], mock_catalog_api: dict[str, mock.Mock], user_role: UserRole, + mocked_notifications_plugin: dict[str, mock.Mock], ): client_id_1 = client_session_id_factory() await _open_project( @@ -680,9 +717,11 @@ async def test_close_project( client_session_id_factory: Callable, expected, mocked_director_v2_api: dict[str, mock.Mock], + mock_catalog_api: dict[str, mock.Mock], fake_services, mock_rabbitmq: None, mock_progress_bar: Any, + mocked_notifications_plugin: dict[str, mock.Mock], ): # POST /v0/projects/{project_id}:close fake_dynamic_services = fake_services(number_services=5) @@ -697,12 +736,15 @@ async def test_close_project( resp = await client.post(url, json=client_id) if resp.status == web.HTTPOk.status_code: + mocked_notifications_plugin["subscribe"].assert_called_once_with( + client.app, ProjectID(user_project["uuid"]) + ) mocked_director_v2_api["director_v2.api.list_dynamic_services"].assert_any_call( 
client.server.app, logged_user["id"], user_project["uuid"] ) - mocked_director_v2_api[ - "director_v2._core_dynamic_services.list_dynamic_services" - ].reset_mock() + mocked_director_v2_api["director_v2.api.list_dynamic_services"].reset_mock() + else: + mocked_notifications_plugin["subscribe"].assert_not_called() # close project url = client.app.router["close_project"].url_for(project_id=user_project["uuid"]) @@ -710,6 +752,9 @@ async def test_close_project( await assert_status(resp, expected.no_content) if resp.status == web.HTTPNoContent.status_code: + mocked_notifications_plugin["unsubscribe"].assert_called_once_with( + client.app, ProjectID(user_project["uuid"]) + ) # These checks are after a fire&forget, so we wait a moment await asyncio.sleep(2) @@ -759,6 +804,7 @@ async def test_get_active_project( socketio_client_factory: Callable, mocked_director_v2_api: dict[str, mock.Mock], mock_catalog_api: dict[str, mock.Mock], + mocked_notifications_plugin: dict[str, mock.Mock], ): # login with socket using client session id client_id1 = client_session_id_factory() @@ -792,6 +838,9 @@ async def test_get_active_project( resp = await client.get(get_active_projects_url) data, error = await assert_status(resp, expected) if resp.status == web.HTTPOk.status_code: + mocked_notifications_plugin["subscribe"].assert_called_once_with( + client.app, ProjectID(user_project["uuid"]) + ) assert not error assert ProjectState(**data.pop("state")).locked.value @@ -800,6 +849,8 @@ async def test_get_active_project( assert user_project_last_change_date < data_last_change_date assert data == user_project + else: + mocked_notifications_plugin["subscribe"].assert_not_called() # login with socket using client session id2 client_id2 = client_session_id_factory() @@ -1007,7 +1058,7 @@ async def finalize(): @pytest.fixture -def clean_redis_table(redis_client): +def clean_redis_table(redis_client) -> None: """this just ensures the redis table is cleaned up between test runs""" @@ -1025,7 +1076,9 
@@ async def test_open_shared_project_2_users_locked( mocked_director_v2_api: dict[str, mock.Mock], mock_orphaned_services, mock_catalog_api: dict[str, mock.Mock], - clean_redis_table, + clean_redis_table: None, + mock_rabbitmq: None, + mocked_notifications_plugin: dict[str, mock.Mock], ): # Use-case: user 1 opens a shared project, user 2 tries to open it as well mock_project_state_updated_handler = mocker.Mock() @@ -1165,7 +1218,7 @@ async def test_open_shared_project_2_users_locked( expected_project_state_client_2.locked.value = True expected_project_state_client_2.locked.status = ProjectStatus.OPENED owner2 = Owner( - user_id=user_2["id"], + user_id=PositiveIntWithExclusiveMinimumRemoved(user_2["id"]), first_name=(user_2["name"].split(".") + [""])[0], last_name=(user_2["name"].split(".") + [""])[1], ) @@ -1206,8 +1259,10 @@ async def test_open_shared_project_at_same_time( mock_orphaned_services, mock_catalog_api: dict[str, mock.Mock], clean_redis_table, + mock_rabbitmq: None, + mocked_notifications_plugin: dict[str, mock.Mock], ): - NUMBER_OF_ADDITIONAL_CLIENTS = 20 + NUMBER_OF_ADDITIONAL_CLIENTS = 10 # log client 1 client_1 = client client_id1 = client_session_id_factory() @@ -1289,6 +1344,7 @@ async def test_opened_project_can_still_be_opened_after_refreshing_tab( mock_orphaned_services, mock_catalog_api: dict[str, mock.Mock], clean_redis_table, + mocked_notifications_plugin: dict[str, mock.Mock], ): """Simulating a refresh goes as follows: The user opens a project, then hit the F5 refresh page. 
diff --git a/services/web/server/tests/unit/with_dbs/03/garbage_collector/test_resource_manager.py b/services/web/server/tests/unit/with_dbs/03/garbage_collector/test_resource_manager.py index 0216a0477d3..82665067ea2 100644 --- a/services/web/server/tests/unit/with_dbs/03/garbage_collector/test_resource_manager.py +++ b/services/web/server/tests/unit/with_dbs/03/garbage_collector/test_resource_manager.py @@ -462,6 +462,7 @@ async def test_interactive_services_removed_after_logout( expected_save_state: bool, open_project: Callable, mock_progress_bar: Any, + mocked_notifications_plugin: dict[str, mock.Mock], ): assert client.app @@ -526,6 +527,7 @@ async def test_interactive_services_remain_after_websocket_reconnection_from_2_t mocker: MockerFixture, open_project: Callable, mock_progress_bar: Any, + mocked_notifications_plugin: dict[str, mock.Mock], ): # login - logged_user fixture # create empty study - empty_user_project fixture @@ -656,6 +658,7 @@ async def test_interactive_services_removed_per_project( expected_save_state: bool, open_project: Callable, mock_progress_bar: Any, + mocked_notifications_plugin: dict[str, mock.Mock], ): # create server with delay set to DELAY # login - logged_user fixture @@ -819,6 +822,7 @@ async def test_websocket_disconnected_remove_or_maintain_files_based_on_role( expected_save_state: bool, open_project: Callable, mock_progress_bar: Any, + mocked_notifications_plugin: dict[str, mock.Mock], ): # login - logged_user fixture # create empty study - empty_user_project fixture diff --git a/services/web/server/tests/unit/with_dbs/conftest.py b/services/web/server/tests/unit/with_dbs/conftest.py index 4351a00ca8a..7358efb1082 100644 --- a/services/web/server/tests/unit/with_dbs/conftest.py +++ b/services/web/server/tests/unit/with_dbs/conftest.py @@ -16,6 +16,7 @@ from copy import deepcopy from pathlib import Path from typing import Any, AsyncIterator, Callable, Iterator +from unittest import mock from unittest.mock import AsyncMock, 
MagicMock, Mock from uuid import uuid4 @@ -533,6 +534,7 @@ async def primary_group( client: TestClient, logged_user: UserInfoDict, ) -> dict[str, Any]: + assert client.app primary_group, _, _ = await list_user_groups(client.app, logged_user["id"]) return primary_group @@ -542,6 +544,7 @@ async def standard_groups( client: TestClient, logged_user: UserInfoDict, ) -> AsyncIterator[list[dict[str, Any]]]: + assert client.app sparc_group = { "gid": "5", # this will be replaced "label": "SPARC", @@ -601,6 +604,7 @@ async def all_group( client: TestClient, logged_user: UserInfoDict, ) -> dict[str, str]: + assert client.app _, _, all_group = await list_user_groups(client.app, logged_user["id"]) return all_group @@ -614,6 +618,20 @@ def mock_rabbitmq(mocker: MockerFixture) -> None: ) +@pytest.fixture +def mocked_notifications_plugin(mocker: MockerFixture) -> dict[str, mock.Mock]: + mocked_subscribe = mocker.patch( + "simcore_service_webserver.notifications.project_logs.subscribe", + autospec=True, + ) + mocked_unsubscribe = mocker.patch( + "simcore_service_webserver.notifications.project_logs.unsubscribe", + autospec=True, + ) + + return {"subscribe": mocked_subscribe, "unsubscribe": mocked_unsubscribe} + + @pytest.fixture def mock_progress_bar(mocker: MockerFixture) -> Any: sub_progress = Mock()