From c370521f7f3516d7ef4a6a27f6a70bbd660746d1 Mon Sep 17 00:00:00 2001 From: Sylvain <35365065+sanderegg@users.noreply.github.com> Date: Tue, 15 Nov 2022 22:11:51 +0100 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Use=20common=20RabbitMQ=20?= =?UTF-8?q?client=20=20(=E2=9A=A0=EF=B8=8F=20devops)=20(#3502)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env-devel | 1 - .github/workflows/ci-testing-deploy.yml | 1 + .../src/models_library/rabbitmq_messages.py | 21 +- .../src/pytest_simcore/docker_swarm.py | 11 +- .../src/pytest_simcore/rabbit_service.py | 82 ++---- .../src/pytest_simcore/websocket_client.py | 6 +- packages/service-library/Makefile | 5 + .../service-library/requirements/_base.in | 1 + .../service-library/requirements/_base.txt | 14 + .../service-library/requirements/_test.in | 2 + .../service-library/requirements/_test.txt | 15 +- packages/service-library/requirements/ci.txt | 1 + .../requirements/ci[aiohttp].txt | 1 + .../service-library/requirements/ci[all].txt | 1 + .../requirements/ci[fastapi].txt | 1 + packages/service-library/requirements/dev.txt | 1 + .../requirements/dev[aiohttp].txt | 1 + .../service-library/requirements/dev[all].txt | 1 + .../requirements/dev[fastapi].txt | 1 + .../src/servicelib/rabbitmq.py | 136 ++++++++++ .../src/servicelib/rabbitmq_utils.py | 13 + packages/service-library/tests/conftest.py | 5 + .../service-library/tests/test_rabbitmq.py | 192 ++++++++++++++ .../src/settings_library/rabbit.py | 8 - .../modules/comp_scheduler/dask_scheduler.py | 24 +- .../modules/comp_scheduler/factory.py | 8 +- .../docker_service_specs/sidecar.py | 1 - .../modules/projects_networks.py | 8 +- .../modules/rabbitmq.py | 95 +------ ...ic_sidecar_docker_service_specs_sidecar.py | 1 - .../with_dbs/test_api_route_computations.py | 6 +- ...t_modules_comp_scheduler_dask_scheduler.py | 15 +- ...es_dynamic_sidecar_docker_service_specs.py | 13 +- services/docker-compose-ops.yml | 4 +- services/docker-compose.yml | 6 +- .../api/_dependencies.py | 5 - .../api/containers_long_running_tasks.py | 26 +- .../simcore_service_dynamic_sidecar/cli.py | 15 +- .../core/docker_logs.py | 11 +- .../core/rabbitmq.py | 239 +++++------------- .../modules/directory_watcher/_core.py | 4 +- .../modules/long_running_tasks.py | 56 ++-- .../dynamic-sidecar/tests/unit/conftest.py | 17 +- .../tests/unit/test_api_containers.py | 6 +- .../test_api_containers_long_running_tasks.py | 6 +- .../dynamic-sidecar/tests/unit/test_cli.py | 17 +- .../unit/test_core_docker_compose_utils.py | 2 +- .../tests/unit/test_core_docker_logs.py | 7 +- .../tests/unit/test_core_utils.py | 2 +- .../unit/with_rabbit/test_core_rabbitmq.py | 94 ------- .../computation_subscribe.py | 190 ++++---------- .../integration/01/test_garbage_collection.py | 1 + .../tests/integration/02/test_rabbit.py | 192 ++++++++++---- 53 files changed, 784 insertions(+), 808 deletions(-) create mode 100644 packages/service-library/src/servicelib/rabbitmq.py create mode 100644 packages/service-library/tests/test_rabbitmq.py delete mode 100644 services/dynamic-sidecar/tests/unit/with_rabbit/test_core_rabbitmq.py diff --git a/.env-devel b/.env-devel index e678b9b72c2..211f7de19f3 100644 --- a/.env-devel +++ b/.env-devel @@ -39,7 +39,6 @@ POSTGRES_PASSWORD=adminadmin POSTGRES_PORT=5432 POSTGRES_USER=scu -RABBIT_CHANNELS='{"log": "simcore.services.logs", "progress": "simcore.services.progress", "instrumentation": "simcore.services.instrumentation", "events": "simcore.services.events"}' RABBIT_HOST=rabbit 
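[Editorial note, not part of the patch] The `.env-devel` hunk above drops `RABBIT_CHANNELS` because exchange names stop being deployment configuration: each message model now declares its own channel, including the `events` channel (see the `rabbitmq_messages.py` diff below). A minimal runnable sketch of that pattern, distilled from the diff and assuming the pydantic v1 pinned by this PR:

```python
# Sketch only (simplified from models_library/rabbitmq_messages.py below):
# the exchange name is a Literal default on the model, recoverable from the
# class itself without building an instance.
from typing import Literal

from pydantic import BaseModel


class RabbitMessageBase(BaseModel):
    channel_name: str

    @classmethod
    def get_channel_name(cls) -> str:
        # reads the Literal default declared by each concrete subclass
        return cls.__fields__["channel_name"].default


class LoggerRabbitMessage(RabbitMessageBase):
    channel_name: Literal["simcore.services.logs"] = "simcore.services.logs"
    messages: list[str]


assert LoggerRabbitMessage.get_channel_name() == "simcore.services.logs"
```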
RABBIT_PASSWORD=adminadmin RABBIT_PORT=5672 diff --git a/.github/workflows/ci-testing-deploy.yml b/.github/workflows/ci-testing-deploy.yml index a95df0b506c..26253d344c1 100644 --- a/.github/workflows/ci-testing-deploy.yml +++ b/.github/workflows/ci-testing-deploy.yml @@ -1244,6 +1244,7 @@ jobs: if: ${{ contains(join(needs.*.result, ','), 'failure') }} run: | echo "::error title=ERROR::one of the unit-tests failed!" + echo "${{ join(needs.*.result, ',') }}" exit 1 - name: all the previous unit-tests were run successfully or skipped if: ${{ !contains(join(needs.*.result, ','), 'failure') }} diff --git a/packages/models-library/src/models_library/rabbitmq_messages.py b/packages/models-library/src/models_library/rabbitmq_messages.py index 7e7bf9abc68..63cc2b1010e 100644 --- a/packages/models-library/src/models_library/rabbitmq_messages.py +++ b/packages/models-library/src/models_library/rabbitmq_messages.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import List, Optional, Union +from typing import Literal, Optional from models_library.projects import ProjectID from models_library.projects_nodes import NodeID @@ -15,32 +15,39 @@ class RabbitEventMessageType(str, Enum): class RabbitMessageBase(BaseModel): + channel_name: str node_id: NodeID user_id: UserID project_id: ProjectID + @classmethod + def get_channel_name(cls) -> str: + # NOTE: this returns the channel type name + return cls.__fields__["channel_name"].default + class LoggerRabbitMessage(RabbitMessageBase): - messages: List[str] + channel_name: Literal["simcore.services.logs"] = "simcore.services.logs" + messages: list[str] class EventRabbitMessage(RabbitMessageBase): + channel_name: Literal["simcore.services.events"] = "simcore.services.events" action: RabbitEventMessageType class ProgressRabbitMessage(RabbitMessageBase): + channel_name: Literal["simcore.services.progress"] = "simcore.services.progress" progress: NonNegativeFloat class InstrumentationRabbitMessage(RabbitMessageBase): + channel_name: Literal[ + "simcore.services.instrumentation" + ] = "simcore.services.instrumentation" metrics: str service_uuid: NodeID service_type: NodeClass service_key: str service_tag: str result: Optional[RunningState] = None - - -RabbitMessageTypes = Union[ - LoggerRabbitMessage, ProgressRabbitMessage, InstrumentationRabbitMessage -] diff --git a/packages/pytest-simcore/src/pytest_simcore/docker_swarm.py b/packages/pytest-simcore/src/pytest_simcore/docker_swarm.py index 44e2a227ad4..b5bb2f35b53 100644 --- a/packages/pytest-simcore/src/pytest_simcore/docker_swarm.py +++ b/packages/pytest-simcore/src/pytest_simcore/docker_swarm.py @@ -211,13 +211,10 @@ def docker_stack( # NOTE: if the migration service was already running prior to this call it must # be force updated so that it does its job. 
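[Editorial note, not part of the patch] One consequence of the `channel_name` Literals introduced in `rabbitmq_messages.py` above, which replace the deleted `RabbitMessageTypes` union: a consumer can rebuild the typed message from a raw payload with a pydantic v1 discriminated union. This is a hypothetical usage sketch, not code from the PR:

```python
# Hypothetical consumer-side decoding; pydantic >=1.9 dispatches on the
# Literal discriminator field, so the right model is picked from the payload.
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, parse_raw_as


class LoggerRabbitMessage(BaseModel):
    channel_name: Literal["simcore.services.logs"] = "simcore.services.logs"
    messages: list[str]


class ProgressRabbitMessage(BaseModel):
    channel_name: Literal["simcore.services.progress"] = "simcore.services.progress"
    progress: float


AnyRabbitMessage = Annotated[
    Union[LoggerRabbitMessage, ProgressRabbitMessage],
    Field(discriminator="channel_name"),
]


def decode(body: bytes) -> Union[LoggerRabbitMessage, ProgressRabbitMessage]:
    return parse_raw_as(AnyRabbitMessage, body)


msg = decode(b'{"channel_name": "simcore.services.logs", "messages": ["hi"]}')
assert isinstance(msg, LoggerRabbitMessage)
```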
else it remains and tests will fail - migration_service_was_running_before = any( - filter( - lambda s: "migration" in s.name, docker_client.services.list() # type: ignore - ) - ) - for migration_service in filter( - lambda s: "migration" in s.name, docker_client.services.list() # type: ignore + for migration_service in ( + service + for service in docker_client.services.list() + if "migration" in service.name # type: ignore ): print( "WARNING: migration service detected before updating stack, it will be force-updated" diff --git a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py index f3a72ad3583..b6a9f79111c 100644 --- a/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py +++ b/packages/pytest-simcore/src/pytest_simcore/rabbit_service.py @@ -2,11 +2,10 @@ # pylint: disable=unused-argument # pylint: disable=unused-variable -import json +import asyncio import logging import os import socket -from dataclasses import dataclass from typing import Any, AsyncIterator, Optional import aio_pika @@ -49,7 +48,6 @@ async def rabbit_settings( RABBIT_PASSWORD=testing_environ_vars["RABBIT_PASSWORD"], RABBIT_HOST=get_localhost_ip(), RABBIT_PORT=int(port), - RABBIT_CHANNELS=json.loads(testing_environ_vars["RABBIT_CHANNELS"]), ) await wait_till_rabbit_responsive(settings.dsn) @@ -59,7 +57,7 @@ async def rabbit_settings( @pytest.fixture(scope="function") async def rabbit_service( - rabbit_settings: RabbitSettings, monkeypatch + rabbit_settings: RabbitSettings, monkeypatch: pytest.MonkeyPatch ) -> RabbitSettings: """Sets env vars for a rabbit service is up and responsive and returns its settings as well @@ -71,7 +69,6 @@ async def rabbit_service( monkeypatch.setenv( "RABBIT_PASSWORD", rabbit_settings.RABBIT_PASSWORD.get_secret_value() ) - monkeypatch.setenv("RABBIT_CHANNELS", json.dumps(rabbit_settings.RABBIT_CHANNELS)) return rabbit_settings @@ -83,6 +80,11 @@ async def rabbit_connection( def _reconnect_callback(): pytest.fail("rabbit reconnected") + def _connection_close_callback(sender: Any, exc: Optional[BaseException] = None): + if exc and not isinstance(exc, asyncio.CancelledError): + pytest.fail(f"rabbit connection closed with exception {exc} from {sender}!") + print("<-- connection closed") + # create connection # NOTE: to show the connection name in the rabbitMQ UI see there # https://www.bountysource.com/issues/89342433-setting-custom-connection-name-via-client_properties-doesn-t-work-when-connecting-using-an-amqp-url @@ -93,6 +95,7 @@ def _reconnect_callback(): assert connection assert not connection.is_closed connection.reconnect_callbacks.add(_reconnect_callback) + connection.close_callbacks.add(_connection_close_callback) yield connection # close connection @@ -105,68 +108,13 @@ async def rabbit_channel( rabbit_connection: aio_pika.abc.AbstractConnection, ) -> AsyncIterator[aio_pika.abc.AbstractChannel]: def _channel_close_callback(sender: Any, exc: Optional[BaseException] = None): - if exc: - pytest.fail("rabbit channel closed!") - else: - print("sender was '{sender}'") + if exc and not isinstance(exc, asyncio.CancelledError): + pytest.fail(f"rabbit channel closed with exception {exc} from {sender}!") + print("<-- rabbit channel closed") # create channel - async with rabbit_connection.channel(publisher_confirms=False) as channel: + async with rabbit_connection.channel() as channel: + print("--> rabbit channel created") + channel.close_callbacks.add(_channel_close_callback) yield channel - - -@dataclass -class 
RabbitExchanges: - logs: aio_pika.abc.AbstractExchange - progress: aio_pika.abc.AbstractExchange - instrumentation: aio_pika.abc.AbstractExchange - - -@pytest.fixture(scope="function") -async def rabbit_exchanges( - rabbit_settings: RabbitSettings, - rabbit_channel: aio_pika.Channel, -) -> RabbitExchanges: - """ - Declares and returns 'log' and 'instrumentation' exchange channels with rabbit - """ - - # declare log exchange - LOG_EXCHANGE_NAME: str = rabbit_settings.RABBIT_CHANNELS["log"] - logs_exchange = await rabbit_channel.declare_exchange( - LOG_EXCHANGE_NAME, aio_pika.ExchangeType.FANOUT - ) - assert logs_exchange - - # declare progress exchange - PROGRESS_EXCHANGE_NAME: str = rabbit_settings.RABBIT_CHANNELS["progress"] - progress_exchange = await rabbit_channel.declare_exchange( - PROGRESS_EXCHANGE_NAME, aio_pika.ExchangeType.FANOUT - ) - assert progress_exchange - - # declare instrumentation exchange - INSTRUMENTATION_EXCHANGE_NAME: str = rabbit_settings.RABBIT_CHANNELS[ - "instrumentation" - ] - instrumentation_exchange = await rabbit_channel.declare_exchange( - INSTRUMENTATION_EXCHANGE_NAME, aio_pika.ExchangeType.FANOUT - ) - assert instrumentation_exchange - - return RabbitExchanges(logs_exchange, progress_exchange, instrumentation_exchange) - - -@pytest.fixture(scope="function") -async def rabbit_queue( - rabbit_channel: aio_pika.Channel, - rabbit_exchanges: RabbitExchanges, -) -> AsyncIterator[aio_pika.abc.AbstractQueue]: - queue = await rabbit_channel.declare_queue(exclusive=True) - assert queue - - # Binding queue to exchange - await queue.bind(rabbit_exchanges.logs) - await queue.bind(rabbit_exchanges.progress) - await queue.bind(rabbit_exchanges.instrumentation) - yield queue + assert channel.is_closed diff --git a/packages/pytest-simcore/src/pytest_simcore/websocket_client.py b/packages/pytest-simcore/src/pytest_simcore/websocket_client.py index 536a9d1d98c..ca787265bac 100644 --- a/packages/pytest-simcore/src/pytest_simcore/websocket_client.py +++ b/packages/pytest-simcore/src/pytest_simcore/websocket_client.py @@ -85,9 +85,10 @@ async def _connect( # WARNING: engineio fails with empty cookies. Expects "key=value" headers.update({"Cookie": cookie}) - logger.debug("Connecting socketio client to %s ...", url) + print(f"--> Connecting socketio client to {url} ...") await sio.connect(url, headers=headers) assert sio.sid + print("... connection done") clients.append(sio) return sio @@ -96,8 +97,9 @@ async def _connect( # cleans up clients produce by _connect(*) calls for sio in clients: if sio.connected: - logger.debug("Disconnecting socketio client %s", sio) + print(f"<--Disconnecting socketio client {sio}") await sio.disconnect() await sio.wait() + print(f"... 
disconnection from {sio} done.") assert not sio.sid diff --git a/packages/service-library/Makefile b/packages/service-library/Makefile index 8d8dbd296df..26c0740b96c 100644 --- a/packages/service-library/Makefile +++ b/packages/service-library/Makefile @@ -38,6 +38,7 @@ test-dev: ## runs unit tests in w/o extras --durations=10 \ --exitfirst \ --failed-first \ + --keep-docker-up \ --ignore=tests/aiohttp \ --ignore=tests/fastapi \ --pdb \ @@ -58,6 +59,7 @@ test-dev[aiohttp]: ## runs unit common tests and aiohttp extras --durations=10 \ --exitfirst \ --failed-first \ + --keep-docker-up \ --ignore=tests/fastapi \ --pdb \ -vv \ @@ -76,6 +78,7 @@ test-dev[fastapi]: ## runs unit common tests and fastapi extras --durations=10 \ --exitfirst \ --failed-first \ + --keep-docker-up \ --ignore=tests/aiohttp \ --pdb \ -vv \ @@ -93,6 +96,7 @@ test-dev[all]: ## runs unit tests w/ all extras --durations=10 \ --exitfirst \ --failed-first \ + --keep-docker-up \ --pdb \ -vv \ $(CURDIR)/tests @@ -109,6 +113,7 @@ test-ci[all]: ## runs unit tests w/ all extras --cov-report=xml \ --cov=$(APP_PACKAGE_NAME) \ --durations=10 \ + --keep-docker-up \ --log-date-format="%Y-%m-%d %H:%M:%S" \ --log-format="%(asctime)s %(levelname)s %(message)s" \ --verbose \ diff --git a/packages/service-library/requirements/_base.in b/packages/service-library/requirements/_base.in index 478f0ce5dee..5ab654a4fb6 100644 --- a/packages/service-library/requirements/_base.in +++ b/packages/service-library/requirements/_base.in @@ -6,6 +6,7 @@ aiodebug aiofiles +aio-pika pydantic pyinstrument pyyaml diff --git a/packages/service-library/requirements/_base.txt b/packages/service-library/requirements/_base.txt index 9047d10e6d3..0d596b3d361 100644 --- a/packages/service-library/requirements/_base.txt +++ b/packages/service-library/requirements/_base.txt @@ -4,10 +4,20 @@ # # pip-compile --output-file=requirements/_base.txt --strip-extras requirements/_base.in # +aio-pika==8.2.4 + # via -r requirements/_base.in aiodebug==2.3.0 # via -r requirements/_base.in aiofiles==22.1.0 # via -r requirements/_base.in +aiormq==6.4.2 + # via aio-pika +idna==3.4 + # via yarl +multidict==6.0.2 + # via yarl +pamqp==3.2.1 + # via aiormq pydantic==1.10.2 # via # -c requirements/../../../requirements/constraints.txt @@ -27,3 +37,7 @@ typing-extensions==4.4.0 # via # aiodebug # pydantic +yarl==1.8.1 + # via + # aio-pika + # aiormq diff --git a/packages/service-library/requirements/_test.in b/packages/service-library/requirements/_test.in index d2da808d106..9a2dc179bc1 100644 --- a/packages/service-library/requirements/_test.in +++ b/packages/service-library/requirements/_test.in @@ -14,6 +14,7 @@ asgi_lifespan coverage coveralls +docker faker flaky openapi-spec-validator @@ -26,3 +27,4 @@ pytest-mock pytest-runner pytest-sugar pytest-xdist +python-dotenv diff --git a/packages/service-library/requirements/_test.txt b/packages/service-library/requirements/_test.txt index 00df82821b5..5641c1dc318 100644 --- a/packages/service-library/requirements/_test.txt +++ b/packages/service-library/requirements/_test.txt @@ -42,6 +42,8 @@ coverage==6.5.0 # pytest-cov coveralls==3.3.1 # via -r requirements/_test.in +docker==6.0.1 + # via -r requirements/_test.in docopt==0.6.2 # via coveralls exceptiongroup==1.0.1 @@ -60,6 +62,7 @@ frozenlist==1.3.1 idna==3.4 # via # -c requirements/_aiohttp.txt + # -c requirements/_base.txt # -c requirements/_fastapi.txt # requests # yarl @@ -73,6 +76,7 @@ jsonschema==3.2.0 multidict==6.0.2 # via # -c requirements/_aiohttp.txt + # -c 
requirements/_base.txt # aiohttp # yarl openapi-schema-validator==0.2.3 @@ -85,6 +89,7 @@ openapi-spec-validator==0.4.0 # -r requirements/_test.in packaging==21.3 # via + # docker # pytest # pytest-sugar pluggy==1.0.0 @@ -126,6 +131,8 @@ pytest-xdist==3.0.2 # via -r requirements/_test.in python-dateutil==2.8.2 # via faker +python-dotenv==0.21.0 + # via -r requirements/_test.in pyyaml==5.4.1 # via # -c requirements/../../../requirements/constraints.txt @@ -133,7 +140,9 @@ pyyaml==5.4.1 # -c requirements/_base.txt # openapi-spec-validator requests==2.28.1 - # via coveralls + # via + # coveralls + # docker six==1.16.0 # via # -c requirements/_aiohttp.txt @@ -153,10 +162,14 @@ tomli==2.0.1 urllib3==1.26.12 # via # -c requirements/../../../requirements/constraints.txt + # docker # requests +websocket-client==1.4.2 + # via docker yarl==1.8.1 # via # -c requirements/_aiohttp.txt + # -c requirements/_base.txt # aiohttp # The following packages are considered to be unsafe in a requirements file: diff --git a/packages/service-library/requirements/ci.txt b/packages/service-library/requirements/ci.txt index cc585a2c23e..21133de95e0 100644 --- a/packages/service-library/requirements/ci.txt +++ b/packages/service-library/requirements/ci.txt @@ -11,6 +11,7 @@ --requirement _test.txt # installs this repo's packages +../settings-library/ ../pytest-simcore/ # current module diff --git a/packages/service-library/requirements/ci[aiohttp].txt b/packages/service-library/requirements/ci[aiohttp].txt index d48ddbe6be4..25a23441216 100644 --- a/packages/service-library/requirements/ci[aiohttp].txt +++ b/packages/service-library/requirements/ci[aiohttp].txt @@ -12,6 +12,7 @@ --requirement _test.txt # installs this repo's packages +../settings-library/ ../pytest-simcore/ # current module diff --git a/packages/service-library/requirements/ci[all].txt b/packages/service-library/requirements/ci[all].txt index ba86387157c..81e9018cc9f 100644 --- a/packages/service-library/requirements/ci[all].txt +++ b/packages/service-library/requirements/ci[all].txt @@ -13,6 +13,7 @@ --requirement _test.txt # installs this repo's packages +../settings-library/ ../pytest-simcore/ # current module diff --git a/packages/service-library/requirements/ci[fastapi].txt b/packages/service-library/requirements/ci[fastapi].txt index cdff2e5b228..2e9ac0f68a6 100644 --- a/packages/service-library/requirements/ci[fastapi].txt +++ b/packages/service-library/requirements/ci[fastapi].txt @@ -12,6 +12,7 @@ --requirement _test.txt # installs this repo's packages +../settings-library/ ../pytest-simcore/ # current module diff --git a/packages/service-library/requirements/dev.txt b/packages/service-library/requirements/dev.txt index 47bb384951c..e54aeb608c2 100644 --- a/packages/service-library/requirements/dev.txt +++ b/packages/service-library/requirements/dev.txt @@ -12,6 +12,7 @@ --requirement _tools.txt # installs this repo's packages +--editable ../settings-library/ --editable ../pytest-simcore/ # current module diff --git a/packages/service-library/requirements/dev[aiohttp].txt b/packages/service-library/requirements/dev[aiohttp].txt index ad7518fbbfc..298d255dd6a 100644 --- a/packages/service-library/requirements/dev[aiohttp].txt +++ b/packages/service-library/requirements/dev[aiohttp].txt @@ -13,6 +13,7 @@ --requirement _tools.txt # installs this repo's packages +--editable ../settings-library/ --editable ../pytest-simcore/ # current module diff --git a/packages/service-library/requirements/dev[all].txt 
b/packages/service-library/requirements/dev[all].txt index f94292f1bff..c9d4ba4cd94 100644 --- a/packages/service-library/requirements/dev[all].txt +++ b/packages/service-library/requirements/dev[all].txt @@ -14,6 +14,7 @@ --requirement _tools.txt # installs this repo's packages +--editable ../settings-library/ --editable ../pytest-simcore/ # current module diff --git a/packages/service-library/requirements/dev[fastapi].txt b/packages/service-library/requirements/dev[fastapi].txt index c6474c2e494..126db34eb20 100644 --- a/packages/service-library/requirements/dev[fastapi].txt +++ b/packages/service-library/requirements/dev[fastapi].txt @@ -13,6 +13,7 @@ --requirement _tools.txt # installs this repo's packages +--editable ../settings-library/ --editable ../pytest-simcore/ # current module diff --git a/packages/service-library/src/servicelib/rabbitmq.py b/packages/service-library/src/servicelib/rabbitmq.py new file mode 100644 index 00000000000..cd8870683bb --- /dev/null +++ b/packages/service-library/src/servicelib/rabbitmq.py @@ -0,0 +1,136 @@ +import asyncio +import logging +import os +import socket +from dataclasses import dataclass, field +from typing import Any, Awaitable, Callable, Final, Optional + +import aio_pika +from servicelib.logging_utils import log_context +from settings_library.rabbit import RabbitSettings + +log = logging.getLogger(__name__) + + +def _connection_close_callback(sender: Any, exc: Optional[BaseException]) -> None: + if exc: + if isinstance(exc, asyncio.CancelledError): + log.info("Rabbit connection was cancelled") + else: + log.error( + "Rabbit connection closed with exception from %s:%s", + sender, + exc, + ) + + +def _channel_close_callback(sender: Any, exc: Optional[BaseException]) -> None: + if exc: + if isinstance(exc, asyncio.CancelledError): + log.info("Rabbit channel was cancelled") + else: + log.error( + "Rabbit channel closed with exception from %s:%s", + sender, + exc, + ) + + +async def _get_connection( + rabbit_broker: str, connection_name: str +) -> aio_pika.abc.AbstractRobustConnection: + # NOTE: to show the connection name in the rabbitMQ UI see there + # https://www.bountysource.com/issues/89342433-setting-custom-connection-name-via-client_properties-doesn-t-work-when-connecting-using-an-amqp-url + # + url = f"{rabbit_broker}?name={connection_name}_{socket.gethostname()}_{os.getpid()}" + connection = await aio_pika.connect_robust( + url, client_properties={"connection_name": connection_name} + ) + connection.close_callbacks.add(_connection_close_callback) + return connection + + +MessageHandler = Callable[[Any], Awaitable[bool]] +Message = str + +_MINUTE: Final[int] = 60 +_RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_S: Final[int] = 15 * _MINUTE + + +@dataclass +class RabbitMQClient: + client_name: str + settings: RabbitSettings + _connection_pool: Optional[aio_pika.pool.Pool] = field(init=False, default=None) + _channel_pool: Optional[aio_pika.pool.Pool] = field(init=False, default=None) + + def __post_init__(self): + # recommendations are 1 connection per process + self._connection_pool = aio_pika.pool.Pool( + _get_connection, self.settings.dsn, self.client_name, max_size=1 + ) + # channels are not thread safe, what about python? 
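+        # NOTE: aio-pika channels are not safe for concurrent use by several
+        # coroutines at once (the same caveat as threads applies to asyncio
+        # tasks), so a small pool of up to 10 channels serves concurrent
+        # publish/subscribe calls over the single pooled connection.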
+ self._channel_pool = aio_pika.pool.Pool(self._get_channel, max_size=10) + + async def close(self) -> None: + with log_context(log, logging.INFO, msg="Closing connection to RabbitMQ"): + assert self._channel_pool # nosec + await self._channel_pool.close() + assert self._connection_pool # nosec + await self._connection_pool.close() + + async def _get_channel(self) -> aio_pika.abc.AbstractChannel: + assert self._connection_pool # nosec + async with self._connection_pool.acquire() as connection: + connection: aio_pika.RobustConnection + channel = await connection.channel() + channel.close_callbacks.add(_channel_close_callback) + return channel + + async def subscribe( + self, exchange_name: str, message_handler: MessageHandler + ) -> None: + assert self._channel_pool # nosec + async with self._channel_pool.acquire() as channel: + channel: aio_pika.RobustChannel + _DEFAULT_PREFETCH_VALUE = 10 # this value is set to the default for now + await channel.set_qos(_DEFAULT_PREFETCH_VALUE) + + exchange = await channel.declare_exchange( + exchange_name, aio_pika.ExchangeType.FANOUT, durable=True + ) + + # NOTE: durable=True makes the queue persistent between RabbitMQ restarts/crashes + # consumer/publisher must set the same configuration for same queue + # exclusive means that the queue is only available for THIS very client + # and will be deleted when the client disconnects + queue = await channel.declare_queue( + durable=True, + exclusive=True, + arguments={"x-message-ttl": _RABBIT_QUEUE_MESSAGE_DEFAULT_TTL_S}, + ) + await queue.bind(exchange) + + async def _on_message( + message: aio_pika.abc.AbstractIncomingMessage, + ) -> None: + async with message.process(requeue=True): + with log_context( + log, logging.DEBUG, msg=f"Message received {message}" + ): + if not await message_handler(message.body): + await message.nack() + + await queue.consume(_on_message) + + async def publish(self, exchange_name: str, message: Message) -> None: + assert self._channel_pool # nosec + async with self._channel_pool.acquire() as channel: + channel: aio_pika.RobustChannel + exchange = await channel.declare_exchange( + exchange_name, aio_pika.ExchangeType.FANOUT, durable=True + ) + await exchange.publish( + aio_pika.Message(message.encode()), + routing_key="", + ) diff --git a/packages/service-library/src/servicelib/rabbitmq_utils.py b/packages/service-library/src/servicelib/rabbitmq_utils.py index 743c802af5b..d890cf71ed7 100644 --- a/packages/service-library/src/servicelib/rabbitmq_utils.py +++ b/packages/service-library/src/servicelib/rabbitmq_utils.py @@ -3,10 +3,14 @@ import logging from typing import Final, Optional +import aio_pika +from tenacity import retry from tenacity.before_sleep import before_sleep_log from tenacity.stop import stop_after_delay from tenacity.wait import wait_fixed +from .logging_utils import log_context + log = logging.getLogger(__file__) @@ -25,3 +29,12 @@ def __init__(self, logger: Optional[logging.Logger] = None): before_sleep=before_sleep_log(logger, logging.WARNING), reraise=True, ) + + +@retry(**RabbitMQRetryPolicyUponInitialization().kwargs) +async def wait_till_rabbitmq_responsive(url: str) -> bool: + """Check if something responds to ``url``""" + with log_context(log, logging.INFO, msg=f"checking RabbitMQ connection at {url=}"): + connection = await aio_pika.connect(url) + await connection.close() + return True diff --git a/packages/service-library/tests/conftest.py b/packages/service-library/tests/conftest.py index a964ef525d8..939d6fd6730 100644 --- 
a/packages/service-library/tests/conftest.py +++ b/packages/service-library/tests/conftest.py @@ -12,9 +12,14 @@ from faker import Faker pytest_plugins = [ + "pytest_simcore.docker_compose", + "pytest_simcore.docker_swarm", + "pytest_simcore.monkeypatch_extra", "pytest_simcore.pytest_global_environs", + "pytest_simcore.rabbit_service", "pytest_simcore.repository_paths", "pytest_simcore.simcore_service_library_fixtures", + "pytest_simcore.tmp_path_extra", ] diff --git a/packages/service-library/tests/test_rabbitmq.py b/packages/service-library/tests/test_rabbitmq.py new file mode 100644 index 00000000000..60c31ebf60e --- /dev/null +++ b/packages/service-library/tests/test_rabbitmq.py @@ -0,0 +1,192 @@ +# pylint:disable=unused-variable +# pylint:disable=unused-argument +# pylint:disable=redefined-outer-name +# pylint:disable=protected-access + + +import asyncio +from typing import AsyncIterator, Callable +from unittest import mock + +import pytest +from faker import Faker +from pytest_mock.plugin import MockerFixture +from servicelib.rabbitmq import RabbitMQClient +from settings_library.rabbit import RabbitSettings +from tenacity._asyncio import AsyncRetrying +from tenacity.retry import retry_if_exception_type +from tenacity.stop import stop_after_delay +from tenacity.wait import wait_fixed + +pytest_simcore_core_services_selection = [ + "rabbit", +] + + +@pytest.fixture +def rabbit_client_name(faker: Faker) -> str: + return faker.pystr() + + +async def test_rabbit_client(rabbit_client_name: str, rabbit_service: RabbitSettings): + client = RabbitMQClient(rabbit_client_name, rabbit_service) + assert client + # check it is correctly initialized + assert client._connection_pool + assert not client._connection_pool.is_closed + assert client._channel_pool + assert not client._channel_pool.is_closed + assert client.client_name == rabbit_client_name + assert client.settings == rabbit_service + await client.close() + assert client._connection_pool + assert client._connection_pool.is_closed + assert client._channel_pool + assert client._channel_pool.is_closed + + +@pytest.fixture +async def rabbitmq_client( + rabbit_service: RabbitSettings, +) -> AsyncIterator[Callable[[str], RabbitMQClient]]: + created_clients = [] + + def _creator(client_name: str) -> RabbitMQClient: + client = RabbitMQClient(f"pytest_{client_name}", rabbit_service) + assert client + assert client._connection_pool + assert not client._connection_pool.is_closed + assert client._channel_pool + assert not client._channel_pool.is_closed + assert client.client_name == f"pytest_{client_name}" + assert client.settings == rabbit_service + created_clients.append(client) + return client + + yield _creator + # cleanup, properly close the clients + await asyncio.gather(*(client.close() for client in created_clients)) + for client in created_clients: + assert client._channel_pool + assert client._channel_pool.is_closed + + +@pytest.fixture +def random_exchange_name(faker: Faker) -> str: + return f"pytest_fake_exchange_{faker.pystr()}" + + +async def _assert_message_received( + mocked_message_parser: mock.AsyncMock, + expected_call_count: int, + expected_message: str, +) -> None: + async for attempt in AsyncRetrying( + wait=wait_fixed(0.1), + stop=stop_after_delay(5), + retry=retry_if_exception_type(AssertionError), + reraise=True, + ): + with attempt: + # NOTE: this sleep is here to ensure that there are not multiple messages coming in + await asyncio.sleep(1) + assert mocked_message_parser.call_count == expected_call_count + if 
expected_call_count == 1: + mocked_message_parser.assert_called_once_with(expected_message.encode()) + elif expected_call_count == 0: + mocked_message_parser.assert_not_called() + else: + mocked_message_parser.assert_called_with(expected_message.encode()) + + +async def test_rabbit_client_pub_sub_message_is_lost_if_no_consumer_present( + rabbitmq_client: Callable[[str], RabbitMQClient], + random_exchange_name: str, + mocker: MockerFixture, + faker: Faker, +): + consumer = rabbitmq_client("consumer") + publisher = rabbitmq_client("publisher") + + message = faker.text() + + mocked_message_parser = mocker.AsyncMock(return_value=True) + await publisher.publish(random_exchange_name, message) + await consumer.subscribe(random_exchange_name, mocked_message_parser) + await _assert_message_received(mocked_message_parser, 0, "") + + +async def test_rabbit_client_pub_sub( + rabbitmq_client: Callable[[str], RabbitMQClient], + random_exchange_name: str, + mocker: MockerFixture, + faker: Faker, +): + consumer = rabbitmq_client("consumer") + publisher = rabbitmq_client("publisher") + + message = faker.text() + + mocked_message_parser = mocker.AsyncMock(return_value=True) + await consumer.subscribe(random_exchange_name, mocked_message_parser) + await publisher.publish(random_exchange_name, message) + await _assert_message_received(mocked_message_parser, 1, message) + + +@pytest.mark.parametrize("num_subs", [10]) +async def test_rabbit_client_pub_many_subs( + rabbitmq_client: Callable[[str], RabbitMQClient], + random_exchange_name: str, + mocker: MockerFixture, + faker: Faker, + num_subs: int, +): + consumers = (rabbitmq_client(f"consumer_{n}") for n in range(num_subs)) + mocked_message_parsers = ( + mocker.AsyncMock(return_value=True) for _ in range(num_subs) + ) + + publisher = rabbitmq_client("publisher") + message = faker.text() + + await asyncio.gather( + *( + consumer.subscribe(random_exchange_name, parser) + for consumer, parser in zip(consumers, mocked_message_parsers) + ) + ) + + await publisher.publish(random_exchange_name, message) + await asyncio.gather( + *( + _assert_message_received(parser, 1, message) + for parser in mocked_message_parsers + ) + ) + + +async def test_rabbit_client_pub_sub_republishes_if_exception_raised( + rabbitmq_client: Callable[[str], RabbitMQClient], + random_exchange_name: str, + mocker: MockerFixture, + faker: Faker, +): + publisher = rabbitmq_client("publisher") + consumer = rabbitmq_client("consumer") + + message = faker.text() + + def _raise_once_then_true(*args, **kwargs): + _raise_once_then_true.calls += 1 + + if _raise_once_then_true.calls == 1: + raise KeyError("this is a test!") + if _raise_once_then_true.calls == 2: + return False + return True + + _raise_once_then_true.calls = 0 + mocked_message_parser = mocker.AsyncMock(side_effect=_raise_once_then_true) + await consumer.subscribe(random_exchange_name, mocked_message_parser) + await publisher.publish(random_exchange_name, message) + await _assert_message_received(mocked_message_parser, 3, message) diff --git a/packages/settings-library/src/settings_library/rabbit.py b/packages/settings-library/src/settings_library/rabbit.py index 39ce527e671..5ace4430263 100644 --- a/packages/settings-library/src/settings_library/rabbit.py +++ b/packages/settings-library/src/settings_library/rabbit.py @@ -28,14 +28,6 @@ class RabbitSettings(BaseCustomSettings): RABBIT_USER: str = "simcore" RABBIT_PASSWORD: SecretStr = SecretStr("simcore") - # channels - RABBIT_CHANNELS: Channels = { - "log": "simcore.services.logs", - 
"progress": "simcore.services.progress", - "instrumentation": "simcore.services.instrumentation", - "events": "simcore.services.events", - } - @cached_property def dsn(self) -> str: return RabbitDsn.build( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py index dd6e1ac5691..c05e1cbf495 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/dask_scheduler.py @@ -2,7 +2,7 @@ import logging from contextlib import asynccontextmanager from dataclasses import dataclass -from typing import AsyncIterator, Dict, List, Tuple, Union +from typing import AsyncIterator, Union from dask_task_models_library.container_tasks.errors import TaskCancelledError from dask_task_models_library.container_tasks.events import ( @@ -76,12 +76,12 @@ async def _start_tasks( user_id: UserID, project_id: ProjectID, cluster_id: ClusterID, - scheduled_tasks: Dict[NodeID, Image], + scheduled_tasks: dict[NodeID, Image], ): # now transfer the pipeline to the dask scheduler async with _cluster_dask_client(user_id, cluster_id, self) as client: - task_job_ids: List[ - Tuple[NodeID, str] + task_job_ids: list[ + tuple[NodeID, str] ] = await client.send_computation_tasks( user_id=user_id, project_id=project_id, @@ -106,13 +106,13 @@ async def _start_tasks( ) async def _get_tasks_status( - self, user_id: UserID, cluster_id: ClusterID, tasks: List[CompTaskAtDB] - ) -> List[RunningState]: + self, user_id: UserID, cluster_id: ClusterID, tasks: list[CompTaskAtDB] + ) -> list[RunningState]: async with _cluster_dask_client(user_id, cluster_id, self) as client: return await client.get_tasks_status([f"{t.job_id}" for t in tasks]) async def _stop_tasks( - self, user_id: UserID, cluster_id: ClusterID, tasks: List[CompTaskAtDB] + self, user_id: UserID, cluster_id: ClusterID, tasks: list[CompTaskAtDB] ) -> None: async with _cluster_dask_client(user_id, cluster_id, self) as client: await asyncio.gather( @@ -120,7 +120,7 @@ async def _stop_tasks( ) async def _process_completed_tasks( - self, user_id: UserID, cluster_id: ClusterID, tasks: List[CompTaskAtDB] + self, user_id: UserID, cluster_id: ClusterID, tasks: list[CompTaskAtDB] ) -> None: try: async with _cluster_dask_client(user_id, cluster_id, self) as client: @@ -199,7 +199,7 @@ async def _process_task_result( service_tag=service_version, result=task_final_state, ) - await self.rabbitmq_client.publish_message(message) + await self.rabbitmq_client.publish(message.channel_name, message.json()) await CompTasksRepository(self.db_engine).set_project_tasks_state( task.project_id, [task.node_id], task_final_state, errors=errors @@ -226,7 +226,7 @@ async def _task_state_change_handler(self, event: str) -> None: service_key=service_key, service_tag=service_version, ) - await self.rabbitmq_client.publish_message(message) + await self.rabbitmq_client.publish(message.channel_name, message.json()) await CompTasksRepository(self.db_engine).set_project_tasks_state( project_id, [node_id], task_state_event.state @@ -242,7 +242,7 @@ async def _task_progress_change_handler(self, event: str) -> None: node_id=node_id, progress=task_progress_event.progress, ) - await self.rabbitmq_client.publish_message(message) + await self.rabbitmq_client.publish(message.channel_name, message.json()) async def _task_log_change_handler(self, event: 
str) -> None: task_log_event = TaskLogEvent.parse_raw(event) @@ -255,4 +255,4 @@ async def _task_log_change_handler(self, event: str) -> None: messages=[task_log_event.log], ) - await self.rabbitmq_client.publish_message(message) + await self.rabbitmq_client.publish(message.channel_name, message.json()) diff --git a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/factory.py b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/factory.py index ead88f02c30..87346d69aa8 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/factory.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/comp_scheduler/factory.py @@ -1,5 +1,5 @@ import logging -from typing import List, cast +from typing import cast from fastapi import FastAPI from models_library.clusters import DEFAULT_CLUSTER_ID @@ -7,7 +7,7 @@ from ...core.errors import ConfigurationError from ...models.domains.comp_runs import CompRunsAtDB -from ...modules.rabbitmq import RabbitMQClient +from ...modules.rabbitmq import get_rabbitmq_client from ...utils.scheduler import SCHEDULED_STATES, get_repository from ..db.repositories.comp_runs import CompRunsRepository from .base_scheduler import BaseCompScheduler, ScheduledPipelineParams @@ -27,7 +27,7 @@ async def create_from_db(app: FastAPI) -> BaseCompScheduler: ) # get currently scheduled runs - runs: List[CompRunsAtDB] = await runs_repository.list( + runs: list[CompRunsAtDB] = await runs_repository.list( filter_by_state=SCHEDULED_STATES ) @@ -40,7 +40,7 @@ async def create_from_db(app: FastAPI) -> BaseCompScheduler: return DaskScheduler( settings=app.state.settings.DIRECTOR_V2_COMPUTATIONAL_BACKEND, dask_clients_pool=DaskClientsPool.instance(app), - rabbitmq_client=RabbitMQClient.instance(app), + rabbitmq_client=get_rabbitmq_client(app), db_engine=db_engine, scheduled_pipelines={ (r.user_id, r.project_uuid, r.iteration): ScheduledPipelineParams( diff --git a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py index df6603b65d2..0870aecfb6c 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/dynamic_sidecar/docker_service_specs/sidecar.py @@ -62,7 +62,6 @@ def _get_environment_variables( "POSTGRES_USER": f"{app_settings.POSTGRES.POSTGRES_USER}", "R_CLONE_ENABLED": f"{r_clone_settings.R_CLONE_ENABLED}", "R_CLONE_PROVIDER": r_clone_settings.R_CLONE_PROVIDER, - "RABBIT_CHANNELS": json_dumps(rabbit_settings.RABBIT_CHANNELS), "RABBIT_HOST": f"{rabbit_settings.RABBIT_HOST}", "RABBIT_PASSWORD": f"{rabbit_settings.RABBIT_PASSWORD.get_secret_value()}", "RABBIT_PORT": f"{rabbit_settings.RABBIT_PORT}", diff --git a/services/director-v2/src/simcore_service_director_v2/modules/projects_networks.py b/services/director-v2/src/simcore_service_director_v2/modules/projects_networks.py index 92c83b7b83b..4868de3e860 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/projects_networks.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/projects_networks.py @@ -1,6 +1,6 @@ import logging import urllib.parse -from typing import NamedTuple, Set +from typing import NamedTuple from uuid import UUID from models_library.projects import ProjectAtDB, ProjectID, Workbench @@ -85,7 +85,7 
@@ async def _send_network_configuration_to_dynamic_sidecar( """ # REMOVING - to_remove_items: Set[_ToRemove] = set() + to_remove_items: set[_ToRemove] = set() # if network no longer exist remove it from all nodes for new_network_name, node_ids_and_aliases in new_networks_with_aliases.items(): @@ -139,7 +139,7 @@ async def _send_network_configuration_to_dynamic_sidecar( ) # ADDING - to_add_items: Set[_ToAdd] = set() + to_add_items: set[_ToAdd] = set() # all aliases which are different or missing should be added for new_network_name, node_ids_and_aliases in new_networks_with_aliases.items(): existing_node_ids_and_aliases = existing_networks_with_aliases.get( @@ -216,7 +216,7 @@ async def _get_networks_with_aliases_for_default_network( ) ], ) - await rabbitmq_client.publish_message(message) + await rabbitmq_client.publish(message.channel_name, message.json()) continue new_networks_with_aliases[default_network][f"{node_uuid}"] = network_alias diff --git a/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py b/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py index bb930c67ea1..ea854eb93d0 100644 --- a/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py +++ b/services/director-v2/src/simcore_service_director_v2/modules/rabbitmq.py @@ -1,102 +1,35 @@ import logging -from dataclasses import dataclass +from typing import cast -import aio_pika from fastapi import FastAPI -from models_library.rabbitmq_messages import ( - InstrumentationRabbitMessage, - LoggerRabbitMessage, - ProgressRabbitMessage, - RabbitMessageTypes, -) +from servicelib.rabbitmq import RabbitMQClient +from servicelib.rabbitmq_utils import wait_till_rabbitmq_responsive from settings_library.rabbit import RabbitSettings -from tenacity import retry -from tenacity.before_sleep import before_sleep_log -from tenacity.wait import wait_random from ..core.errors import ConfigurationError logger = logging.getLogger(__name__) -rabbitmq_retry_policy = dict( - wait=wait_random(5, 10), - before_sleep=before_sleep_log(logger, logging.WARNING), - reraise=True, -) - - def setup(app: FastAPI) -> None: - @retry(**rabbitmq_retry_policy) async def on_startup() -> None: - app.state.rabbitmq_client = await RabbitMQClient.create(app) + settings: RabbitSettings = app.state.settings.DIRECTOR_V2_RABBITMQ + await wait_till_rabbitmq_responsive(settings.dsn) + app.state.rabbitmq_client = RabbitMQClient( + client_name="director-v2", settings=settings + ) async def on_shutdown() -> None: if app.state.rabbitmq_client: - await app.state.rabbitmq_client.delete() - del app.state.rabbitmq_client # type: ignore + await app.state.rabbitmq_client.close() app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) -_MESSAGE_TO_EXCHANGE_MAP = [ - (LoggerRabbitMessage, "log"), - (ProgressRabbitMessage, "progress"), - (InstrumentationRabbitMessage, "instrumentation"), -] - - -@dataclass -class RabbitMQClient: - app: FastAPI - connection: aio_pika.RobustConnection - channel: aio_pika.RobustChannel - exchanges: dict[str, aio_pika.RobustExchange] - - @classmethod - async def create(cls, app: FastAPI) -> "RabbitMQClient": - settings: RabbitSettings = app.state.settings.DIRECTOR_V2_RABBITMQ - connection: aio_pika.RobustConnection = await aio_pika.connect_robust( - settings.dsn + f"?name={__name__}_{id(app)}", - client_properties={"connection_name": f"director-v2_{id(app)}"}, +def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient: + if not hasattr(app.state, "rabbitmq_client"): + 
raise ConfigurationError( + "RabbitMQ client is not available. Please check the configuration." ) - channel = await connection.channel() - exchanges = {} - for exchange_name in ["log", "progress", "instrumentation"]: - exchanges[exchange_name] = await channel.declare_exchange( - settings.RABBIT_CHANNELS[exchange_name], aio_pika.ExchangeType.FANOUT - ) - - return cls(app=app, connection=connection, channel=channel, exchanges=exchanges) - - @classmethod - def instance(cls, app: FastAPI) -> "RabbitMQClient": - if not hasattr(app.state, "rabbitmq_client"): - raise ConfigurationError( - "RabbitMQ client is not available. Please check the configuration." - ) - return app.state.rabbitmq_client - - async def delete(self) -> None: - await self.connection.close() - - async def publish_message( - self, - message: RabbitMessageTypes, - ) -> None: - def get_exchange(message) -> aio_pika.Exchange: - for message_type, exchange_name in _MESSAGE_TO_EXCHANGE_MAP: - if isinstance(message, message_type): - assert exchange_name in self.exchanges # nosec - return self.exchanges[exchange_name] - - raise ValueError(f"message '{message}' type is of incorrect type") - - try: - await get_exchange(message).publish( - aio_pika.Message(message.json().encode(encoding="utf-8")), - routing_key="", - ) - except ValueError: - logger.warning("Unsupported rabbit message sent:", exc_info=True) + return cast(RabbitMQClient, app.state.rabbitmq_client) diff --git a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py index 7aecc4de23b..9ef633a1a2a 100644 --- a/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py +++ b/services/director-v2/tests/unit/test_modules_dynamic_sidecar_docker_service_specs_sidecar.py @@ -33,7 +33,6 @@ "POSTGRES_USER", "R_CLONE_ENABLED", "R_CLONE_PROVIDER", - "RABBIT_CHANNELS", "RABBIT_HOST", "RABBIT_PASSWORD", "RABBIT_PORT", diff --git a/services/director-v2/tests/unit/with_dbs/test_api_route_computations.py b/services/director-v2/tests/unit/with_dbs/test_api_route_computations.py index 06108e2bf05..dcc7900a589 100644 --- a/services/director-v2/tests/unit/with_dbs/test_api_route_computations.py +++ b/services/director-v2/tests/unit/with_dbs/test_api_route_computations.py @@ -26,6 +26,7 @@ from pydantic import AnyHttpUrl, parse_obj_as from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.typing_env import EnvVarsDict +from settings_library.rabbit import RabbitSettings from simcore_postgres_database.models.comp_pipeline import StateType from simcore_postgres_database.models.comp_tasks import NodeClass from simcore_service_director_v2.models.domains.comp_pipelines import CompPipelineAtDB @@ -38,9 +39,7 @@ from simcore_service_director_v2.models.schemas.services import ServiceExtras from starlette import status -pytest_simcore_core_services_selection = [ - "postgres", -] +pytest_simcore_core_services_selection = ["postgres", "rabbit"] pytest_simcore_ops_services_selection = [ "adminer", ] @@ -58,6 +57,7 @@ def mocked_rabbit_mq_client(mocker: MockerFixture): def minimal_configuration( mock_env: EnvVarsDict, postgres_host_config: dict[str, str], + rabbit_service: RabbitSettings, monkeypatch: pytest.MonkeyPatch, mocked_rabbit_mq_client: None, ): diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py 
b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py index e1cecaeb849..b8f074580d5 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_comp_scheduler_dask_scheduler.py @@ -33,6 +33,7 @@ from pytest import MonkeyPatch from pytest_mock.plugin import MockerFixture from pytest_simcore.helpers.typing_env import EnvVarsDict +from settings_library.rabbit import RabbitSettings from simcore_postgres_database.models.comp_pipeline import StateType from simcore_postgres_database.models.comp_runs import comp_runs from simcore_postgres_database.models.comp_tasks import NodeClass @@ -56,28 +57,18 @@ from simcore_service_director_v2.utils.scheduler import COMPLETED_STATES from starlette.testclient import TestClient -pytest_simcore_core_services_selection = [ - "postgres", -] +pytest_simcore_core_services_selection = ["postgres", "rabbit"] pytest_simcore_ops_services_selection = [ "adminer", ] -@pytest.fixture() -def mocked_rabbit_mq_client(mocker: MockerFixture): - mocker.patch( - "simcore_service_director_v2.core.application.rabbitmq.RabbitMQClient", - autospec=True, - ) - - @pytest.fixture def minimal_dask_scheduler_config( mock_env: EnvVarsDict, postgres_host_config: dict[str, str], monkeypatch: MonkeyPatch, - mocked_rabbit_mq_client: None, + rabbit_service: RabbitSettings, ) -> None: """set a minimal configuration for testing the dask connection only""" monkeypatch.setenv("DIRECTOR_V2_DYNAMIC_SIDECAR_ENABLED", "false") diff --git a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py index 9694551bf91..d45243cf4b6 100644 --- a/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py +++ b/services/director-v2/tests/unit/with_dbs/test_modules_dynamic_sidecar_docker_service_specs.py @@ -188,14 +188,6 @@ def expected_dynamic_sidecar_spec(run_id: RunID) -> dict[str, Any]: "POSTGRES_USER": "test", "POSTGRES_PASSWORD": "test", "POSTGRES_ENDPOINT": "localhost:5432", - "RABBIT_CHANNELS": '{"log": ' - '"simcore.services.logs", ' - '"progress": ' - '"simcore.services.progress", ' - '"instrumentation": ' - '"simcore.services.instrumentation", ' - '"events": ' - '"simcore.services.events"}', "RABBIT_HOST": "rabbit", "RABBIT_PASSWORD": "adminadmin", "RABBIT_PORT": "5672", @@ -452,10 +444,7 @@ async def test_merge_dynamic_sidecar_specs_with_user_specific_specs( sorted_dict["TaskTemplate"]["ContainerSpec"]["Env"][key] = json.dumps( unsorted_list.sort() ) - assert ( - dynamic_sidecar_spec_dict - == expected_dynamic_sidecar_spec_dict - ) + assert dynamic_sidecar_spec_dict == expected_dynamic_sidecar_spec_dict catalog_client = CatalogClient.instance(minimal_app) user_service_specs: dict[ diff --git a/services/docker-compose-ops.yml b/services/docker-compose-ops.yml index 2ecef10c19c..376884fcacd 100644 --- a/services/docker-compose-ops.yml +++ b/services/docker-compose-ops.yml @@ -64,9 +64,9 @@ services: "--fail", "http://localhost:9000/minio/health/live" ] - interval: 30s + interval: 5s timeout: 20s - retries: 3 + retries: 5 filestash: image: machines/filestash:3a01b70 diff --git a/services/docker-compose.yml b/services/docker-compose.yml index 12ef20d5957..b073d52f084 100644 --- a/services/docker-compose.yml +++ b/services/docker-compose.yml @@ -385,10 +385,10 @@ services: healthcheck: # see 
https://www.rabbitmq.com/monitoring.html#individual-checks for info about health-checks available in rabbitmq test: rabbitmq-diagnostics -q status - interval: 30s + interval: 5s timeout: 30s retries: 5 - start_period: 15s + start_period: 5s migration: image: ${DOCKER_REGISTRY:-itisfoundation}/migration:${DOCKER_IMAGE_TAG:-latest} @@ -431,7 +431,7 @@ services: "--dbname", "${POSTGRES_DB}" ] - interval: 15s + interval: 5s retries: 5 # NOTES: this is not yet compatible with portainer deployment but could work also for other containers # works with Docker 19.03 and not yet with Portainer 1.23.0 (see https://github.com/portainer/portainer/issues/3551) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_dependencies.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_dependencies.py index 08e341d96bc..c1aa98dddfa 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_dependencies.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/_dependencies.py @@ -5,7 +5,6 @@ from fastapi import Depends, FastAPI, Request from fastapi.datastructures import State -from ..core.rabbitmq import RabbitMQ from ..core.settings import ApplicationSettings from ..models.schemas.application_health import ApplicationHealth from ..models.shared_store import SharedStore @@ -34,9 +33,5 @@ def get_shared_store(app_state: State = Depends(get_app_state)) -> SharedStore: return app_state.shared_store # type: ignore -def get_rabbitmq(app_state: State = Depends(get_app_state)) -> RabbitMQ: - return app_state.rabbitmq # type: ignore - - def get_mounted_volumes(app_state: State = Depends(get_app_state)) -> MountedVolumes: return app_state.mounted_volumes # type: ignore diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py index db93803271e..138443fa403 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/api/containers_long_running_tasks.py @@ -12,7 +12,6 @@ ) from servicelib.fastapi.requests_decorators import cancel_on_disconnect -from ..core.rabbitmq import RabbitMQ from ..core.settings import ApplicationSettings from ..models.schemas.application_health import ApplicationHealth from ..models.schemas.containers import ContainersCreate @@ -32,7 +31,6 @@ get_application, get_application_health, get_mounted_volumes, - get_rabbitmq, get_settings, get_shared_store, ) @@ -71,7 +69,6 @@ async def create_service_containers_task( # pylint: disable=too-many-arguments shared_store: SharedStore = Depends(get_shared_store), app: FastAPI = Depends(get_application), application_health: ApplicationHealth = Depends(get_application_health), - rabbitmq: RabbitMQ = Depends(get_rabbitmq), mounted_volumes: MountedVolumes = Depends(get_mounted_volumes), ) -> TaskId: assert request # nosec @@ -87,7 +84,6 @@ async def create_service_containers_task( # pylint: disable=too-many-arguments mounted_volumes=mounted_volumes, app=app, application_health=application_health, - rabbitmq=rabbitmq, ) return task_id except TaskAlreadyRunningError as e: @@ -146,7 +142,7 @@ async def state_restore_task( tasks_manager: TasksManager = Depends(get_tasks_manager), settings: ApplicationSettings = Depends(get_settings), mounted_volumes: MountedVolumes = Depends(get_mounted_volumes), - rabbitmq: RabbitMQ = 
Depends(get_rabbitmq), + app: FastAPI = Depends(get_application), ) -> TaskId: assert request # nosec @@ -157,7 +153,7 @@ async def state_restore_task( unique=True, settings=settings, mounted_volumes=mounted_volumes, - rabbitmq=rabbitmq, + app=app, ) return task_id except TaskAlreadyRunningError as e: @@ -179,7 +175,7 @@ async def state_restore_task( async def state_save_task( request: Request, tasks_manager: TasksManager = Depends(get_tasks_manager), - rabbitmq: RabbitMQ = Depends(get_rabbitmq), + app: FastAPI = Depends(get_application), mounted_volumes: MountedVolumes = Depends(get_mounted_volumes), settings: ApplicationSettings = Depends(get_settings), ) -> TaskId: @@ -192,7 +188,7 @@ async def state_save_task( unique=True, settings=settings, mounted_volumes=mounted_volumes, - rabbitmq=rabbitmq, + app=app, ) return task_id except TaskAlreadyRunningError as e: @@ -215,7 +211,7 @@ async def ports_inputs_pull_task( request: Request, port_keys: Optional[list[str]] = None, tasks_manager: TasksManager = Depends(get_tasks_manager), - rabbitmq: RabbitMQ = Depends(get_rabbitmq), + app: FastAPI = Depends(get_application), mounted_volumes: MountedVolumes = Depends(get_mounted_volumes), ) -> TaskId: assert request # nosec @@ -227,7 +223,7 @@ async def ports_inputs_pull_task( unique=True, port_keys=port_keys, mounted_volumes=mounted_volumes, - rabbitmq=rabbitmq, + app=app, ) return task_id except TaskAlreadyRunningError as e: @@ -250,7 +246,7 @@ async def ports_outputs_pull_task( request: Request, port_keys: Optional[list[str]] = None, tasks_manager: TasksManager = Depends(get_tasks_manager), - rabbitmq: RabbitMQ = Depends(get_rabbitmq), + app: FastAPI = Depends(get_application), mounted_volumes: MountedVolumes = Depends(get_mounted_volumes), ) -> TaskId: assert request # nosec @@ -262,7 +258,7 @@ async def ports_outputs_pull_task( unique=True, port_keys=port_keys, mounted_volumes=mounted_volumes, - rabbitmq=rabbitmq, + app=app, ) return task_id except TaskAlreadyRunningError as e: @@ -285,7 +281,7 @@ async def ports_outputs_push_task( request: Request, port_keys: Optional[list[str]] = None, tasks_manager: TasksManager = Depends(get_tasks_manager), - rabbitmq: RabbitMQ = Depends(get_rabbitmq), + app: FastAPI = Depends(get_application), mounted_volumes: MountedVolumes = Depends(get_mounted_volumes), ) -> TaskId: assert request # nosec @@ -297,7 +293,7 @@ async def ports_outputs_push_task( unique=True, port_keys=port_keys, mounted_volumes=mounted_volumes, - rabbitmq=rabbitmq, + app=app, ) return task_id except TaskAlreadyRunningError as e: @@ -322,7 +318,6 @@ async def containers_restart_task( app: FastAPI = Depends(get_application), settings: ApplicationSettings = Depends(get_settings), shared_store: SharedStore = Depends(get_shared_store), - rabbitmq: RabbitMQ = Depends(get_rabbitmq), ) -> TaskId: assert request # nosec @@ -334,7 +329,6 @@ async def containers_restart_task( app=app, settings=settings, shared_store=shared_store, - rabbitmq=rabbitmq, ) return task_id except TaskAlreadyRunningError as e: diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/cli.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/cli.py index 9b2a26a81d7..f6854fd5469 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/cli.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/cli.py @@ -7,7 +7,6 @@ from servicelib.fastapi.long_running_tasks.server import TaskProgress from settings_library.utils_cli import create_settings_command from 
simcore_service_dynamic_sidecar.core.application import create_base_app -from simcore_service_dynamic_sidecar.core.rabbitmq import RabbitMQ from simcore_service_dynamic_sidecar.core.settings import ApplicationSettings from simcore_service_dynamic_sidecar.modules.long_running_tasks import ( task_ports_outputs_push, @@ -41,10 +40,6 @@ async def _setup_app_for_task_execution() -> FastAPI: # setup MountedVolumes setup_mounted_fs(app) - # setup RabbitMQ - app.state.rabbitmq = RabbitMQ(app) - await app.state.rabbitmq.connect() - return app @@ -77,11 +72,8 @@ async def _async_save_state() -> None: settings: ApplicationSettings = app.state.settings mounted_volumes: MountedVolumes = app.state.mounted_volumes - rabbitmq: RabbitMQ = app.state.rabbitmq - await task_save_state( - TaskProgress.create(), settings, mounted_volumes, rabbitmq - ) + await task_save_state(TaskProgress.create(), settings, mounted_volumes, app) asyncio.run(_async_save_state()) _print_highlight("state save finished successfully") @@ -95,11 +87,8 @@ async def _async_outputs_push() -> None: app = await _setup_app_for_task_execution() mounted_volumes: MountedVolumes = app.state.mounted_volumes - rabbitmq: RabbitMQ = app.state.rabbitmq - await task_ports_outputs_push( - TaskProgress.create(), None, mounted_volumes, rabbitmq - ) + await task_ports_outputs_push(TaskProgress.create(), None, mounted_volumes, app) asyncio.run(_async_outputs_push()) _print_highlight("output ports push finished successfully") diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_logs.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_logs.py index 9d9c78d087f..7b13ad816b2 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_logs.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/docker_logs.py @@ -13,8 +13,8 @@ from fastapi import FastAPI +from ..core.rabbitmq import post_log_message from .docker_utils import docker_client -from .rabbitmq import RabbitMQ logger = logging.getLogger(__name__) @@ -40,21 +40,14 @@ async def _logs_fetcher_worker( class BackgroundLogFetcher: def __init__(self, app: FastAPI) -> None: - # requires rabbitmq to be in place - assert app.state.rabbitmq # nosec - self._app: FastAPI = app self._log_processor_tasks: dict[str, Task[None]] = {} - @property - def rabbitmq(self) -> RabbitMQ: - return self._app.state.rabbitmq # type: ignore - async def _dispatch_logs(self, image_name: str, message: str) -> None: # sending the logs to the UI to facilitate the # user debugging process - await self.rabbitmq.post_log_message(f"[{image_name}] {message}") + await post_log_message(self._app, f"[{image_name}] {message}") async def start_log_feching(self, container_name: str) -> None: self._log_processor_tasks[container_name] = create_task( diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py index 23e2431c7e9..9689fcb8637 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/core/rabbitmq.py @@ -1,212 +1,89 @@ -from __future__ import annotations - -import asyncio import logging -import os -import socket -from asyncio import CancelledError, Queue, Task -from contextlib import suppress -from typing import Any +from functools import lru_cache +from typing import Union, cast -import aio_pika from fastapi import FastAPI 
-from models_library.projects import ProjectID
-from models_library.projects_nodes import NodeID
 from models_library.rabbitmq_messages import (
     EventRabbitMessage,
     LoggerRabbitMessage,
     RabbitEventMessageType,
+    RabbitMessageBase,
 )
-from models_library.users import UserID
-from servicelib.rabbitmq_utils import RabbitMQRetryPolicyUponInitialization
-from settings_library.rabbit import RabbitSettings
-from tenacity._asyncio import AsyncRetrying
+from servicelib.logging_utils import log_context
+from servicelib.rabbitmq import RabbitMQClient
+from servicelib.rabbitmq_utils import wait_till_rabbitmq_responsive

 from ..core.settings import ApplicationSettings

 log = logging.getLogger(__file__)

-# limit logs displayed
-logging.getLogger("aio_pika").setLevel(logging.WARNING)
-
-
-SLEEP_BETWEEN_SENDS: float = 1.0
-

-def _close_callback(sender: Any, exc: BaseException | None) -> None:
-    if exc:
-        if isinstance(exc, CancelledError):
-            log.info("Rabbit connection was cancelled", exc_info=True)
-        else:
-            log.error(
-                "Rabbit connection closed with exception from %s:",
-                sender,
-                exc_info=True,
-            )
-
-
-def _channel_close_callback(sender: Any, exc: BaseException | None) -> None:
-    if exc:
-        log.error(
-            "Rabbit channel closed with exception from %s:", sender, exc_info=True
-        )
+async def _post_rabbit_message(app: FastAPI, message: RabbitMessageBase) -> None:
+    # NOTE: this check is necessary when the dy-sidecar is used from the CLI,
+    # where RabbitMQ is not initialized. It is not optimal, but it allows
+    # running the CLI without RabbitMQ.
+    if _is_rabbitmq_initialized(app):
+        await get_rabbitmq_client(app).publish(message.channel_name, message.json())


-async def _wait_till_rabbit_responsive(url: str) -> None:
-    async for attempt in AsyncRetrying(
-        **RabbitMQRetryPolicyUponInitialization().kwargs
-    ):
-        with attempt:
-            connection = await aio_pika.connect(url, timeout=1.0)
-            await connection.close()
-
-
-class RabbitMQ:  # pylint: disable = too-many-instance-attributes
-    CHANNEL_LOG = "logger"
-
-    def __init__(self, app: FastAPI, max_messages_to_send: int = 100) -> None:
-        settings: ApplicationSettings = app.state.settings
-
-        assert settings.RABBIT_SETTINGS  # nosec
-        self._rabbit_settings: RabbitSettings = settings.RABBIT_SETTINGS
-        self._user_id: UserID = settings.DY_SIDECAR_USER_ID
-        self._project_id: ProjectID = settings.DY_SIDECAR_PROJECT_ID
-        self._node_id: NodeID = settings.DY_SIDECAR_NODE_ID
-
-        self._connection: aio_pika.abc.AbstractConnection | None = None
-        self._channel: aio_pika.abc.AbstractChannel | None = None
-        self._logs_exchange: aio_pika.abc.AbstractExchange | None = None
-        self._events_exchange: aio_pika.abc.AbstractExchange | None = None
-
-        self.max_messages_to_send: int = max_messages_to_send
-        # pylint: disable=unsubscriptable-object
-        self._channel_queues: dict[str, Queue[str]] = {}
-        self._keep_running: bool = True
-        self._queues_worker: Task[Any] | None = None
-
-    async def connect(self) -> None:
-        url = self._rabbit_settings.dsn
-        log.debug("Connecting to %s", url)
-        await _wait_till_rabbit_responsive(url)
-
-        # NOTE: to show the connection name in the rabbitMQ UI see there [https://www.bountysource.com/issues/89342433-setting-custom-connection-name-via-client_properties-doesn-t-work-when-connecting-using-an-amqp-url]
-        hostname = socket.gethostname()
-        self._connection = await aio_pika.connect(
-            url + f"?name={__name__}_{id(hostname)}_{os.getpid()}",
-            client_properties={
-                "connection_name": f"dynamic-sidecar_{self._node_id} {hostname}"
-            },
-        )
-
self._connection.close_callbacks.add(_close_callback) +async def post_log_message(app: FastAPI, logs: Union[str, list[str]]) -> None: + if isinstance(logs, str): + logs = [logs] - log.debug("Creating channel") - self._channel = await self._connection.channel(publisher_confirms=False) - self._channel.close_callbacks.add(_channel_close_callback) + app_settings: ApplicationSettings = app.state.settings + message = LoggerRabbitMessage( + node_id=app_settings.DY_SIDECAR_NODE_ID, + user_id=app_settings.DY_SIDECAR_USER_ID, + project_id=app_settings.DY_SIDECAR_PROJECT_ID, + messages=logs, + ) - log.debug("Declaring %s exchange", self._rabbit_settings.RABBIT_CHANNELS["log"]) - self._logs_exchange = await self._channel.declare_exchange( - self._rabbit_settings.RABBIT_CHANNELS["log"], aio_pika.ExchangeType.FANOUT - ) - self._channel_queues[self.CHANNEL_LOG] = Queue() + await _post_rabbit_message(app, message) - log.debug( - "Declaring %s exchange", self._rabbit_settings.RABBIT_CHANNELS["events"] - ) - self._events_exchange = await self._channel.declare_exchange( - self._rabbit_settings.RABBIT_CHANNELS["events"], - aio_pika.ExchangeType.FANOUT, - ) - # start background worker to dispatch messages - self._keep_running = True - self._queues_worker = asyncio.create_task(self._dispatch_messages_worker()) - - async def _dispatch_messages_worker(self) -> None: - while self._keep_running: - for queue in self._channel_queues.values(): - # in order to avoid blocking when dispatching messages - # it is important to fetch them an at most the existing - # messages in the queue - messages_to_fetch = min(self.max_messages_to_send, queue.qsize()) - messages = [await queue.get() for _ in range(messages_to_fetch)] - - # currently there are no messages do not broardcast - # an empty payload - if not messages: - continue - await self._publish_messages(messages) - - await asyncio.sleep(SLEEP_BETWEEN_SENDS) - - async def _publish_messages(self, messages: list[str]) -> None: - data = LoggerRabbitMessage( - node_id=self._node_id, - user_id=self._user_id, - project_id=self._project_id, - messages=messages, - ) +async def post_sidecar_log_message(app: FastAPI, logs: str) -> None: + await post_log_message(app, f"[sidecar] {logs}") - assert self._logs_exchange # nosec - await self._logs_exchange.publish( - aio_pika.Message(body=data.json().encode()), routing_key="" - ) - async def _publish_event(self, action: RabbitEventMessageType) -> None: - data = EventRabbitMessage( - node_id=self._node_id, - user_id=self._user_id, - project_id=self._project_id, - action=action, - ) - assert self._events_exchange # nosec - await self._events_exchange.publish( - aio_pika.Message(body=data.json().encode()), routing_key="" - ) - - async def send_event_reload_iframe(self) -> None: - await self._publish_event(action=RabbitEventMessageType.RELOAD_IFRAME) - - async def post_log_message(self, log_msg: str | list[str]) -> None: - if isinstance(log_msg, str): - log_msg = [log_msg] - - for message in log_msg: - await self._channel_queues[self.CHANNEL_LOG].put(message) - - async def close(self) -> None: - # wait for queues to be empty before sending the last messages - self._keep_running = False - if self._queues_worker is not None: - self._queues_worker.cancel() - with suppress(asyncio.CancelledError): - await self._queues_worker - - if self._channel is not None: - await self._channel.close() - if self._connection is not None: - await self._connection.close() - - -async def send_message(rabbitmq: RabbitMQ, msg: str) -> None: - log.debug(msg) - await 
rabbitmq.post_log_message(f"[sidecar] {msg}") +async def post_event_reload_iframe(app: FastAPI) -> None: + app_settings: ApplicationSettings = app.state.settings + message = EventRabbitMessage( + node_id=app_settings.DY_SIDECAR_NODE_ID, + user_id=app_settings.DY_SIDECAR_USER_ID, + project_id=app_settings.DY_SIDECAR_PROJECT_ID, + action=RabbitEventMessageType.RELOAD_IFRAME, + ) + await _post_rabbit_message(app, message) def setup_rabbitmq(app: FastAPI) -> None: async def on_startup() -> None: - app.state.rabbitmq = RabbitMQ(app) - - log.info("Connecting to rabbitmq") - await app.state.rabbitmq.connect() - log.info("Connected to rabbitmq") + app_settings: ApplicationSettings = app.state.settings + assert app_settings.RABBIT_SETTINGS # nosec + settings = app_settings.RABBIT_SETTINGS + await wait_till_rabbitmq_responsive(settings.dsn) + with log_context(log, logging.INFO, msg="Create RabbitMQClient"): + app.state.rabbitmq_client = RabbitMQClient( + client_name=f"dynamic-sidecar_{app_settings.DY_SIDECAR_NODE_ID}", + settings=settings, + ) async def on_shutdown() -> None: - if app.state.background_log_fetcher is None: - log.warning("No rabbitmq to close") - return - - await app.state.rabbitmq.close() - log.info("stopped rabbitmq") + if app.state.rabbitmq_client: + await app.state.rabbitmq_client.close() app.add_event_handler("startup", on_startup) app.add_event_handler("shutdown", on_shutdown) + + +@lru_cache(maxsize=1) +def _is_rabbitmq_initialized(app: FastAPI) -> bool: + return hasattr(app.state, "rabbitmq_client") + + +def get_rabbitmq_client(app: FastAPI) -> RabbitMQClient: + if not _is_rabbitmq_initialized(app): + raise RuntimeError( + "RabbitMQ client is not available. Please check the configuration." + ) + return cast(RabbitMQClient, app.state.rabbitmq_client) diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/directory_watcher/_core.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/directory_watcher/_core.py index df0bed76b7d..eb8a36dacb1 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/directory_watcher/_core.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/directory_watcher/_core.py @@ -16,7 +16,7 @@ from watchdog.events import FileSystemEvent, FileSystemEventHandler from watchdog.observers.api import BaseObserver -from ...core.rabbitmq import send_message +from ...core.rabbitmq import post_sidecar_log_message from ..mounted_fs import MountedVolumes from ._watchdog_extentions import ExtendedInotifyObserver @@ -226,7 +226,7 @@ async def on_startup() -> None: mounted_volumes = app.state.mounted_volumes # nosec io_log_redirect_cb = None if app.state.settings.RABBIT_SETTINGS: - io_log_redirect_cb = functools.partial(send_message, app.state.rabbitmq) + io_log_redirect_cb = functools.partial(post_sidecar_log_message, app) logger.debug( "setting up directory watcher %s", "with redirection of logs..." 
if io_log_redirect_cb else "...", diff --git a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py index 6d17d42b053..d00f37117c2 100644 --- a/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py +++ b/services/dynamic-sidecar/src/simcore_service_dynamic_sidecar/modules/long_running_tasks.py @@ -22,7 +22,7 @@ ) from ..core.docker_logs import start_log_fetching, stop_log_fetching from ..core.docker_utils import get_running_containers_count_from_names -from ..core.rabbitmq import RabbitMQ, send_message +from ..core.rabbitmq import post_event_reload_iframe, post_sidecar_log_message from ..core.settings import ApplicationSettings from ..core.utils import CommandResult, assemble_container_names from ..core.validation import parse_compose_spec, validate_compose_spec @@ -88,7 +88,6 @@ async def task_create_service_containers( mounted_volumes: MountedVolumes, app: FastAPI, application_health: ApplicationHealth, - rabbitmq: RabbitMQ, ) -> list[str]: progress.update(message="validating service spec", percent=0) @@ -102,7 +101,7 @@ async def task_create_service_containers( logger.info("Validated compose-spec:\n%s", f"{shared_store.compose_spec}") - await send_message(rabbitmq, "starting service containers") + await post_sidecar_log_message(app, "starting service containers") assert shared_store.compose_spec # nosec with directory_watcher_disabled(app): @@ -122,7 +121,7 @@ async def task_create_service_containers( message = f"Finished docker-compose start with output\n{r.message}" if r.success: - await send_message(rabbitmq, "service containers started") + await post_sidecar_log_message(app, "service containers started") logger.debug(message) for container_name in shared_store.container_names: await start_log_fetching(app, container_name) @@ -130,7 +129,7 @@ async def task_create_service_containers( application_health.is_healthy = False application_health.error_message = message logger.error("Marked sidecar as unhealthy, see below for details\n:%s", message) - await send_message(rabbitmq, "could not start service containers") + await post_sidecar_log_message(app, "could not start service containers") return shared_store.container_names @@ -169,7 +168,7 @@ async def task_restore_state( progress: TaskProgress, settings: ApplicationSettings, mounted_volumes: MountedVolumes, - rabbitmq: RabbitMQ, + app: FastAPI, ) -> None: progress.update(message="checking files", percent=0.0) # first check if there are files (no max concurrency here, these are just quick REST calls) @@ -187,8 +186,8 @@ async def task_restore_state( ) progress.update(message="Downloading state", percent=0.05) - await send_message( - rabbitmq, + await post_sidecar_log_message( + app, f"Downloading state files for {existing_files}...", ) await logged_gather( @@ -198,7 +197,7 @@ async def task_restore_state( project_id=str(settings.DY_SIDECAR_PROJECT_ID), node_uuid=str(settings.DY_SIDECAR_NODE_ID), file_or_folder=path, - io_log_redirect_cb=functools.partial(send_message, rabbitmq), + io_log_redirect_cb=functools.partial(post_sidecar_log_message, app), ) for path, exists in zip(mounted_volumes.disk_state_paths(), existing_files) if exists @@ -207,7 +206,7 @@ async def task_restore_state( reraise=True, # this should raise if there is an issue ) - await send_message(rabbitmq, "Finished state downloading") + await post_sidecar_log_message(app, "Finished state downloading") 
progress.update(message="state restored", percent=0.99) @@ -215,14 +214,14 @@ async def task_save_state( progress: TaskProgress, settings: ApplicationSettings, mounted_volumes: MountedVolumes, - rabbitmq: RabbitMQ, + app: FastAPI, ) -> None: awaitables: deque[Awaitable[Optional[Any]]] = deque() progress.update(message="starting state save", percent=0.0) for state_path in mounted_volumes.disk_state_paths(): - await send_message(rabbitmq, f"Saving state for {state_path}") + await post_sidecar_log_message(app, f"Saving state for {state_path}") awaitables.append( data_manager.push( user_id=settings.DY_SIDECAR_USER_ID, @@ -231,14 +230,14 @@ async def task_save_state( file_or_folder=state_path, r_clone_settings=settings.rclone_settings_for_nodeports, archive_exclude_patterns=mounted_volumes.state_exclude, - io_log_redirect_cb=functools.partial(send_message, rabbitmq), + io_log_redirect_cb=functools.partial(post_sidecar_log_message, app), ) ) progress.update(message="saving state", percent=0.1) await logged_gather(*awaitables, max_concurrency=CONCURRENCY_STATE_SAVE_RESTORE) - await send_message(rabbitmq, "Finished state saving") + await post_sidecar_log_message(app, "Finished state saving") progress.update(message="finished state saving", percent=0.99) @@ -246,20 +245,20 @@ async def task_ports_inputs_pull( progress: TaskProgress, port_keys: Optional[list[str]], mounted_volumes: MountedVolumes, - rabbitmq: RabbitMQ, + app: FastAPI, ) -> int: progress.update(message="starting inputs pulling", percent=0.0) port_keys = [] if port_keys is None else port_keys - await send_message(rabbitmq, f"Pulling inputs for {port_keys}") + await post_sidecar_log_message(app, f"Pulling inputs for {port_keys}") progress.update(message="pulling inputs", percent=0.1) transferred_bytes = await nodeports.download_target_ports( nodeports.PortTypeName.INPUTS, mounted_volumes.disk_inputs_path, port_keys=port_keys, - io_log_redirect_cb=functools.partial(send_message, rabbitmq), + io_log_redirect_cb=functools.partial(post_sidecar_log_message, app), ) - await send_message(rabbitmq, "Finished pulling inputs") + await post_sidecar_log_message(app, "Finished pulling inputs") progress.update(message="finished inputs pulling", percent=0.99) return int(transferred_bytes) @@ -268,19 +267,19 @@ async def task_ports_outputs_pull( progress: TaskProgress, port_keys: Optional[list[str]], mounted_volumes: MountedVolumes, - rabbitmq: RabbitMQ, + app: FastAPI, ) -> int: progress.update(message="starting outputs pulling", percent=0.0) port_keys = [] if port_keys is None else port_keys - await send_message(rabbitmq, f"Pulling output for {port_keys}") + await post_sidecar_log_message(app, f"Pulling output for {port_keys}") transferred_bytes = await nodeports.download_target_ports( nodeports.PortTypeName.OUTPUTS, mounted_volumes.disk_outputs_path, port_keys=port_keys, - io_log_redirect_cb=functools.partial(send_message, rabbitmq), + io_log_redirect_cb=functools.partial(post_sidecar_log_message, app), ) - await send_message(rabbitmq, "Finished pulling outputs") + await post_sidecar_log_message(app, "Finished pulling outputs") progress.update(message="finished outputs pulling", percent=0.99) return int(transferred_bytes) @@ -289,18 +288,18 @@ async def task_ports_outputs_push( progress: TaskProgress, port_keys: Optional[list[str]], mounted_volumes: MountedVolumes, - rabbitmq: RabbitMQ, + app: FastAPI, ) -> None: progress.update(message="starting outputs pushing", percent=0.0) port_keys = [] if port_keys is None else port_keys - await 
send_message(rabbitmq, f"Pushing outputs for {port_keys}")
+    await post_sidecar_log_message(app, f"Pushing outputs for {port_keys}")

     await nodeports.upload_outputs(
         mounted_volumes.disk_outputs_path,
         port_keys=port_keys,
-        io_log_redirect_cb=functools.partial(send_message, rabbitmq),
+        io_log_redirect_cb=functools.partial(post_sidecar_log_message, app),
     )
-    await send_message(rabbitmq, "Finished pulling outputs")
+    await post_sidecar_log_message(app, "Finished pushing outputs")
     progress.update(message="finished outputs pushing", percent=0.99)


@@ -309,7 +308,6 @@ async def task_containers_restart(
     app: FastAPI,
     settings: ApplicationSettings,
     shared_store: SharedStore,
-    rabbitmq: RabbitMQ,
 ) -> None:
     progress.update(message="starting containers restart", percent=0.0)
     if shared_store.compose_spec is None:
@@ -335,6 +333,6 @@ async def task_containers_restart(

     progress.update(message="started log fetching", percent=0.9)

-    await send_message(rabbitmq, "Service was restarted please reload the UI")
-    await rabbitmq.send_event_reload_iframe()
+    await post_sidecar_log_message(app, "Service was restarted, please reload the UI")
+    await post_event_reload_iframe(app)
     progress.update(message="started log fetching", percent=0.99)
diff --git a/services/dynamic-sidecar/tests/unit/conftest.py b/services/dynamic-sidecar/tests/unit/conftest.py
index bbf634c4c01..81cd0b6dd33 100644
--- a/services/dynamic-sidecar/tests/unit/conftest.py
+++ b/services/dynamic-sidecar/tests/unit/conftest.py
@@ -45,25 +45,20 @@ def mock_registry_service(mocker: MockerFixture) -> AsyncMock:
 @pytest.fixture
 def mock_core_rabbitmq(mocker: MockerFixture) -> dict[str, AsyncMock]:
-    """mocks simcore_service_dynamic_sidecar.core.rabbitmq.RabbitMQ member functions"""
+    """mocks simcore_service_dynamic_sidecar.core.rabbitmq.RabbitMQClient member functions"""
     return {
-        "connect": mocker.patch(
-            "simcore_service_dynamic_sidecar.core.rabbitmq.RabbitMQ.connect",
-            return_value=None,
-            autospec=True,
-        ),
-        "send_event_reload_iframe": mocker.patch(
-            "simcore_service_dynamic_sidecar.core.rabbitmq.RabbitMQ.send_event_reload_iframe",
+        "wait_till_rabbitmq_responsive": mocker.patch(
+            "simcore_service_dynamic_sidecar.core.rabbitmq.wait_till_rabbitmq_responsive",
             return_value=None,
             autospec=True,
         ),
         "post_log_message": mocker.patch(
-            "simcore_service_dynamic_sidecar.core.rabbitmq.RabbitMQ.post_log_message",
+            "simcore_service_dynamic_sidecar.core.rabbitmq._post_rabbit_message",
             return_value=None,
             autospec=True,
         ),
         "close": mocker.patch(
-            "simcore_service_dynamic_sidecar.core.rabbitmq.RabbitMQ.close",
+            "simcore_service_dynamic_sidecar.core.rabbitmq.RabbitMQClient.close",
             return_value=None,
             autospec=True,
         ),
@@ -155,7 +150,7 @@ async def ensure_external_volumes(
     # CLEAN:
     # docker volume rm $(docker volume ls --format "{{.Name}} {{.Labels}}" | grep run_id | awk '{print $1}')

-    yield volumes
+    yield tuple(volumes)

     @retry(
         wait=wait_fixed(1),
diff --git a/services/dynamic-sidecar/tests/unit/test_api_containers.py b/services/dynamic-sidecar/tests/unit/test_api_containers.py
index faeb6c17257..4f1f0525ea2 100644
--- a/services/dynamic-sidecar/tests/unit/test_api_containers.py
+++ b/services/dynamic-sidecar/tests/unit/test_api_containers.py
@@ -155,12 +155,12 @@ def compose_spec(dynamic_sidecar_network_name: str) -> str:
             "version": "3",
             "services": {
                 "first-box": {
-                    "image": "busybox",
+                    "image": "busybox:latest",
                     "networks": [
                         dynamic_sidecar_network_name,
                     ],
                 },
-                "second-box": {"image": "busybox"},
+                "second-box": {"image": "busybox:latest"},
             },
"networks": {dynamic_sidecar_network_name: {}}, } @@ -173,7 +173,7 @@ def compose_spec_single_service() -> str: { "version": "3", "services": { - "solo-box": {"image": "busybox"}, + "solo-box": {"image": "busybox:latest"}, }, } ) diff --git a/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py b/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py index 6e50f375eec..9c79ec2add9 100644 --- a/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py +++ b/services/dynamic-sidecar/tests/unit/test_api_containers_long_running_tasks.py @@ -121,19 +121,19 @@ def dynamic_sidecar_network_name() -> str: "version": "3", "services": { "first-box": { - "image": "busybox", + "image": "busybox:latest", "networks": [ _get_dynamic_sidecar_network_name(), ], }, - "second-box": {"image": "busybox"}, + "second-box": {"image": "busybox:latest"}, }, "networks": {_get_dynamic_sidecar_network_name(): {}}, }, { "version": "3", "services": { - "solo-box": {"image": "busybox"}, + "solo-box": {"image": "busybox:latest"}, }, }, ] diff --git a/services/dynamic-sidecar/tests/unit/test_cli.py b/services/dynamic-sidecar/tests/unit/test_cli.py index dcdb2741bfe..499650c9540 100644 --- a/services/dynamic-sidecar/tests/unit/test_cli.py +++ b/services/dynamic-sidecar/tests/unit/test_cli.py @@ -15,11 +15,6 @@ def cli_runner(mock_environment: EnvVarsDict) -> CliRunner: return CliRunner() -@pytest.fixture -def mock_rabbitmq(mocker: MockerFixture) -> None: - mocker.patch("simcore_service_dynamic_sidecar.cli.RabbitMQ", spec=True) - - @pytest.fixture def mock_data_manager(mocker: MockerFixture) -> None: mocker.patch( @@ -36,9 +31,7 @@ def mock_nodeports(mocker: MockerFixture) -> None: ) -def test_list_state_dirs( - cli_runner: CliRunner, mock_rabbitmq: None, mock_data_manager: None -): +def test_list_state_dirs(cli_runner: CliRunner, mock_data_manager: None): result = cli_runner.invoke(main, ["state-list-dirs"]) assert result.exit_code == os.EX_OK, result.stdout assert result.stdout.strip() == "\n".join( @@ -46,17 +39,13 @@ def test_list_state_dirs( ) -def test_outputs_push_interface( - cli_runner: CliRunner, mock_rabbitmq: None, mock_data_manager: None -): +def test_outputs_push_interface(cli_runner: CliRunner, mock_data_manager: None): result = cli_runner.invoke(main, ["state-save"]) assert result.exit_code == os.EX_OK, result.stdout assert result.stdout == "state save finished successfully\n" -def test_state_save_interface( - cli_runner: CliRunner, mock_rabbitmq: None, mock_nodeports: None -): +def test_state_save_interface(cli_runner: CliRunner, mock_nodeports: None): result = cli_runner.invoke(main, ["outputs-push"]) assert result.exit_code == os.EX_OK, result.stdout assert result.stdout == "output ports push finished successfully\n" diff --git a/services/dynamic-sidecar/tests/unit/test_core_docker_compose_utils.py b/services/dynamic-sidecar/tests/unit/test_core_docker_compose_utils.py index 610c87bd5dc..e9e911f3e0f 100644 --- a/services/dynamic-sidecar/tests/unit/test_core_docker_compose_utils.py +++ b/services/dynamic-sidecar/tests/unit/test_core_docker_compose_utils.py @@ -32,7 +32,7 @@ 'DY_SIDECAR_STATE_PATHS=["/work/workspace"]', ], "working_dir": "/work", - "image": "busybox", + "image": "busybox:latest", "command": f"sh -c \"echo 'setup {__name__}'; sleep {SLEEP_TIME_S}; echo 'teardown {__name__}'\"", } }, diff --git a/services/dynamic-sidecar/tests/unit/test_core_docker_logs.py b/services/dynamic-sidecar/tests/unit/test_core_docker_logs.py index 
f92ea6d74ed..bfabc27fbd6 100644 --- a/services/dynamic-sidecar/tests/unit/test_core_docker_logs.py +++ b/services/dynamic-sidecar/tests/unit/test_core_docker_logs.py @@ -29,7 +29,7 @@ async def container_name() -> AsyncIterable[str]: container = await client.containers.run( config={ "Cmd": ["/bin/ash", "-c", 'echo "test message"'], - "Image": "busybox", + "Image": "busybox:latest", } ) container_inspect = await container.show() @@ -46,14 +46,9 @@ async def test_background_log_fetcher( app: FastAPI, ): assert _get_background_log_fetcher(app=app) is not None - - assert mock_core_rabbitmq["connect"].call_count == 1 - await start_log_fetching(app=app, container_name=container_name) - # wait for background log fetcher await asyncio.sleep(1) assert mock_core_rabbitmq["post_log_message"].call_count == 1 await stop_log_fetching(app=app, container_name=container_name) - assert mock_core_rabbitmq["connect"].call_count == 1 diff --git a/services/dynamic-sidecar/tests/unit/test_core_utils.py b/services/dynamic-sidecar/tests/unit/test_core_utils.py index 2a26c7d9fba..f9034ca8052 100644 --- a/services/dynamic-sidecar/tests/unit/test_core_utils.py +++ b/services/dynamic-sidecar/tests/unit/test_core_utils.py @@ -49,7 +49,7 @@ def cmd(tmp_path: Path, sleep: int): - DY_SIDECAR_PATH_OUTPUTS=/work/outputs - DY_SIDECAR_STATE_PATHS=["/work/workspace"] working_dir: /work - image: busybox + image: busybox:latest command: sh -c "echo 'setup'; sleep {sleep}; echo 'teardown'" """ ) diff --git a/services/dynamic-sidecar/tests/unit/with_rabbit/test_core_rabbitmq.py b/services/dynamic-sidecar/tests/unit/with_rabbit/test_core_rabbitmq.py deleted file mode 100644 index ae2f8be0d58..00000000000 --- a/services/dynamic-sidecar/tests/unit/with_rabbit/test_core_rabbitmq.py +++ /dev/null @@ -1,94 +0,0 @@ -# pylint: disable=protected-access -# pylint: disable=redefined-outer-name -# pylint: disable=unused-argument -# pylint: disable=unused-variable - -import asyncio -from pprint import pformat - -import aio_pika -import pytest -from async_asgi_testclient import TestClient -from fastapi.applications import FastAPI -from models_library.projects import ProjectID -from models_library.projects_nodes import NodeID -from models_library.rabbitmq_messages import LoggerRabbitMessage -from models_library.users import UserID -from pytest_mock.plugin import MockerFixture -from pytest_simcore.helpers.typing_env import EnvVarsDict -from settings_library.rabbit import RabbitSettings -from simcore_service_dynamic_sidecar.core.application import create_app -from simcore_service_dynamic_sidecar.core.rabbitmq import SLEEP_BETWEEN_SENDS, RabbitMQ - -pytest_simcore_core_services_selection = [ - "rabbit", -] - - -@pytest.fixture -def app( - rabbit_service: RabbitSettings, docker_registry: str, mock_environment: EnvVarsDict -) -> FastAPI: - """app w/o mocking registry or rabbit""" - return create_app() - - -async def test_rabbitmq( - rabbit_queue: aio_pika.abc.AbstractQueue, - mocker: MockerFixture, - user_id: UserID, - project_id: ProjectID, - node_id: NodeID, - test_client: TestClient, -): - app = test_client.application - assert isinstance(app, FastAPI) - - rabbit = app.state.rabbitmq - assert isinstance(rabbit, RabbitMQ) - - incoming_data: list[LoggerRabbitMessage] = [] - - async def rabbit_message_handler( - message: aio_pika.abc.AbstractIncomingMessage, - ) -> None: - incoming_data.append(LoggerRabbitMessage.parse_raw(message.body)) - - await rabbit_queue.consume(rabbit_message_handler, exclusive=True, no_ack=True) - - assert rabbit._connection 
- assert rabbit._connection.ready - - log_msg: str = "I am logging" - log_messages: list[str] = ["I", "am a logger", "man..."] - log_more_messages: list[str] = [f"msg{1}" for i in range(10)] - - await rabbit.post_log_message(log_msg) - await rabbit.post_log_message(log_messages) - - # make sure the first 2 messages are - # sent in the same chunk - await asyncio.sleep(SLEEP_BETWEEN_SENDS * 1.1) - await rabbit.post_log_message(log_more_messages) - # wait for all the messages to be delivered, - # need to make sure all messages are delivered - await asyncio.sleep(SLEEP_BETWEEN_SENDS * 1.1) - - # if this fails the above sleep did not work - assert len(incoming_data) == 2, f"missing incoming data: {pformat(incoming_data)}" - assert incoming_data[0] == LoggerRabbitMessage( - messages=[log_msg] + log_messages, - node_id=node_id, - project_id=project_id, - user_id=user_id, - ) - - assert incoming_data[1] == LoggerRabbitMessage( - messages=log_more_messages, - node_id=node_id, - project_id=project_id, - user_id=user_id, - ) - - # ensure closes correctly - await rabbit.close() diff --git a/services/web/server/src/simcore_service_webserver/computation_subscribe.py b/services/web/server/src/simcore_service_webserver/computation_subscribe.py index df3cbaa9073..b4aa5e18aae 100644 --- a/services/web/server/src/simcore_service_webserver/computation_subscribe.py +++ b/services/web/server/src/simcore_service_webserver/computation_subscribe.py @@ -1,10 +1,7 @@ -import asyncio +import functools import logging -import os -import socket -from typing import Any, AsyncIterator, Awaitable, Callable +from typing import AsyncIterator -import aio_pika from aiohttp import web from models_library.rabbitmq_messages import ( EventRabbitMessage, @@ -19,9 +16,9 @@ service_stopped, ) from servicelib.json_serialization import json_dumps -from servicelib.rabbitmq_utils import RabbitMQRetryPolicyUponInitialization -from servicelib.utils import logged_gather -from tenacity import retry +from servicelib.logging_utils import log_context +from servicelib.rabbitmq import RabbitMQClient +from servicelib.rabbitmq_utils import wait_till_rabbitmq_responsive from .computation_settings import RabbitSettings, get_plugin_settings from .projects import projects_api @@ -37,7 +34,7 @@ log = logging.getLogger(__name__) -async def progress_message_parser(app: web.Application, data: bytes) -> None: +async def progress_message_parser(app: web.Application, data: bytes) -> bool: # update corresponding project, node, progress value rabbit_message = ProgressRabbitMessage.parse_raw(data) try: @@ -60,19 +57,23 @@ async def progress_message_parser(app: web.Application, data: bytes) -> None: } ] await send_messages(app, f"{rabbit_message.user_id}", messages) + return True except ProjectNotFoundError: log.warning( "project related to received rabbitMQ progress message not found: '%s'", json_dumps(rabbit_message, indent=2), ) + return True except NodeNotFoundError: log.warning( "node related to received rabbitMQ progress message not found: '%s'", json_dumps(rabbit_message, indent=2), ) + return True + return False -async def log_message_parser(app: web.Application, data: bytes) -> None: +async def log_message_parser(app: web.Application, data: bytes) -> bool: rabbit_message = LoggerRabbitMessage.parse_raw(data) socket_messages: list[SocketMessageDict] = [ @@ -82,9 +83,10 @@ async def log_message_parser(app: web.Application, data: bytes) -> None: } ] await send_messages(app, f"{rabbit_message.user_id}", socket_messages) + return True -async def 
instrumentation_message_parser(app: web.Application, data: bytes) -> None: +async def instrumentation_message_parser(app: web.Application, data: bytes) -> bool: rabbit_message = InstrumentationRabbitMessage.parse_raw(data) if rabbit_message.metrics == "service_started": service_started( @@ -94,9 +96,10 @@ async def instrumentation_message_parser(app: web.Application, data: bytes) -> N service_stopped( app, **{key: rabbit_message.dict()[key] for key in SERVICE_STOPPED_LABELS} ) + return True -async def events_message_parser(app: web.Application, data: bytes) -> None: +async def events_message_parser(app: web.Application, data: bytes) -> bool: rabbit_message = EventRabbitMessage.parse_raw(data) socket_messages: list[SocketMessageDict] = [ @@ -109,143 +112,52 @@ async def events_message_parser(app: web.Application, data: bytes) -> None: } ] await send_messages(app, f"{rabbit_message.user_id}", socket_messages) + return True -APP_RABBITMQ_POOL_KEY = f"{__name__}.pool" -_RABBITMQ_INTERVAL_BEFORE_RESTARTING_CONSUMER_S = 2 +EXCHANGE_TO_PARSER_CONFIG = ( + ( + LoggerRabbitMessage.get_channel_name(), + log_message_parser, + {"no_ack": True}, + ), + ( + ProgressRabbitMessage.get_channel_name(), + progress_message_parser, + {"no_ack": True}, + ), + ( + InstrumentationRabbitMessage.get_channel_name(), + instrumentation_message_parser, + {"no_ack": False}, + ), + ( + EventRabbitMessage.get_channel_name(), + events_message_parser, + {"no_ack": False}, + ), +) async def setup_rabbitmq_consumer(app: web.Application) -> AsyncIterator[None]: - # TODO: catch and deal with missing connections: - # e.g. CRITICAL:pika.adapters.base_connection:Could not get addresses to use: [Errno -2] Name or service not known (rabbit) - # This exception is catch and pika persists ... WARNING:pika.connection:Could not connect, 5 attempts l settings: RabbitSettings = get_plugin_settings(app) - rabbit_broker = settings.dsn - - log.info("Creating pika connection pool for %s", rabbit_broker) - await wait_till_rabbitmq_responsive(f"{rabbit_broker}") - # NOTE: to show the connection name in the rabbitMQ UI see there - # https://www.bountysource.com/issues/89342433-setting-custom-connection-name-via-client_properties-doesn-t-work-when-connecting-using-an-amqp-url - # - async def _get_connection() -> aio_pika.Connection: - return await aio_pika.connect_robust( - f"{rabbit_broker}" - + f"?name={__name__}_{socket.gethostname()}_{os.getpid()}", - client_properties={"connection_name": "webserver read connection"}, - ) + with log_context( + log, logging.INFO, msg=f"Check RabbitMQ backend is ready on {settings.dsn}" + ): + await wait_till_rabbitmq_responsive(f"{settings.dsn}") - app[APP_RABBITMQ_POOL_KEY] = connection_pool = aio_pika.pool.Pool( - _get_connection, max_size=2 - ) - assert connection_pool # nosec - - async def _get_channel() -> aio_pika.Channel: - async with connection_pool.acquire() as connection: - channel = await connection.channel() - # Finding a suitable prefetch value is a matter of trial and error - # and will vary from workload to workload. Values in the 100 - # through 300 range usually offer optimal throughput and do not - # run significant risk of overwhelming consumers. Higher values - # often run into the law of diminishing returns. - # Prefetch value of 1 is the most conservative. It will significantly - # reduce throughput, in particular in environments where consumer - # connection latency is high. For many applications, a higher value - # would be appropriate and optimal. 
- await channel.set_qos(prefetch_count=100) - return channel - - channel_pool = aio_pika.pool.Pool(_get_channel, max_size=10) - - consumer_running = True - - async def _exchange_consumer( - exchange_name: str, - parse_handler: Callable[[web.Application, bytes], Awaitable[None]], - consumer_kwargs: dict[str, Any], + with log_context( + log, logging.INFO, msg=f"Connect RabbitMQ client to {settings.dsn}" ): - while consumer_running: - try: - async with channel_pool.acquire() as channel: - exchange = await channel.declare_exchange( - exchange_name, aio_pika.ExchangeType.FANOUT - ) - # Declaring queue - queue = await channel.declare_queue( - f"webserver_{exchange_name}_{socket.gethostname()}_{os.getpid()}", - arguments={"x-message-ttl": 60000}, - ) - # Binding the queue to the exchange - await queue.bind(exchange) - # process - async with queue.iterator(**consumer_kwargs) as queue_iter: - async for message in queue_iter: - log.debug( - "Received message from exchange %s", exchange_name - ) - - await parse_handler(app, message.body) - log.debug("message parsed") - except asyncio.CancelledError: - log.info("stopping rabbitMQ consumer for %s", exchange_name) - raise - except Exception: # pylint: disable=broad-except - log.warning( - "unexpected error in consumer for %s, %s", - exchange_name, - "restarting..." if consumer_running else "stopping", - exc_info=True, - ) - if consumer_running: - await asyncio.sleep(_RABBITMQ_INTERVAL_BEFORE_RESTARTING_CONSUMER_S) - - consumer_tasks = [] - for exchange_name, message_parser, consumer_kwargs in [ - ( - settings.RABBIT_CHANNELS["log"], - log_message_parser, - {"no_ack": True}, - ), - ( - settings.RABBIT_CHANNELS["progress"], - progress_message_parser, - {"no_ack": True}, - ), - ( - settings.RABBIT_CHANNELS["instrumentation"], - instrumentation_message_parser, - {"no_ack": False}, - ), - ( - settings.RABBIT_CHANNELS["events"], - events_message_parser, - {"no_ack": False}, - ), - ]: - task = asyncio.create_task( - _exchange_consumer(exchange_name, message_parser, consumer_kwargs) - ) - consumer_tasks.append(task) + rabbit_client = RabbitMQClient("webserver", settings) - log.info("Connected to rabbitMQ exchanges") + for exchange_name, parser_fct, _exchange_kwargs in EXCHANGE_TO_PARSER_CONFIG: + await rabbit_client.subscribe( + exchange_name, functools.partial(parser_fct, app) + ) yield # cleanup - log.info("Disconnecting from rabbitMQ exchanges...") - consumer_running = False - for task in consumer_tasks: - task.cancel() - await logged_gather(*consumer_tasks, reraise=False, log=log) - - log.info("Closing connections...") - await channel_pool.close() - await connection_pool.close() - log.info("closed.") - - -@retry(**RabbitMQRetryPolicyUponInitialization().kwargs) -async def wait_till_rabbitmq_responsive(url: str) -> bool: - """Check if something responds to ``url``""" - connection = await aio_pika.connect(url) - await connection.close() - return True + with log_context(log, logging.INFO, msg="Closing RabbitMQ client"): + await rabbit_client.close() diff --git a/services/web/server/tests/integration/01/test_garbage_collection.py b/services/web/server/tests/integration/01/test_garbage_collection.py index ade9e397806..7324b5ca59e 100644 --- a/services/web/server/tests/integration/01/test_garbage_collection.py +++ b/services/web/server/tests/integration/01/test_garbage_collection.py @@ -293,6 +293,7 @@ async def disconnect_user_from_socketio(client, sio_connection_data): socket_registry = get_registry(client.server.app) await sio.disconnect() assert not sio.sid + 
await asyncio.sleep(0) # just to ensure there is a context switch assert not await socket_registry.find_keys(("socket_id", sio.sid)) assert not sid in await socket_registry.find_resources(resource_key, "socket_id") assert not await socket_registry.find_resources(resource_key, "socket_id") diff --git a/services/web/server/tests/integration/02/test_rabbit.py b/services/web/server/tests/integration/02/test_rabbit.py index 02fa66a9f0b..55d6544d790 100644 --- a/services/web/server/tests/integration/02/test_rabbit.py +++ b/services/web/server/tests/integration/02/test_rabbit.py @@ -6,8 +6,9 @@ import json import logging from asyncio import sleep -from collections import namedtuple -from typing import Any, Awaitable, Callable, NamedTuple +from dataclasses import dataclass +from typing import Any, AsyncIterator, Awaitable, Callable +from unittest import mock import aio_pika import pytest @@ -19,18 +20,18 @@ from models_library.projects_nodes_io import NodeID from models_library.projects_state import RunningState from models_library.rabbitmq_messages import ( + EventRabbitMessage, InstrumentationRabbitMessage, LoggerRabbitMessage, ProgressRabbitMessage, + RabbitEventMessageType, ) from models_library.users import UserID from pytest_mock import MockerFixture from pytest_simcore.helpers.utils_login import UserInfoDict -from pytest_simcore.rabbit_service import RabbitExchanges from servicelib.aiohttp.application import create_safe_application from settings_library.rabbit import RabbitSettings from simcore_postgres_database.models.comp_tasks import NodeClass -from simcore_service_webserver._constants import APP_SETTINGS_KEY from simcore_service_webserver.application_settings import setup_settings from simcore_service_webserver.computation import setup_computation from simcore_service_webserver.db import setup_db @@ -43,16 +44,20 @@ from simcore_service_webserver.security import setup_security from simcore_service_webserver.security_roles import UserRole from simcore_service_webserver.session import setup_session +from simcore_service_webserver.socketio.events import ( + SOCKET_IO_EVENT, + SOCKET_IO_LOG_EVENT, + SOCKET_IO_NODE_UPDATED_EVENT, +) from simcore_service_webserver.socketio.plugin import setup_socketio -from tenacity import retry_if_exception_type from tenacity._asyncio import AsyncRetrying from tenacity.before_sleep import before_sleep_log +from tenacity.retry import retry_if_exception_type from tenacity.stop import stop_after_delay from tenacity.wait import wait_fixed # Selection of core and tool services started in this swarm fixture (integration) pytest_simcore_core_services_selection = [ - "migration", "postgres", "rabbit", "redis", @@ -63,10 +68,25 @@ logger = logging.getLogger(__name__) -RabbitMessage = dict[str, Any] LogMessages = list[LoggerRabbitMessage] InstrumMessages = list[InstrumentationRabbitMessage] ProgressMessages = list[ProgressRabbitMessage] +EventMessages = list[EventRabbitMessage] + + +@dataclass(frozen=True) +class RabbitExchanges: + logs: aio_pika.abc.AbstractExchange + progress: aio_pika.abc.AbstractExchange + instrumentation: aio_pika.abc.AbstractExchange + events: aio_pika.abc.AbstractExchange + + +@dataclass(frozen=True) +class SocketIoHandlers: + mock_log: mock.Mock + mock_node_updated: mock.Mock + mock_event: mock.Mock async def _publish_in_rabbit( @@ -75,7 +95,7 @@ async def _publish_in_rabbit( node_uuid: NodeID, num_messages: int, rabbit_exchanges: RabbitExchanges, -) -> tuple[LogMessages, ProgressMessages, InstrumMessages]: +) -> tuple[LogMessages, 
ProgressMessages, InstrumMessages, EventMessages]: log_messages = [ LoggerRabbitMessage( @@ -95,6 +115,12 @@ async def _publish_in_rabbit( ) for n in range(num_messages) ] + event_message = EventRabbitMessage( + user_id=user_id, + project_id=project_id, + node_id=node_uuid, + action=RabbitEventMessageType.RELOAD_IFRAME, + ) # indicate container is started instrumentation_start_message = ( @@ -138,6 +164,11 @@ async def _publish_in_rabbit( routing_key="", ) + await rabbit_exchanges.events.publish( + aio_pika.Message(body=event_message.json().encode(), content_type="text/json"), + routing_key="", + ) + # indicate container is stopped await rabbit_exchanges.instrumentation.publish( aio_pika.Message( @@ -147,7 +178,7 @@ async def _publish_in_rabbit( routing_key="", ) - return (log_messages, progress_messages, instrumentation_messages) + return (log_messages, progress_messages, instrumentation_messages, [event_message]) @pytest.fixture @@ -157,7 +188,6 @@ def client( app_config: dict[str, Any], ## waits until swarm with *_services are up rabbit_service: RabbitSettings, ## waits until rabbit is responsive and set env vars postgres_db: sa.engine.Engine, - mocker: MockerFixture, monkeypatch_setenv_from_app_config: Callable, ): app_config["storage"]["enabled"] = False @@ -166,7 +196,6 @@ def client( app = create_safe_application(app_config) assert setup_settings(app) - assert app[APP_SETTINGS_KEY].WEBSERVER_COMPUTATION setup_db(app) setup_session(app) @@ -228,7 +257,7 @@ async def socketio_subscriber_handlers( socketio_client_factory: Callable, client_session_id: UUIDStr, mocker: MockerFixture, -) -> NamedTuple: +) -> SocketIoHandlers: """socketio SUBSCRIBER @@ -242,18 +271,19 @@ async def socketio_subscriber_handlers( # if client does not hold an authentication token sio: socketio.AsyncClient = await socketio_client_factory(client_session_id) - WEBSOCKET_LOG_EVENTNAME = "logger" # called when log messages are received mock_log_handler = mocker.Mock() - sio.on(WEBSOCKET_LOG_EVENTNAME, handler=mock_log_handler) + sio.on(SOCKET_IO_LOG_EVENT, handler=mock_log_handler) - WEBSOCKET_NODE_UPDATE_EVENTNAME = "nodeUpdated" # called when a node was updated (e.g. 
progress)
     mock_node_update_handler = mocker.Mock()
-    sio.on(WEBSOCKET_NODE_UPDATE_EVENTNAME, handler=mock_node_update_handler)
+    sio.on(SOCKET_IO_NODE_UPDATED_EVENT, handler=mock_node_update_handler)

-    return namedtuple("_MockHandlers", "log_handler node_update_handler")(
-        mock_log_handler, mock_node_update_handler
+    # called on event
+    mock_event_handler = mocker.Mock()
+    sio.on(SOCKET_IO_EVENT, handler=mock_event_handler)
+    return SocketIoHandlers(
+        mock_log_handler, mock_node_update_handler, mock_event_handler
     )


@@ -262,7 +292,7 @@ def publish_some_messages_in_rabbit(
     rabbit_exchanges: RabbitExchanges,
 ) -> Callable[
     [UserID, ProjectID, NodeID, int],
-    Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages]],
+    Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages, EventMessages]],
 ]:
     """rabbitMQ PUBLISHER"""

@@ -285,6 +315,54 @@ def user_role() -> UserRole:
     return UserRole.USER


+@pytest.fixture(scope="function")
+async def rabbit_exchanges(
+    rabbit_settings: RabbitSettings,
+    rabbit_channel: aio_pika.Channel,
+) -> AsyncIterator[RabbitExchanges]:
+
+    logs_exchange = await rabbit_channel.declare_exchange(
+        LoggerRabbitMessage.get_channel_name(),
+        aio_pika.ExchangeType.FANOUT,
+        durable=True,
+    )
+    assert logs_exchange
+
+    progress_exchange = await rabbit_channel.declare_exchange(
+        ProgressRabbitMessage.get_channel_name(),
+        aio_pika.ExchangeType.FANOUT,
+        durable=True,
+    )
+    assert progress_exchange
+
+    instrumentation_exchange = await rabbit_channel.declare_exchange(
+        InstrumentationRabbitMessage.get_channel_name(),
+        aio_pika.ExchangeType.FANOUT,
+        durable=True,
+    )
+    assert instrumentation_exchange
+
+    events_exchange = await rabbit_channel.declare_exchange(
+        EventRabbitMessage.get_channel_name(),
+        aio_pika.ExchangeType.FANOUT,
+        durable=True,
+    )
+    assert events_exchange
+
+    exchanges = RabbitExchanges(
+        logs_exchange, progress_exchange, instrumentation_exchange, events_exchange
+    )
+    yield exchanges
+
+    for exchange in [
+        LoggerRabbitMessage,
+        ProgressRabbitMessage,
+        InstrumentationRabbitMessage,
+        EventRabbitMessage,
+    ]:
+        await rabbit_channel.exchange_delete(exchange.get_channel_name())
+
+
 #
 # publisher ---> (rabbitMQ) ---> webserver --- (socketio) ---> front-end pages
 #
@@ -317,13 +395,12 @@ async def test_publish_to_other_user(
     not_current_project_id: ProjectID,
     not_in_project_node_uuid: NodeID,
     #
-    socketio_subscriber_handlers: NamedTuple,
+    socketio_subscriber_handlers: SocketIoHandlers,
     publish_some_messages_in_rabbit: Callable[
         [UserID, ProjectID, NodeID, int],
-        Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages]],
+        Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages, EventMessages]],
     ],
 ):
-    mock_log_handler, mock_node_update_handler = socketio_subscriber_handlers

     # Some other client publishes messages with wrong user id
     await publish_some_messages_in_rabbit(
@@ -334,8 +411,9 @@ async def test_publish_to_other_user(
     )
     await sleep(TIMEOUT_S)

-    mock_log_handler.assert_not_called()
-    mock_node_update_handler.assert_not_called()
+    socketio_subscriber_handlers.mock_log.assert_not_called()
+    socketio_subscriber_handlers.mock_node_updated.assert_not_called()
+    socketio_subscriber_handlers.mock_event.assert_not_called()


@pytest.mark.parametrize("user_role", USER_ROLES)
@@ -344,16 +422,14 @@ async def test_publish_to_user(
     not_current_project_id: ProjectID,
     not_in_project_node_uuid: NodeID,
     #
-    socketio_subscriber_handlers: NamedTuple,
+    socketio_subscriber_handlers: SocketIoHandlers,
     publish_some_messages_in_rabbit: Callable[
         [UserID, ProjectID, NodeID, int],
-        Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages]],
+        Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages, EventMessages]],
     ],
 ):
-    mock_log_handler, mock_node_update_handler = socketio_subscriber_handlers
-
     # publish messages with correct user id, but no project
-    log_messages, _, _ = await publish_some_messages_in_rabbit(
+    log_messages, *_ = await publish_some_messages_in_rabbit(
         logged_user["id"],
         not_current_project_id,
         not_in_project_node_uuid,
@@ -362,17 +438,21 @@ async def test_publish_to_user(

     async for attempt in AsyncRetrying(**RETRY_POLICY):
         with attempt:
-            assert mock_log_handler.call_count == (NUMBER_OF_MESSAGES)
+            assert socketio_subscriber_handlers.mock_log.call_count == (
+                NUMBER_OF_MESSAGES
+            )
     for mock_call, expected_message in zip(
-        mock_log_handler.call_args_list, log_messages
+            socketio_subscriber_handlers.mock_log.call_args_list,
+            log_messages,
     ):
         value = mock_call[0]
         deserialized_value = json.loads(value[0])
         assert deserialized_value == json.loads(
-            expected_message.json(include={"node_id", "project_id", "messages"})
+            expected_message.json(exclude={"user_id"})
         )

-    mock_node_update_handler.assert_not_called()
+    socketio_subscriber_handlers.mock_node_updated.assert_not_called()
+    socketio_subscriber_handlers.mock_event.assert_called_once()


@pytest.mark.parametrize("user_role", USER_ROLES)
@@ -381,16 +461,14 @@ async def test_publish_about_users_project(
     user_project: dict[str, Any],
     not_in_project_node_uuid: NodeID,
     #
-    socketio_subscriber_handlers: NamedTuple,
+    socketio_subscriber_handlers: SocketIoHandlers,
     publish_some_messages_in_rabbit: Callable[
         [UserID, ProjectID, NodeID, int],
-        Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages]],
+        Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages, EventMessages]],
     ],
 ):
-    mock_log_handler, mock_node_update_handler = socketio_subscriber_handlers
-
     # publish message with correct user id, project but not node
-    log_messages, _, _ = await publish_some_messages_in_rabbit(
+    log_messages, *_ = await publish_some_messages_in_rabbit(
         UserID(logged_user["id"]),
         ProjectID(user_project["uuid"]),
         not_in_project_node_uuid,
@@ -399,17 +477,20 @@ async def test_publish_about_users_project(

     async for attempt in AsyncRetrying(**RETRY_POLICY):
         with attempt:
-            assert mock_log_handler.call_count == (NUMBER_OF_MESSAGES)
+            assert socketio_subscriber_handlers.mock_log.call_count == (
+                NUMBER_OF_MESSAGES
+            )
     for mock_call, expected_message in zip(
-        mock_log_handler.call_args_list, log_messages
+        socketio_subscriber_handlers.mock_log.call_args_list, log_messages
     ):
         value = mock_call[0]
         deserialized_value = json.loads(value[0])
         assert deserialized_value == json.loads(
-            expected_message.json(include={"node_id", "project_id", "messages"})
+            expected_message.json(exclude={"user_id"})
         )

-    mock_node_update_handler.assert_not_called()
+    socketio_subscriber_handlers.mock_node_updated.assert_not_called()
+    socketio_subscriber_handlers.mock_event.assert_called_once()


@pytest.mark.parametrize("user_role", USER_ROLES)
@@ -417,17 +498,15 @@ async def test_publish_about_users_projects_node(
     logged_user: UserInfoDict,
     user_project: dict[str, Any],
     #
-    socketio_subscriber_handlers: NamedTuple,
+    socketio_subscriber_handlers: SocketIoHandlers,
     publish_some_messages_in_rabbit: Callable[
         [UserID, ProjectID, NodeID, int],
-        Awaitable[tuple[LogMessages, ProgressMessages, InstrumMessages]],
+        Awaitable[tuple[LogMessages, ProgressMessages, 
InstrumMessages, EventMessages]], ], ): - mock_log_handler, mock_node_update_handler = socketio_subscriber_handlers - # publish message with correct user id, project node node_uuid = NodeID(list(user_project["workbench"])[0]) - log_messages, _, _ = await publish_some_messages_in_rabbit( + log_messages, *_ = await publish_some_messages_in_rabbit( UserID(logged_user["id"]), ProjectID(user_project["uuid"]), node_uuid, @@ -436,18 +515,25 @@ async def test_publish_about_users_projects_node( async for attempt in AsyncRetrying(**RETRY_POLICY): with attempt: - assert mock_log_handler.call_count == (NUMBER_OF_MESSAGES) - assert mock_node_update_handler.call_count == (NUMBER_OF_MESSAGES) + assert socketio_subscriber_handlers.mock_log.call_count == ( + NUMBER_OF_MESSAGES + ) + assert socketio_subscriber_handlers.mock_node_updated.call_count == ( + NUMBER_OF_MESSAGES + ) for mock_call, expected_message in zip( - mock_log_handler.call_args_list, log_messages + socketio_subscriber_handlers.mock_log.call_args_list, log_messages ): value = mock_call[0] deserialized_value = json.loads(value[0]) assert deserialized_value == json.loads( - expected_message.json(include={"node_id", "project_id", "messages"}) + expected_message.json(exclude={"user_id"}) ) # mock_log_handler.assert_has_calls(log_calls, any_order=True) - mock_node_update_handler.assert_called() - assert mock_node_update_handler.call_count == (NUMBER_OF_MESSAGES) + socketio_subscriber_handlers.mock_node_updated.assert_called() + assert socketio_subscriber_handlers.mock_node_updated.call_count == ( + NUMBER_OF_MESSAGES + ) + socketio_subscriber_handlers.mock_event.assert_called_once()
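
A note on the resulting API: after this refactoring every message model carries its own exchange name (channel_name / get_channel_name()), a single RabbitMQClient is kept on the application state, and thin module-level helpers (post_log_message, post_sidecar_log_message, post_event_reload_iframe) replace the per-service RabbitMQ wrapper classes. The sketch below wires the two sides together end to end, the way the dynamic-sidecar publishes and the webserver consumes. It is illustrative only: the publish/subscribe signatures are inferred from the call sites in this diff (the client implementation lives in servicelib.rabbitmq, added by this PR but not shown in this excerpt), and RabbitSettings.create_from_envs() is assumed to be the usual settings-library constructor reading RABBIT_* from the environment.

    # Illustrative sketch only -- not part of this patch. Assumptions:
    # RabbitMQClient(client_name, settings) exposes async publish(channel, payload)
    # and subscribe(channel, handler), as inferred from the call sites above, and
    # RabbitSettings.create_from_envs() builds the settings from RABBIT_* env vars.
    import asyncio
    from uuid import uuid4

    from models_library.rabbitmq_messages import LoggerRabbitMessage
    from servicelib.rabbitmq import RabbitMQClient
    from servicelib.rabbitmq_utils import wait_till_rabbitmq_responsive
    from settings_library.rabbit import RabbitSettings


    async def main() -> None:
        settings = RabbitSettings.create_from_envs()
        # same readiness check the services run before creating their client
        await wait_till_rabbitmq_responsive(settings.dsn)

        consumer = RabbitMQClient("smoke-consumer", settings)
        publisher = RabbitMQClient("smoke-publisher", settings)

        received = asyncio.Event()

        async def on_log(data: bytes) -> bool:
            # handlers return True to acknowledge, mirroring the webserver parsers
            print(LoggerRabbitMessage.parse_raw(data))
            received.set()
            return True

        # the exchange name comes from the message model itself
        await consumer.subscribe(LoggerRabbitMessage.get_channel_name(), on_log)

        message = LoggerRabbitMessage(
            user_id=1,
            project_id=uuid4(),
            node_id=uuid4(),
            messages=["hello through the common client"],
        )
        await publisher.publish(message.channel_name, message.json())

        await asyncio.wait_for(received.wait(), timeout=10)
        await publisher.close()
        await consumer.close()


    if __name__ == "__main__":
        asyncio.run(main())

Binding the exchange name to the message type is what lets both the services and the tests above obtain it from LoggerRabbitMessage.get_channel_name() instead of a shared RABBIT_CHANNELS setting: a publisher and a consumer can only disagree about an exchange if they disagree about the message type itself.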