Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: detect the missing of rabbitmq queue and redeclare it. #556

Merged
merged 4 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions dramatiq/brokers/rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ def declare_queue(self, queue_name, *, ensure=False):

Parameters:
queue_name(str): The name of the new queue.
ensure(bool): When True, the queue is created immediately on
the server.
ensure(bool): When True, the queue is created on the server,
if necessary.

Raises:
ConnectionClosed: When ensure=True if the underlying channel
Expand Down Expand Up @@ -308,8 +308,8 @@ def enqueue(self, message, *, delay=None):
ConnectionClosed: If the underlying channel or connection
has been closed.
"""
queue_name = message.queue_name
self.declare_queue(queue_name, ensure=True)
canonical_queue_name = message.queue_name
queue_name = canonical_queue_name

if delay is not None:
queue_name = dq_name(queue_name)
Expand All @@ -324,6 +324,7 @@ def enqueue(self, message, *, delay=None):
attempts = 1
while True:
try:
self.declare_queue(canonical_queue_name, ensure=True)
self.logger.debug("Enqueueing message %r on queue %r.", message.message_id, queue_name)
self.emit_before("enqueue", message, delay)
self.channel.basic_publish(
Expand All @@ -344,6 +345,12 @@ def enqueue(self, message, *, delay=None):
# next caller/attempt may initiate new ones of each.
del self.connection

# If the queue disappears, remove it from the known set
# so that it can be redeclared on retry or the next time
# a message is enqueued.
if getattr(e, "reply_code", None) == 404:
self.queues.remove(q_name(queue_name))

attempts += 1
if attempts > MAX_ENQUEUE_ATTEMPTS:
raise ConnectionClosed(e) from None
Expand Down
46 changes: 45 additions & 1 deletion tests/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest

import dramatiq
from dramatiq import Message, QueueJoinTimeout, Worker
from dramatiq import Message, Middleware, QueueJoinTimeout, Worker
from dramatiq.brokers.rabbitmq import RabbitmqBroker, URLRabbitmqBroker, _IgnoreScaryLogs
from dramatiq.common import current_millis

Expand Down Expand Up @@ -464,6 +464,50 @@ def do_work():
worker.stop()


def test_rabbitmq_broker_retries_declaring_queues_when_declared_queue_disappears(rabbitmq_broker):
JetDrag marked this conversation as resolved.
Show resolved Hide resolved
executed = False

# Given that I have an actor on a flaky queue
flaky_queue_name = "flaky_queue"
rabbitmq_broker.channel.queue_delete(flaky_queue_name)

@dramatiq.actor(queue_name=flaky_queue_name)
def do_work():
nonlocal executed
executed = True

# When I start a server
worker = Worker(rabbitmq_broker, worker_threads=1)
worker.start()

declared_ev = Event()

class DeclaredMiddleware(Middleware):
def after_declare_queue(self, broker, queue_name):
if queue_name == flaky_queue_name:
declared_ev.set()

# I expect that queue to be declared
rabbitmq_broker.add_middleware(DeclaredMiddleware())
assert declared_ev.wait(timeout=5)

# If I delete the queue
rabbitmq_broker.channel.queue_delete(do_work.queue_name)
with pytest.raises(pika.exceptions.ChannelClosedByBroker):
rabbitmq_broker.channel.queue_declare(do_work.queue_name, passive=True)

# And I send that actor a message
do_work.send()
try:
rabbitmq_broker.join(do_work.queue_name, timeout=20000)
worker.join()
finally:
worker.stop()

# Then the queue should be declared and the message executed
assert executed


def test_rabbitmq_messages_that_failed_to_decode_are_rejected(rabbitmq_broker, rabbitmq_worker):
# Given that I have an Actor
@dramatiq.actor(max_retries=0)
Expand Down
Loading