diff --git a/dramatiq/brokers/rabbitmq.py b/dramatiq/brokers/rabbitmq.py index b0db7752..de6c0657 100644 --- a/dramatiq/brokers/rabbitmq.py +++ b/dramatiq/brokers/rabbitmq.py @@ -225,8 +225,8 @@ def declare_queue(self, queue_name, *, ensure=False): Parameters: queue_name(str): The name of the new queue. - ensure(bool): When True, the queue is created immediately on - the server. + ensure(bool): When True, the queue is created on the server, + if necessary. Raises: ConnectionClosed: When ensure=True if the underlying channel @@ -308,8 +308,8 @@ def enqueue(self, message, *, delay=None): ConnectionClosed: If the underlying channel or connection has been closed. """ - queue_name = message.queue_name - self.declare_queue(queue_name, ensure=True) + canonical_queue_name = message.queue_name + queue_name = canonical_queue_name if delay is not None: queue_name = dq_name(queue_name) @@ -324,6 +324,7 @@ def enqueue(self, message, *, delay=None): attempts = 1 while True: try: + self.declare_queue(canonical_queue_name, ensure=True) self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name) self.emit_before("enqueue", message, delay) self.channel.basic_publish( @@ -344,6 +345,12 @@ def enqueue(self, message, *, delay=None): # next caller/attempt may initiate new ones of each. del self.connection + # If the queue disappears, remove it from the known set + # so that it can be redeclared on retry or the next time + # a message is enqueued. + if getattr(e, "reply_code", None) == 404: + self.queues.remove(q_name(queue_name)) + attempts += 1 if attempts > MAX_ENQUEUE_ATTEMPTS: raise ConnectionClosed(e) from None diff --git a/tests/test_rabbitmq.py b/tests/test_rabbitmq.py index 08d18ec2..20539550 100644 --- a/tests/test_rabbitmq.py +++ b/tests/test_rabbitmq.py @@ -7,7 +7,7 @@ import pytest import dramatiq -from dramatiq import Message, QueueJoinTimeout, Worker +from dramatiq import Message, Middleware, QueueJoinTimeout, Worker from dramatiq.brokers.rabbitmq import RabbitmqBroker, URLRabbitmqBroker, _IgnoreScaryLogs from dramatiq.common import current_millis @@ -464,6 +464,50 @@ def do_work(): worker.stop() +def test_rabbitmq_broker_retries_declaring_queues_when_declared_queue_disappears(rabbitmq_broker): + executed = False + + # Given that I have an actor on a flaky queue + flaky_queue_name = "flaky_queue" + rabbitmq_broker.channel.queue_delete(flaky_queue_name) + + @dramatiq.actor(queue_name=flaky_queue_name) + def do_work(): + nonlocal executed + executed = True + + # When I start a server + worker = Worker(rabbitmq_broker, worker_threads=1) + worker.start() + + declared_ev = Event() + + class DeclaredMiddleware(Middleware): + def after_declare_queue(self, broker, queue_name): + if queue_name == flaky_queue_name: + declared_ev.set() + + # I expect that queue to be declared + rabbitmq_broker.add_middleware(DeclaredMiddleware()) + assert declared_ev.wait(timeout=5) + + # If I delete the queue + rabbitmq_broker.channel.queue_delete(do_work.queue_name) + with pytest.raises(pika.exceptions.ChannelClosedByBroker): + rabbitmq_broker.channel.queue_declare(do_work.queue_name, passive=True) + + # And I send that actor a message + do_work.send() + try: + rabbitmq_broker.join(do_work.queue_name, timeout=20000) + worker.join() + finally: + worker.stop() + + # Then the queue should be declared and the message executed + assert executed + + def test_rabbitmq_messages_that_failed_to_decode_are_rejected(rabbitmq_broker, rabbitmq_worker): # Given that I have an Actor @dramatiq.actor(max_retries=0)