diff --git a/plugins/inputs/nats_consumer/README.md b/plugins/inputs/nats_consumer/README.md index b3865dca74850..5426110307c0a 100644 --- a/plugins/inputs/nats_consumer/README.md +++ b/plugins/inputs/nats_consumer/README.md @@ -15,8 +15,19 @@ instances of telegraf can read from a NATS cluster in parallel. servers = ["nats://localhost:4222"] ## subject(s) to consume + ## If you use jetstream you need to set the subjects + ## in jetstream_subjects subjects = ["telegraf"] + ## jetstream subjects + ## jetstream is a streaming technology inside of nats. + ## With jetstream the nats-server persists messages and + ## a consumer can consume historical messages. This is + ## useful when telegraf needs to restart it don't miss a + ## message. You need to configure the nats-server: + ## https://docs.nats.io/nats-concepts/jetstream. + jetstream_subjects = ["js_telegraf"] + ## name a queue group queue_group = "telegraf_consumers" @@ -62,3 +73,12 @@ instances of telegraf can read from a NATS cluster in parallel. [nats]: https://www.nats.io/about/ [input data formats]: /docs/DATA_FORMATS_INPUT.md [queue group]: https://www.nats.io/documentation/concepts/nats-queueing/ + +## Metrics + +Which data you will get depends on the subjects you consume from nats + +## Example Output + +Depends on the nats subject input +nats_consumer,host=[] value=1.9 1655972309339341000 diff --git a/plugins/inputs/nats_consumer/nats_consumer.go b/plugins/inputs/nats_consumer/nats_consumer.go index 9adc62b60bd69..8105ff9b2c546 100644 --- a/plugins/inputs/nats_consumer/nats_consumer.go +++ b/plugins/inputs/nats_consumer/nats_consumer.go @@ -46,6 +46,7 @@ type natsConsumer struct { Username string `toml:"username"` Password string `toml:"password"` Credentials string `toml:"credentials"` + JsSubjects []string `toml:"jetstream_subjects"` tls.ClientConfig @@ -58,8 +59,10 @@ type natsConsumer struct { MaxUndeliveredMessages int `toml:"max_undelivered_messages"` MetricBuffer int `toml:"metric_buffer" deprecated:"0.10.3;2.0.0;option is ignored"` - conn *nats.Conn - subs []*nats.Subscription + conn *nats.Conn + jsConn nats.JetStreamContext + subs []*nats.Subscription + jsSubs []*nats.Subscription parser parsers.Parser // channel for all incoming NATS messages @@ -142,6 +145,33 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error { n.subs = append(n.subs, sub) } + + if len(n.JsSubjects) > 0 { + var connErr error + n.jsConn, connErr = n.conn.JetStream(nats.PublishAsyncMaxPending(256)) + if connErr != nil { + return connErr + } + + if n.jsConn != nil { + for _, jsSub := range n.JsSubjects { + sub, err := n.jsConn.QueueSubscribe(jsSub, n.QueueGroup, func(m *nats.Msg) { + n.in <- m + }) + if err != nil { + return err + } + + // set the subscription pending limits + err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit) + if err != nil { + return err + } + + n.jsSubs = append(n.jsSubs, sub) + } + } + } } ctx, cancel := context.WithCancel(context.Background()) @@ -154,8 +184,8 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error { go n.receiver(ctx) }() - n.Log.Infof("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v", - n.conn.ConnectedUrl(), n.Subjects, n.QueueGroup) + n.Log.Infof("Started the NATS consumer service, nats: %v, subjects: %v, jssubjects: %v, queue: %v", + n.conn.ConnectedUrl(), n.Subjects, n.JsSubjects, n.QueueGroup) return nil } @@ -201,7 +231,14 @@ func (n *natsConsumer) clean() { for _, sub := range n.subs { if err := sub.Unsubscribe(); err != nil { n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s", - sub.Subject, sub.Queue, err.Error()) + sub.Subject, sub.Queue, err) + } + } + + for _, sub := range n.jsSubs { + if err := sub.Unsubscribe(); err != nil { + n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s", + sub.Subject, sub.Queue, err) } } diff --git a/plugins/inputs/nats_consumer/sample.conf b/plugins/inputs/nats_consumer/sample.conf index a498dadbc0832..1a4e2914fe0c3 100644 --- a/plugins/inputs/nats_consumer/sample.conf +++ b/plugins/inputs/nats_consumer/sample.conf @@ -4,8 +4,19 @@ servers = ["nats://localhost:4222"] ## subject(s) to consume + ## If you use jetstream you need to set the subjects + ## in jetstream_subjects subjects = ["telegraf"] + ## jetstream subjects + ## jetstream is a streaming technology inside of nats. + ## With jetstream the nats-server persists messages and + ## a consumer can consume historical messages. This is + ## useful when telegraf needs to restart it don't miss a + ## message. You need to configure the nats-server. + ## https://docs.nats.io/nats-concepts/jetstream. + jetstream_subjects = ["js_telegraf"] + ## name a queue group queue_group = "telegraf_consumers"