Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ listen to wallet events in payment system #5012

14 changes: 13 additions & 1 deletion packages/service-library/src/servicelib/rabbitmq/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ async def _get_channel(self) -> aio_pika.abc.AbstractChannel:
channel.close_callbacks.add(self._channel_close_callback)
return channel

async def _get_consumer_tag(self, exchange_name) -> str:
return f"{get_rabbitmq_client_unique_name(self.client_name)}_{exchange_name}"

async def subscribe(
self,
exchange_name: str,
Expand Down Expand Up @@ -139,10 +142,11 @@ async def _on_message(
)
await message.nack()

_consumer_tag = await self._get_consumer_tag(exchange_name)
await queue.consume(
_on_message,
exclusive=exclusive_queue,
consumer_tag=f"{get_rabbitmq_client_unique_name(self.client_name)}_{exchange_name}",
consumer_tag=_consumer_tag,
)
output: str = queue.name
return output
Expand Down Expand Up @@ -214,3 +218,11 @@ async def publish(self, exchange_name: str, message: RabbitMessage) -> None:
aio_pika.Message(message.body()),
routing_key=message.routing_key() or "",
)

async def unsubscribe_consumer(self, exchange_name: str):
assert self._channel_pool # nosec
async with self._channel_pool.acquire() as channel:
queue_name = exchange_name
queue = await channel.get_queue(queue_name)
_consumer_tag = await self._get_consumer_tag(exchange_name)
await queue.cancel(_consumer_tag)
17 changes: 17 additions & 0 deletions packages/service-library/tests/rabbitmq/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,3 +435,20 @@ async def test_rabbit_not_using_the_same_exchange_type_raises(
# now do a second subscribtion wiht topics, will create a TOPICS exchange
with pytest.raises(aio_pika.exceptions.ChannelPreconditionFailed):
await client.subscribe(exchange_name, mocked_message_parser, topics=[])


@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors()
matusdrobuliak66 marked this conversation as resolved.
Show resolved Hide resolved
async def test_unsubscribe_consumer(
rabbitmq_client: Callable[[str], RabbitMQClient],
random_exchange_name: Callable[[], str],
mocked_message_parser: mock.AsyncMock,
):
exchange_name = f"{random_exchange_name()}"
client = rabbitmq_client("consumer")
await client.subscribe(exchange_name, mocked_message_parser, exclusive_queue=False)
# Unsubsribe just a consumer, the queue will be still there
await client.unsubscribe_consumer(exchange_name)
# Unsubsribe the queue
await client.unsubscribe(exchange_name)
with pytest.raises(aio_pika.exceptions.ChannelNotFoundEntity):
await client.unsubscribe(exchange_name)
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)
from ..api.rest.routes import setup_rest_api
from ..api.rpc.routes import setup_rpc_api_routes
from ..services.auto_recharge_listener import setup_auto_recharge_listener
from ..services.payments_gateway import setup_payments_gateway
from ..services.postgres import setup_postgres
from ..services.rabbitmq import setup_rabbitmq
Expand Down Expand Up @@ -51,6 +52,9 @@ def create_app(settings: ApplicationSettings | None = None) -> FastAPI:
# APIs w/ RUT
setup_resource_usage_tracker(app)

# Listening to Rabbitmq
setup_auto_recharge_listener(app)

# ERROR HANDLERS
# ... add here ...

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import functools
import logging

from fastapi import FastAPI
from models_library.rabbitmq_messages import WalletCreditsMessage
from servicelib.logging_utils import log_context
from servicelib.rabbitmq import RabbitMQClient

from .auto_recharge_process_message import process_message
from .rabbitmq import get_rabbitmq_client

_logger = logging.getLogger(__name__)


async def _subscribe_to_rabbitmq(app) -> str:
with log_context(_logger, logging.INFO, msg="Subscribing to rabbitmq channel"):
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
subscribed_queue: str = await rabbit_client.subscribe(
WalletCreditsMessage.get_channel_name(),
message_handler=functools.partial(process_message, app),
exclusive_queue=False,
topics=["#"],
)
return subscribed_queue


async def _unsubscribe_consumer(app) -> None:
with log_context(_logger, logging.INFO, msg="Unsubscribing from rabbitmq queue"):
rabbit_client: RabbitMQClient = get_rabbitmq_client(app)
await rabbit_client.unsubscribe_consumer(
WalletCreditsMessage.get_channel_name(),
)
return None


