Skip to content

Commit

Permalink
feat: Nats Jetstream consumer add simple support for jetstream subjec…
Browse files Browse the repository at this point in the history
…ts (#11373)
  • Loading branch information
Bertram Holzer authored Jul 18, 2022
1 parent 5e418d7 commit 4766d0c
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 5 deletions.
20 changes: 20 additions & 0 deletions plugins/inputs/nats_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,19 @@ instances of telegraf can read from a NATS cluster in parallel.
servers = ["nats://localhost:4222"]

## subject(s) to consume
## If you use jetstream you need to set the subjects
## in jetstream_subjects
subjects = ["telegraf"]

## jetstream subjects
## jetstream is a streaming technology inside of nats.
## With jetstream the nats-server persists messages and
## a consumer can consume historical messages. This is
## useful when telegraf needs to restart it don't miss a
## message. You need to configure the nats-server:
## https://docs.nats.io/nats-concepts/jetstream.
jetstream_subjects = ["js_telegraf"]

## name a queue group
queue_group = "telegraf_consumers"

Expand Down Expand Up @@ -62,3 +73,12 @@ instances of telegraf can read from a NATS cluster in parallel.
[nats]: https://www.nats.io/about/
[input data formats]: /docs/DATA_FORMATS_INPUT.md
[queue group]: https://www.nats.io/documentation/concepts/nats-queueing/

## Metrics

Which data you will get depends on the subjects you consume from nats

## Example Output

Depends on the nats subject input
nats_consumer,host=[] value=1.9 1655972309339341000
47 changes: 42 additions & 5 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type natsConsumer struct {
Username string `toml:"username"`
Password string `toml:"password"`
Credentials string `toml:"credentials"`
JsSubjects []string `toml:"jetstream_subjects"`

tls.ClientConfig

Expand All @@ -58,8 +59,10 @@ type natsConsumer struct {
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
MetricBuffer int `toml:"metric_buffer" deprecated:"0.10.3;2.0.0;option is ignored"`

conn *nats.Conn
subs []*nats.Subscription
conn *nats.Conn
jsConn nats.JetStreamContext
subs []*nats.Subscription
jsSubs []*nats.Subscription

parser parsers.Parser
// channel for all incoming NATS messages
Expand Down Expand Up @@ -142,6 +145,33 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {

n.subs = append(n.subs, sub)
}

if len(n.JsSubjects) > 0 {
var connErr error
n.jsConn, connErr = n.conn.JetStream(nats.PublishAsyncMaxPending(256))
if connErr != nil {
return connErr
}

if n.jsConn != nil {
for _, jsSub := range n.JsSubjects {
sub, err := n.jsConn.QueueSubscribe(jsSub, n.QueueGroup, func(m *nats.Msg) {
n.in <- m
})
if err != nil {
return err
}

// set the subscription pending limits
err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit)
if err != nil {
return err
}

n.jsSubs = append(n.jsSubs, sub)
}
}
}
}

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -154,8 +184,8 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
go n.receiver(ctx)
}()

n.Log.Infof("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v",
n.conn.ConnectedUrl(), n.Subjects, n.QueueGroup)
n.Log.Infof("Started the NATS consumer service, nats: %v, subjects: %v, jssubjects: %v, queue: %v",
n.conn.ConnectedUrl(), n.Subjects, n.JsSubjects, n.QueueGroup)

return nil
}
Expand Down Expand Up @@ -201,7 +231,14 @@ func (n *natsConsumer) clean() {
for _, sub := range n.subs {
if err := sub.Unsubscribe(); err != nil {
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
sub.Subject, sub.Queue, err.Error())
sub.Subject, sub.Queue, err)
}
}

for _, sub := range n.jsSubs {
if err := sub.Unsubscribe(); err != nil {
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
sub.Subject, sub.Queue, err)
}
}

Expand Down
11 changes: 11 additions & 0 deletions plugins/inputs/nats_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,19 @@
servers = ["nats://localhost:4222"]

## subject(s) to consume
## If you use jetstream you need to set the subjects
## in jetstream_subjects
subjects = ["telegraf"]

## jetstream subjects
## jetstream is a streaming technology inside of nats.
## With jetstream the nats-server persists messages and
## a consumer can consume historical messages. This is
## useful when telegraf needs to restart it don't miss a
## message. You need to configure the nats-server.
## https://docs.nats.io/nats-concepts/jetstream.
jetstream_subjects = ["js_telegraf"]

## name a queue group
queue_group = "telegraf_consumers"

Expand Down

0 comments on commit 4766d0c

Please sign in to comment.