Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎨 Add extra tests for no registered subscriber when using RabbitMQClient.subscribe #5318

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions packages/service-library/src/servicelib/rabbitmq/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ async def subscribe(
- exclusive_queue: False means that only one instance of this application will
reveice the incoming message

NOTE: ``message_ttl` is also a soft timeout: if the handler does not finish processing
the message before this is reached the message will be redelivered!

specifying a topic will make the client declare a TOPIC type of RabbitMQ Exchange
instead of FANOUT
- a FANOUT exchange transmit messages to any connected queue regardless of
Expand Down
65 changes: 65 additions & 0 deletions packages/service-library/tests/rabbitmq/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,71 @@ async def _always_returning_fail(_: Any) -> bool:
}


@pytest.mark.parametrize("topics", _TOPICS)
@pytest.mark.no_cleanup_check_rabbitmq_server_has_no_errors()
async def test_publish_with_no_registered_subscriber(
on_message_spy: mock.Mock,
create_rabbitmq_client: Callable[[str], RabbitMQClient],
random_exchange_name: Callable[[], str],
random_rabbit_message: Callable[..., PytestRabbitMessage],
mocked_message_parser: mock.AsyncMock,
topics: list[str] | None,
):
publisher = create_rabbitmq_client("publisher")
consumer = create_rabbitmq_client("consumer")

exchange_name = f"{random_exchange_name()}"

ttl_s: float = 0.1
topics_count: int = 1 if topics is None else len(topics)

async def _publish_random_message():
if topics is not None:
GitHK marked this conversation as resolved.
Show resolved Hide resolved
for topic in topics:
message = random_rabbit_message(topic=topic)
await publisher.publish(exchange_name, message)
else:
message = random_rabbit_message()
await publisher.publish(exchange_name, message)

async def _subscribe_consumer_to_queue():
await consumer.subscribe(
exchange_name,
mocked_message_parser,
topics=topics,
exclusive_queue=False,
message_ttl=int(ttl_s * 1000),
GitHK marked this conversation as resolved.
Show resolved Hide resolved
unexpected_error_max_attempts=_DEFAULT_UNEXPECTED_ERROR_MAX_ATTEMPTS,
unexpected_error_retry_delay_s=ttl_s,
)

async def _unsubscribe_consumer():
await consumer.unsubscribe_consumer(exchange_name)

# CASE 1 (subscribe immediately after publishing message)

await _subscribe_consumer_to_queue()
await _unsubscribe_consumer()
await _publish_random_message()
# reconnect immediately
await _subscribe_consumer_to_queue()
# expected to receive a message (one per topic)
await _assert_wait_for_messages(on_message_spy, 1 * topics_count)

# CASE 2 (no subscriber attached when publishing)
on_message_spy.reset_mock()

await _unsubscribe_consumer()
await _publish_random_message()
# wait for message to expire (will be dropped)
await asyncio.sleep(ttl_s * 2)
await _subscribe_consumer_to_queue()
# wait for a message to be possibly delivered
await asyncio.sleep(ttl_s * 2)
# nothing changed from before
await _assert_wait_for_messages(on_message_spy, 0)


async def test_rabbit_client_pub_sub_message_is_lost_if_no_consumer_present(
create_rabbitmq_client: Callable[[str], RabbitMQClient],
random_exchange_name: Callable[[], str],
Expand Down
Loading