Skip to content

Commit

Permalink
fix: update dead letter queue declaration to use passive check and re…
Browse files Browse the repository at this point in the history
…create if necessary
  • Loading branch information
fernandobandeira committed Dec 5, 2024
1 parent 22a33f8 commit a40c862
Showing 1 changed file with 45 additions and 28 deletions.
73 changes: 45 additions & 28 deletions internal/events/rabbitmq/consumer/declares.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,44 +85,61 @@ func (r *rabbitmqConsumer) queueBindDeclare(routingKeys []string) error {
}

func (r *rabbitmqConsumer) deadLetterDeclare(dlxName string) error {
err := r.chManager.Channel.ExchangeDeclare(
dlxName,
"fanout",
true,
false,
false,
false,
nil,
)
if err != nil {
return eris.Wrap(err, "failed to declare exchange")
dlxProps := amqp091.Table{
"x-queue-type": "quorum",
amqp091.QueueMessageTTLArg: 1000 * 60 * 60 * 24 * 14, // 14 days
amqp091.QueueMaxLenArg: 10000, // 10k messages
}

_, err = r.chManager.Channel.QueueDeclare(
// Attempt to passively declare the queue to check if it exists and has the same properties
_, err := r.chManager.Channel.QueueDeclarePassive(
dlxName,
true,
false,
false,
false,
amqp091.Table{
amqp091.QueueMessageTTLArg: 1000 * 60 * 60 * 24 * 14, // 14 day
amqp091.QueueMaxLenArg: 10000, // 10k messages
},
dlxProps,
)
if err != nil {
return eris.Wrap(err, "failed to declare queue")
}

err = r.chManager.Channel.QueueBind(
dlxName,
"",
dlxName,
false,
nil,
)
if err != nil {
return eris.Wrap(err, "failed to bind queue")
}
// Queue does not exist or properties are different, delete and recreate
_, _ = r.chManager.Channel.QueueDelete(dlxName, false, false, false)

err = r.chManager.Channel.ExchangeDeclare(
dlxName,
"fanout",
true,
false,
false,
false,
nil,
)
if err != nil {
return eris.Wrap(err, "failed to declare exchange")
}

_, err = r.chManager.Channel.QueueDeclare(
dlxName,
true,
false,
false,
false,
dlxProps,
)
if err != nil {
return eris.Wrap(err, "failed to declare queue")
}

err = r.chManager.Channel.QueueBind(
dlxName,
"",
dlxName,
false,
nil,
)
if err != nil {
return eris.Wrap(err, "failed to bind queue")
}
}
return nil
}

0 comments on commit a40c862

Please sign in to comment.