Skip to content

Commit

Permalink
feat: Nats Jetstream consumer influxdata#11046
Browse files Browse the repository at this point in the history
  • Loading branch information
Bertram Holzer committed Jun 23, 2022
1 parent 44a4df8 commit 0700bf8
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 4 deletions.
3 changes: 3 additions & 0 deletions plugins/inputs/nats_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ instances of telegraf can read from a NATS cluster in parallel.
## subject(s) to consume
subjects = ["telegraf"]

## jetstream subjects
js_subjects = ["js_telegraf"]

## name a queue group
queue_group = "telegraf_consumers"

Expand Down
44 changes: 40 additions & 4 deletions plugins/inputs/nats_consumer/nats_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type natsConsumer struct {
Username string `toml:"username"`
Password string `toml:"password"`
Credentials string `toml:"credentials"`
JsSubjects []string `toml:"js_subjects"`

tls.ClientConfig

Expand All @@ -58,8 +59,10 @@ type natsConsumer struct {
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
MetricBuffer int `toml:"metric_buffer" deprecated:"0.10.3;2.0.0;option is ignored"`

conn *nats.Conn
subs []*nats.Subscription
conn *nats.Conn
jsConn nats.JetStreamContext
subs []*nats.Subscription
jsSubs []*nats.Subscription

parser parsers.Parser
// channel for all incoming NATS messages
Expand Down Expand Up @@ -142,6 +145,32 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {

n.subs = append(n.subs, sub)
}

if len(n.JsSubjects) > 0 {
var connErr error
n.jsConn, connErr = n.conn.JetStream(nats.PublishAsyncMaxPending(256))
if connErr != nil {
return connErr
}

for _, jsSub := range n.JsSubjects {
sub, err := n.jsConn.QueueSubscribe(jsSub, n.QueueGroup, func(m *nats.Msg) {
n.in <- m
})
if err != nil {
return err
}

// set the subscription pending limits
err = sub.SetPendingLimits(n.PendingMessageLimit, n.PendingBytesLimit)
if err != nil {
return err
}

n.jsSubs = append(n.jsSubs, sub)
}
}

}

ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -154,8 +183,8 @@ func (n *natsConsumer) Start(acc telegraf.Accumulator) error {
go n.receiver(ctx)
}()

n.Log.Infof("Started the NATS consumer service, nats: %v, subjects: %v, queue: %v",
n.conn.ConnectedUrl(), n.Subjects, n.QueueGroup)
n.Log.Infof("Started the NATS consumer service, nats: %v, subjects: %v, jssubjects: %v, queue: %v",
n.conn.ConnectedUrl(), n.Subjects, n.JsSubjects, n.QueueGroup)

return nil
}
Expand Down Expand Up @@ -205,6 +234,13 @@ func (n *natsConsumer) clean() {
}
}

for _, sub := range n.jsSubs {
if err := sub.Unsubscribe(); err != nil {
n.Log.Errorf("Error unsubscribing from subject %s in queue %s: %s",
sub.Subject, sub.Queue, err.Error())
}
}

if n.conn != nil && !n.conn.IsClosed() {
n.conn.Close()
}
Expand Down
3 changes: 3 additions & 0 deletions plugins/inputs/nats_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
## subject(s) to consume
subjects = ["telegraf"]

## jetstream subjects
js_subjects = ["js_telegraf"]

## name a queue group
queue_group = "telegraf_consumers"

Expand Down

0 comments on commit 0700bf8

Please sign in to comment.