Skip to content

Commit

Permalink
Add option to amqp output to publish persistent messages (influxdata#…
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored and Alain ROMEYER committed May 19, 2018
1 parent 1dfbe1c commit e37b4cc
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 4 deletions.
3 changes: 3 additions & 0 deletions plugins/outputs/amqp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ For an introduction to AMQP see:
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
# delivery_mode = "transient"
## InfluxDB retention policy
# retention_policy = "default"
Expand Down
28 changes: 24 additions & 4 deletions plugins/outputs/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type AMQP struct {
Precision string
// Connection timeout
Timeout internal.Duration
// Delivery Mode controls if a published message is persistent
// Valid options are "transient" and "persistent". default: "transient"
DeliveryMode string

// Path to CA file
SSLCA string `toml:"ssl_ca"`
Expand All @@ -52,7 +55,8 @@ type AMQP struct {
sync.Mutex
c *client

serializer serializers.Serializer
deliveryMode uint8
serializer serializers.Serializer
}

type externalAuth struct{}
Expand Down Expand Up @@ -82,6 +86,9 @@ var sampleConfig = `
## Telegraf tag to use as a routing key
## ie, if this tag exists, its value will be used as the routing key
routing_tag = "host"
## Delivery Mode controls if a published message is persistent
## Valid options are "transient" and "persistent". default: "transient"
delivery_mode = "transient"
## InfluxDB retention policy
# retention_policy = "default"
Expand Down Expand Up @@ -111,6 +118,18 @@ func (a *AMQP) SetSerializer(serializer serializers.Serializer) {
}

func (q *AMQP) Connect() error {
switch q.DeliveryMode {
case "transient":
q.deliveryMode = amqp.Transient
break
case "persistent":
q.deliveryMode = amqp.Persistent
break
default:
q.deliveryMode = amqp.Transient
break
}

headers := amqp.Table{
"database": q.Database,
"retention_policy": q.RetentionPolicy,
Expand Down Expand Up @@ -245,9 +264,10 @@ func (q *AMQP) Write(metrics []telegraf.Metric) error {
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: c.headers,
ContentType: "text/plain",
Body: buf,
Headers: c.headers,
ContentType: "text/plain",
Body: buf,
DeliveryMode: q.deliveryMode,
})
if err != nil {
return fmt.Errorf("Failed to send AMQP message: %s", err)
Expand Down

0 comments on commit e37b4cc

Please sign in to comment.