Skip to content

Commit

Permalink
♻️ Use common RabbitMQ client (⚠️ devops) (#3502)
Browse files Browse the repository at this point in the history
  • Loading branch information
sanderegg authored Nov 15, 2022
1 parent 45ca6ad commit c370521
Show file tree
Hide file tree
Showing 53 changed files with 784 additions and 808 deletions.
1 change: 0 additions & 1 deletion .env-devel
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ POSTGRES_PASSWORD=adminadmin
POSTGRES_PORT=5432
POSTGRES_USER=scu

RABBIT_CHANNELS='{"log": "simcore.services.logs", "progress": "simcore.services.progress", "instrumentation": "simcore.services.instrumentation", "events": "simcore.services.events"}'
RABBIT_HOST=rabbit
RABBIT_PASSWORD=adminadmin
RABBIT_PORT=5672
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/ci-testing-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,7 @@ jobs:
if: ${{ contains(join(needs.*.result, ','), 'failure') }}
run: |
echo "::error title=ERROR::one of the unit-tests failed!"
echo "${{ join(needs.*.result, ',') }}"
exit 1
- name: all the previous unit-tests were run successfully or skipped
if: ${{ !contains(join(needs.*.result, ','), 'failure') }}
Expand Down
21 changes: 14 additions & 7 deletions packages/models-library/src/models_library/rabbitmq_messages.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum
from typing import List, Optional, Union
from typing import Literal, Optional

from models_library.projects import ProjectID
from models_library.projects_nodes import NodeID
Expand All @@ -15,32 +15,39 @@ class RabbitEventMessageType(str, Enum):


class RabbitMessageBase(BaseModel):
channel_name: str
node_id: NodeID
user_id: UserID
project_id: ProjectID

@classmethod
def get_channel_name(cls) -> str:
# NOTE: this returns the channel type name
return cls.__fields__["channel_name"].default


class LoggerRabbitMessage(RabbitMessageBase):
messages: List[str]
channel_name: Literal["simcore.services.logs"] = "simcore.services.logs"
messages: list[str]


class EventRabbitMessage(RabbitMessageBase):
channel_name: Literal["simcore.services.events"] = "simcore.services.events"
action: RabbitEventMessageType


class ProgressRabbitMessage(RabbitMessageBase):
channel_name: Literal["simcore.services.progress"] = "simcore.services.progress"
progress: NonNegativeFloat


class InstrumentationRabbitMessage(RabbitMessageBase):
channel_name: Literal[
"simcore.services.instrumentation"
] = "simcore.services.instrumentation"
metrics: str
service_uuid: NodeID
service_type: NodeClass
service_key: str
service_tag: str
result: Optional[RunningState] = None


RabbitMessageTypes = Union[
LoggerRabbitMessage, ProgressRabbitMessage, InstrumentationRabbitMessage
]
11 changes: 4 additions & 7 deletions packages/pytest-simcore/src/pytest_simcore/docker_swarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,10 @@ def docker_stack(

# NOTE: if the migration service was already running prior to this call it must
# be force updated so that it does its job. else it remains and tests will fail
migration_service_was_running_before = any(
filter(
lambda s: "migration" in s.name, docker_client.services.list() # type: ignore
)
)
for migration_service in filter(
lambda s: "migration" in s.name, docker_client.services.list() # type: ignore
for migration_service in (
service
for service in docker_client.services.list()
if "migration" in service.name # type: ignore
):
print(
"WARNING: migration service detected before updating stack, it will be force-updated"
Expand Down
82 changes: 15 additions & 67 deletions packages/pytest-simcore/src/pytest_simcore/rabbit_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
# pylint: disable=unused-argument
# pylint: disable=unused-variable

import json
import asyncio
import logging
import os
import socket
from dataclasses import dataclass
from typing import Any, AsyncIterator, Optional

import aio_pika
Expand Down Expand Up @@ -49,7 +48,6 @@ async def rabbit_settings(
RABBIT_PASSWORD=testing_environ_vars["RABBIT_PASSWORD"],
RABBIT_HOST=get_localhost_ip(),
RABBIT_PORT=int(port),
RABBIT_CHANNELS=json.loads(testing_environ_vars["RABBIT_CHANNELS"]),
)

await wait_till_rabbit_responsive(settings.dsn)
Expand All @@ -59,7 +57,7 @@ async def rabbit_settings(

@pytest.fixture(scope="function")
async def rabbit_service(
rabbit_settings: RabbitSettings, monkeypatch
rabbit_settings: RabbitSettings, monkeypatch: pytest.MonkeyPatch
) -> RabbitSettings:
"""Sets env vars for a rabbit service is up and responsive and returns its settings as well
Expand All @@ -71,7 +69,6 @@ async def rabbit_service(
monkeypatch.setenv(
"RABBIT_PASSWORD", rabbit_settings.RABBIT_PASSWORD.get_secret_value()
)
monkeypatch.setenv("RABBIT_CHANNELS", json.dumps(rabbit_settings.RABBIT_CHANNELS))

return rabbit_settings

Expand All @@ -83,6 +80,11 @@ async def rabbit_connection(
def _reconnect_callback():
pytest.fail("rabbit reconnected")

def _connection_close_callback(sender: Any, exc: Optional[BaseException] = None):
if exc and not isinstance(exc, asyncio.CancelledError):
pytest.fail(f"rabbit connection closed with exception {exc} from {sender}!")
print("<-- connection closed")

# create connection
# NOTE: to show the connection name in the rabbitMQ UI see there
# https://www.bountysource.com/issues/89342433-setting-custom-connection-name-via-client_properties-doesn-t-work-when-connecting-using-an-amqp-url
Expand All @@ -93,6 +95,7 @@ def _reconnect_callback():
assert connection
assert not connection.is_closed
connection.reconnect_callbacks.add(_reconnect_callback)
connection.close_callbacks.add(_connection_close_callback)

yield connection
# close connection
Expand All @@ -105,68 +108,13 @@ async def rabbit_channel(
rabbit_connection: aio_pika.abc.AbstractConnection,
) -> AsyncIterator[aio_pika.abc.AbstractChannel]:
def _channel_close_callback(sender: Any, exc: Optional[BaseException] = None):
if exc:
pytest.fail("rabbit channel closed!")
else:
print("sender was '{sender}'")
if exc and not isinstance(exc, asyncio.CancelledError):
pytest.fail(f"rabbit channel closed with exception {exc} from {sender}!")
print("<-- rabbit channel closed")

# create channel
async with rabbit_connection.channel(publisher_confirms=False) as channel:
async with rabbit_connection.channel() as channel:
print("--> rabbit channel created")
channel.close_callbacks.add(_channel_close_callback)
yield channel


@dataclass
class RabbitExchanges:
logs: aio_pika.abc.AbstractExchange
progress: aio_pika.abc.AbstractExchange
instrumentation: aio_pika.abc.AbstractExchange


@pytest.fixture(scope="function")
async def rabbit_exchanges(
rabbit_settings: RabbitSettings,
rabbit_channel: aio_pika.Channel,
) -> RabbitExchanges:
"""
Declares and returns 'log' and 'instrumentation' exchange channels with rabbit
"""

# declare log exchange
LOG_EXCHANGE_NAME: str = rabbit_settings.RABBIT_CHANNELS["log"]
logs_exchange = await rabbit_channel.declare_exchange(
LOG_EXCHANGE_NAME, aio_pika.ExchangeType.FANOUT
)
assert logs_exchange

# declare progress exchange
PROGRESS_EXCHANGE_NAME: str = rabbit_settings.RABBIT_CHANNELS["progress"]
progress_exchange = await rabbit_channel.declare_exchange(
PROGRESS_EXCHANGE_NAME, aio_pika.ExchangeType.FANOUT
)
assert progress_exchange

# declare instrumentation exchange
INSTRUMENTATION_EXCHANGE_NAME: str = rabbit_settings.RABBIT_CHANNELS[
"instrumentation"
]
instrumentation_exchange = await rabbit_channel.declare_exchange(
INSTRUMENTATION_EXCHANGE_NAME, aio_pika.ExchangeType.FANOUT
)
assert instrumentation_exchange

return RabbitExchanges(logs_exchange, progress_exchange, instrumentation_exchange)


@pytest.fixture(scope="function")
async def rabbit_queue(
rabbit_channel: aio_pika.Channel,
rabbit_exchanges: RabbitExchanges,
) -> AsyncIterator[aio_pika.abc.AbstractQueue]:
queue = await rabbit_channel.declare_queue(exclusive=True)
assert queue

# Binding queue to exchange
await queue.bind(rabbit_exchanges.logs)
await queue.bind(rabbit_exchanges.progress)
await queue.bind(rabbit_exchanges.instrumentation)
yield queue
assert channel.is_closed
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ async def _connect(
# WARNING: engineio fails with empty cookies. Expects "key=value"
headers.update({"Cookie": cookie})

logger.debug("Connecting socketio client to %s ...", url)
print(f"--> Connecting socketio client to {url} ...")
await sio.connect(url, headers=headers)
assert sio.sid
print("... connection done")
clients.append(sio)
return sio

Expand All @@ -96,8 +97,9 @@ async def _connect(
# cleans up clients produce by _connect(*) calls
for sio in clients:
if sio.connected:
logger.debug("Disconnecting socketio client %s", sio)
print(f"<--Disconnecting socketio client {sio}")
await sio.disconnect()
await sio.wait()
print(f"... disconnection from {sio} done.")

assert not sio.sid
5 changes: 5 additions & 0 deletions packages/service-library/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ test-dev: ## runs unit tests in w/o extras
--durations=10 \
--exitfirst \
--failed-first \
--keep-docker-up \
--ignore=tests/aiohttp \
--ignore=tests/fastapi \
--pdb \
Expand All @@ -58,6 +59,7 @@ test-dev[aiohttp]: ## runs unit common tests and aiohttp extras
--durations=10 \
--exitfirst \
--failed-first \
--keep-docker-up \
--ignore=tests/fastapi \
--pdb \
-vv \
Expand All @@ -76,6 +78,7 @@ test-dev[fastapi]: ## runs unit common tests and fastapi extras
--durations=10 \
--exitfirst \
--failed-first \
--keep-docker-up \
--ignore=tests/aiohttp \
--pdb \
-vv \
Expand All @@ -93,6 +96,7 @@ test-dev[all]: ## runs unit tests w/ all extras
--durations=10 \
--exitfirst \
--failed-first \
--keep-docker-up \
--pdb \
-vv \
$(CURDIR)/tests
Expand All @@ -109,6 +113,7 @@ test-ci[all]: ## runs unit tests w/ all extras
--cov-report=xml \
--cov=$(APP_PACKAGE_NAME) \
--durations=10 \
--keep-docker-up \
--log-date-format="%Y-%m-%d %H:%M:%S" \
--log-format="%(asctime)s %(levelname)s %(message)s" \
--verbose \
Expand Down
1 change: 1 addition & 0 deletions packages/service-library/requirements/_base.in
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

aiodebug
aiofiles
aio-pika
pydantic
pyinstrument
pyyaml
Expand Down
14 changes: 14 additions & 0 deletions packages/service-library/requirements/_base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,20 @@
#
# pip-compile --output-file=requirements/_base.txt --strip-extras requirements/_base.in
#
aio-pika==8.2.4
# via -r requirements/_base.in
aiodebug==2.3.0
# via -r requirements/_base.in
aiofiles==22.1.0
# via -r requirements/_base.in
aiormq==6.4.2
# via aio-pika
idna==3.4
# via yarl
multidict==6.0.2
# via yarl
pamqp==3.2.1
# via aiormq
pydantic==1.10.2
# via
# -c requirements/../../../requirements/constraints.txt
Expand All @@ -27,3 +37,7 @@ typing-extensions==4.4.0
# via
# aiodebug
# pydantic
yarl==1.8.1
# via
# aio-pika
# aiormq
2 changes: 2 additions & 0 deletions packages/service-library/requirements/_test.in
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
asgi_lifespan
coverage
coveralls
docker
faker
flaky
openapi-spec-validator
Expand All @@ -26,3 +27,4 @@ pytest-mock
pytest-runner
pytest-sugar
pytest-xdist
python-dotenv
Loading

0 comments on commit c370521

Please sign in to comment.