def setup_auto_recharge_listener(app: FastAPI) -> None:
async def _on_startup() -> None:
app.state.auto_recharge_rabbitmq_consumer = await _subscribe_to_rabbitmq(app)

async def _on_shutdown() -> None:
# NOTE: We want to have persistent queue, therefore we will unsubscribe only consumer
app.state.auto_recharge_rabbitmq_constumer = await _unsubscribe_consumer(app)

app.add_event_handler("startup", _on_startup)
app.add_event_handler("shutdown", _on_shutdown)
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import logging

from fastapi import FastAPI
from models_library.rabbitmq_messages import WalletCreditsMessage
from pydantic import parse_raw_as

_logger = logging.getLogger(__name__)


async def process_message(app: FastAPI, data: bytes) -> bool:
assert app # nosec
rabbit_message = parse_raw_as(WalletCreditsMessage, data)
_logger.debug("Process msg: %s", rabbit_message)

# 1. Check if auto-recharge functionality is ON for wallet_id
# 2. Check if wallet credits are bellow the threshold
# 3. Get Payment method
# 4. Pay with payment method

return True
3 changes: 3 additions & 0 deletions services/payments/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ def _do():
# The following services are affected if rabbitmq is not in place
mocker.patch("simcore_service_payments.core.application.setup_rabbitmq")
mocker.patch("simcore_service_payments.core.application.setup_rpc_api_routes")
mocker.patch(
"simcore_service_payments.core.application.setup_auto_recharge_listener"
)

return _do

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# pylint: disable=protected-access
# pylint: disable=redefined-outer-name
# pylint: disable=too-many-arguments
# pylint: disable=unused-argument
# pylint: disable=unused-variable

from collections.abc import Callable
from decimal import Decimal
from unittest import mock

import pytest
from fastapi import FastAPI
from models_library.rabbitmq_messages import WalletCreditsMessage
from pytest_mock.plugin import MockerFixture
from pytest_simcore.helpers.typing_env import EnvVarsDict
from pytest_simcore.helpers.utils_envs import setenvs_from_dict
from servicelib.rabbitmq import RabbitMQClient
from tenacity._asyncio import AsyncRetrying
from tenacity.retry import retry_if_exception_type
from tenacity.stop import stop_after_delay
from tenacity.wait import wait_fixed

pytest_simcore_core_services_selection = [
"postgres",
"rabbit",
]
pytest_simcore_ops_services_selection = [
"adminer",
]


@pytest.fixture
def app_environment(
monkeypatch: pytest.MonkeyPatch,
app_environment: EnvVarsDict,
rabbit_env_vars_dict: EnvVarsDict, # rabbitMQ settings from 'rabbit' service
postgres_env_vars_dict: EnvVarsDict,
wait_for_postgres_ready_and_db_migrated: None,
external_environment: EnvVarsDict,
):
# set environs
monkeypatch.delenv("PAYMENTS_RABBITMQ", raising=False)
monkeypatch.delenv("PAYMENTS_POSTGRES", raising=False)

return setenvs_from_dict(
monkeypatch,
{
**app_environment,
**rabbit_env_vars_dict,
**postgres_env_vars_dict,
**external_environment,
"POSTGRES_CLIENT_NAME": "payments-service-pg-client",
},
)


@pytest.fixture
async def mocked_message_parser(mocker: MockerFixture) -> mock.AsyncMock:
return mocker.patch(
"simcore_service_payments.services.auto_recharge_listener.process_message"
)


async def test_process_event_functions(
mocked_message_parser: mock.AsyncMock,
app: FastAPI,
rabbitmq_client: Callable[[str], RabbitMQClient],
):
publisher = rabbitmq_client("publisher")
msg = WalletCreditsMessage(wallet_id=1, credits=Decimal(120.5))
await publisher.publish(WalletCreditsMessage.get_channel_name(), msg)

async for attempt in AsyncRetrying(
wait=wait_fixed(0.1),
stop=stop_after_delay(5),
retry=retry_if_exception_type(AssertionError),
reraise=True,
):
with attempt:
mocked_message_parser.assert_called_once()
Loading