Skip to content

Commit

Permalink
Add queue_durability parameter to amqp_consumer input (influxdata#4628)
Browse files Browse the repository at this point in the history
  • Loading branch information
snubydev authored and rgitzel committed Oct 17, 2018
1 parent 69e538d commit cdb3191
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 9 deletions.
4 changes: 4 additions & 0 deletions plugins/inputs/amqp_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ The following defaults are known to work with RabbitMQ:

## AMQP queue name
queue = "telegraf"

## AMQP queue durability can be "transient" or "durable".
queue_durability = "durable"

## Binding Key
binding_key = "#"

Expand Down
34 changes: 25 additions & 9 deletions plugins/inputs/amqp_consumer/amqp_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ type AMQPConsumer struct {
ExchangeArguments map[string]string `toml:"exchange_arguments"`

// Queue Name
Queue string
Queue string `toml:"queue"`
QueueDurability string `toml:"queue_durability"`

// Binding Key
BindingKey string `toml:"binding_key"`

Expand Down Expand Up @@ -64,6 +66,8 @@ const (
DefaultExchangeType = "topic"
DefaultExchangeDurability = "durable"

DefaultQueueDurability = "durable"

DefaultPrefetchCount = 50
)

Expand Down Expand Up @@ -98,10 +102,13 @@ func (a *AMQPConsumer) SampleConfig() string {
# exchange_arguments = { }
# exchange_arguments = {"hash_propery" = "timestamp"}
## AMQP queue name
## AMQP queue name.
queue = "telegraf"
## Binding Key
## AMQP queue durability can be "transient" or "durable".
queue_durability = "durable"
## Binding Key.
binding_key = "#"
## Maximum number of messages server should give to the worker.
Expand Down Expand Up @@ -260,13 +267,21 @@ func (a *AMQPConsumer) connect(amqpConf *amqp.Config) (<-chan amqp.Delivery, err
return nil, err
}

var queueDurable = true
switch a.QueueDurability {
case "transient":
queueDurable = false
default:
queueDurable = true
}

q, err := ch.QueueDeclare(
a.Queue, // queue
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
a.Queue, // queue
queueDurable, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Failed to declare a queue: %s", err)
Expand Down Expand Up @@ -380,6 +395,7 @@ func init() {
AuthMethod: DefaultAuthMethod,
ExchangeType: DefaultExchangeType,
ExchangeDurability: DefaultExchangeDurability,
QueueDurability: DefaultQueueDurability,
PrefetchCount: DefaultPrefetchCount,
}
})
Expand Down

0 comments on commit cdb3191

Please sign in to comment.