Skip to content

Commit

Permalink
fix: detect the missing of rabbitmq queue and redeclare it.
Browse files Browse the repository at this point in the history
  • Loading branch information
JetDrag committed Jun 29, 2023
1 parent 64fff96 commit 06d6f17
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 8 deletions.
23 changes: 16 additions & 7 deletions dramatiq/brokers/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from threading import Event, local

import pika
from pika.exceptions import ChannelClosedByBroker

from ..broker import Broker, Consumer, MessageProxy
from ..common import current_millis, dq_name, q_name, xq_name
Expand Down Expand Up @@ -85,7 +86,8 @@ class RabbitmqBroker(Broker):
.. _ConnectionParameters: https://pika.readthedocs.io/en/0.12.0/modules/parameters.html
"""

def __init__(self, *, confirm_delivery=False, url=None, middleware=None, max_priority=None, parameters=None, **kwargs):
def __init__(self, *, confirm_delivery=False, url=None, middleware=None, max_priority=None, parameters=None,
**kwargs):
super().__init__(middleware=middleware)

if max_priority is not None and not (0 < max_priority <= 255):
Expand Down Expand Up @@ -216,17 +218,19 @@ def consume(self, queue_name, prefetch=1, timeout=5000):
Returns:
Consumer: A consumer that retrieves messages from RabbitMQ.
"""
self.declare_queue(queue_name, ensure=True)
self.declare_queue(queue_name, ensure=True, strict=True)
return self.consumer_class(self.parameters, queue_name, prefetch, timeout)

def declare_queue(self, queue_name, *, ensure=False):
def declare_queue(self, queue_name, *, ensure=False, strict=False):
"""Declare a queue. Has no effect if a queue with the given
name already exists.
Parameters:
queue_name(str): The name of the new queue.
ensure(bool): When True, the queue is created immediately on
the server.
strict(bool): When True, make sure queue is created on the
server.
Raises:
ConnectionClosed: When ensure=True if the underlying channel
Expand All @@ -242,14 +246,14 @@ def declare_queue(self, queue_name, *, ensure=False):
self.delay_queues.add(delayed_name)
self.emit_after("declare_delay_queue", delayed_name)

if ensure:
self._ensure_queue(queue_name)
if ensure or strict:
self._ensure_queue(queue_name, strict)

def _ensure_queue(self, queue_name):
def _ensure_queue(self, queue_name, strict):
attempts = 1
while True:
try:
if queue_name in self.queues_pending:
if strict or queue_name in self.queues_pending:
self._declare_queue(queue_name)
self._declare_dq_queue(queue_name)
self._declare_xq_queue(queue_name)
Expand Down Expand Up @@ -344,6 +348,11 @@ def enqueue(self, message, *, delay=None):
# next caller/attempt may initiate new ones of each.
del self.connection

if isinstance(e, ChannelClosedByBroker) and e.reply_code == 404 and \
e.reply_text.startswith("NOT_FOUND - no queue"):
self.queues_pending.add(queue_name(queue_name))
raise ConnectionClosed(e) from None

attempts += 1
if attempts > MAX_ENQUEUE_ATTEMPTS:
raise ConnectionClosed(e) from None
Expand Down
35 changes: 34 additions & 1 deletion tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ def test_rabbitmq_broker_raises_an_error_if_given_invalid_parameter_combinations
# When I try to give it both a connection URL and a list of connection parameters
# Then a RuntimeError should be raised
with pytest.raises(RuntimeError):
RabbitmqBroker(url="amqp://127.0.0.1:5672", parameters=[dict(host="127.0.0.1", credentials=RABBITMQ_CREDENTIALS)])
RabbitmqBroker(url="amqp://127.0.0.1:5672",
parameters=[dict(host="127.0.0.1", credentials=RABBITMQ_CREDENTIALS)])

# When I try to give it both a connection URL and pika connection parameters
# Then a RuntimeError should be raised
Expand Down Expand Up @@ -460,6 +461,38 @@ def do_work():
worker.stop()


def test_rabbitmq_broker_retries_declaring_queues_when_declared_queues_is_gone(rabbitmq_broker):
executed = False

# I declare an actor
@dramatiq.actor(queue_name="flaky_queue")
def do_work():
nonlocal executed
executed = True

# Let worker to ensure_queue and consume message
worker = Worker(rabbitmq_broker, worker_threads=1)
worker.start()

# Check the queue is declared
rabbitmq_broker.channel.queue_declare(do_work.queue_name, passive=True)
# Let the queue go unexpectedly
rabbitmq_broker.channel.queue_delete(do_work.queue_name)
with pytest.raises(pika.exceptions.ChannelClosedByBroker):
rabbitmq_broker.channel.queue_declare(do_work.queue_name, passive=True)

# And I send that actor a message
do_work.send()
try:
rabbitmq_broker.join(do_work.queue_name, timeout=20000)
worker.join()

# Then the queue should eventually be declared and the message executed
assert executed
finally:
worker.stop()


def test_rabbitmq_messages_that_failed_to_decode_are_rejected(rabbitmq_broker, rabbitmq_worker):
# Given that I have an Actor
@dramatiq.actor(max_retries=0)
Expand Down

0 comments on commit 06d6f17

Please sign in to comment.