feat(inputs.kafka_consumer): Add regular expression support for topics #11831

Merged · 21 commits · May 23, 2023
Changes from all commits
4 changes: 4 additions & 0 deletions plugins/inputs/kafka_consumer/README.md
@@ -46,6 +46,10 @@ to use them.
## Topics to consume.
topics = ["telegraf"]

## Topic regular expressions to consume. Matches will be added to topics.
## Example: topic_regexps = [ ".*test", "metric[0-9A-Za-z]*" ]
# topic_regexps = [ ]

## When set this tag will be added to all metrics with the topic as the value.
# topic_tag = ""

135 changes: 131 additions & 4 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
@@ -5,6 +5,8 @@ import (
"context"
_ "embed"
"fmt"
"regexp"
"sort"
"strings"
"sync"
"time"
@@ -41,6 +43,7 @@ type KafkaConsumer struct {
Offset string `toml:"offset"`
BalanceStrategy string `toml:"balance_strategy"`
Topics []string `toml:"topics"`
TopicRegexps []string `toml:"topic_regexps"`
TopicTag string `toml:"topic_tag"`
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
ConnectionStrategy string `toml:"connection_strategy"`
@@ -55,9 +58,16 @@ type KafkaConsumer struct {
consumer ConsumerGroup
config *sarama.Config

parser parsers.Parser
wg sync.WaitGroup
cancel context.CancelFunc
topicClient sarama.Client
regexps []regexp.Regexp
allWantedTopics []string
ticker *time.Ticker
fingerprint string

parser parsers.Parser
topicLock sync.Mutex
wg sync.WaitGroup
cancel context.CancelFunc
}

type ConsumerGroup interface {
@@ -143,6 +153,100 @@ func (k *KafkaConsumer) Init() error {
}

k.config = cfg

if len(k.TopicRegexps) == 0 {
k.allWantedTopics = k.Topics
} else {
if err := k.compileTopicRegexps(); err != nil {
return err
}
// We have regexps, so we're going to need a client to ask
// the broker for topics
client, err := sarama.NewClient(k.Brokers, k.config)
if err != nil {
return err
}
k.topicClient = client
}

return nil
}

func (k *KafkaConsumer) compileTopicRegexps() error {
// New topics matching the existing regexps can be picked up on the
// fly, but the regexp list itself cannot change at runtime: we
// compile it once at startup. Changing it is a configuration change
// and requires a restart.

k.regexps = make([]regexp.Regexp, 0, len(k.TopicRegexps))
for _, r := range k.TopicRegexps {
re, err := regexp.Compile(r)
if err != nil {
return fmt.Errorf("regular expression %q did not compile: %w", r, err)
}
k.regexps = append(k.regexps, *re)
}
return nil
}
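
Because patterns are compiled once at Init, a malformed entry fails fast at startup instead of silently matching nothing later. A minimal standalone sketch (not part of this PR) showing how regexp.Compile rejects a dangling repetition operator such as `*test`:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// ".*test" compiles; "*test" starts with a repetition operator
	// that has nothing to repeat, so Compile returns an error and
	// the plugin's Init would fail fast.
	for _, pattern := range []string{".*test", "*test"} {
		if _, err := regexp.Compile(pattern); err != nil {
			fmt.Printf("pattern %q rejected: %v\n", pattern, err)
			continue
		}
		fmt.Printf("pattern %q compiled\n", pattern)
	}
}
```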

func (k *KafkaConsumer) refreshTopics() error {
// We have instantiated a generic Kafka client, so we can ask it for
// all the topics it knows about. We loop over our pre-compiled
// regexps and over the discovered topics, and whenever a regexp
// matches a topic, we add that topic to our wanted-topic set, which
// we turn back into a sorted list at the end.

if len(k.regexps) == 0 {
return nil
}

allDiscoveredTopics, err := k.topicClient.Topics()
if err != nil {
return err
}
k.Log.Debugf("discovered topics: %v", allDiscoveredTopics)

extantTopicSet := make(map[string]bool, len(allDiscoveredTopics))
for _, t := range allDiscoveredTopics {
extantTopicSet[t] = true
}
// Even if a topic specified by a literal string (that is, k.Topics)
// does not appear in the discovered topic list, we want to keep it
// around in case it pops back up; it is not guaranteed to be matched
// by any of our regular expressions. Therefore we treat it as if it
// were in extantTopicSet, even when it is not.
//
// Assuming the literally-specified topics are usually present on the
// broker, this map should not need resizing (although if the broker
// has many topics we do not care about, it will be over-allocated).
wantedTopicSet := make(map[string]bool, len(allDiscoveredTopics))
for _, t := range k.Topics {
// Get our pre-specified topics
k.Log.Debugf("adding literally-specified topic %s", t)
wantedTopicSet[t] = true
}
for _, t := range allDiscoveredTopics {
// Add topics that match regexps
for _, r := range k.regexps {
if r.MatchString(t) {
wantedTopicSet[t] = true
k.Log.Debugf("adding regexp-matched topic %q", t)
break
}
}
}
topicList := make([]string, 0, len(wantedTopicSet))
for t := range wantedTopicSet {
topicList = append(topicList, t)
}
sort.Strings(topicList)
fingerprint := strings.Join(topicList, ";")
if fingerprint != k.fingerprint {
k.Log.Infof("updating topics: replacing %q with %q", k.allWantedTopics, topicList)
}
k.topicLock.Lock()
k.fingerprint = fingerprint
k.allWantedTopics = topicList
k.topicLock.Unlock()
return nil
}
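
The selection step above is a set union: literal topics are always kept, regexp-matched discovered topics are added, and the result is sorted so the fingerprint (the `;`-joined list) stays stable across refreshes. A sketch of that core as a hypothetical free function (`matchWantedTopics` is illustrative, not plugin API), assuming the `regexp` and `sort` imports already present in this file:

```go
// matchWantedTopics unions literal topics with regexp-matched discovered
// topics and returns a sorted, deduplicated list, mirroring refreshTopics.
func matchWantedTopics(literal, discovered []string, regexps []regexp.Regexp) []string {
	wanted := make(map[string]bool, len(literal)+len(discovered))
	for _, t := range literal {
		wanted[t] = true // always keep literally-specified topics
	}
	for _, t := range discovered {
		for _, r := range regexps {
			if r.MatchString(t) {
				wanted[t] = true
				break
			}
		}
	}
	out := make([]string, 0, len(wanted))
	for t := range wanted {
		out = append(out, t)
	}
	sort.Strings(out)
	return out
}
```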

@@ -170,6 +274,13 @@ func (k *KafkaConsumer) startErrorAdder(acc telegraf.Accumulator) {
func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
var err error

// If TopicRegexps is set, add matching topics to the wanted-topic list
if len(k.TopicRegexps) > 0 {
if err := k.refreshTopics(); err != nil {
return err
}
}

ctx, cancel := context.WithCancel(context.Background())
k.cancel = cancel

@@ -201,7 +312,14 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
handler.MaxMessageLen = k.MaxMessageLen
handler.TopicTag = k.TopicTag
- err := k.consumer.Consume(ctx, k.Topics, handler)
+ // We must copy allWantedTopics: Consume() is long-running, and we
+ // could easily deadlock if the topic-update checker fired while we
+ // were still holding the lock.
+ k.topicLock.Lock()
+ topics := make([]string, len(k.allWantedTopics))
+ copy(topics, k.allWantedTopics)
+ k.topicLock.Unlock()
+ err := k.consumer.Consume(ctx, topics, handler)
if err != nil {
acc.AddError(fmt.Errorf("consume: %w", err))
internal.SleepContext(ctx, reconnectDelay) //nolint:errcheck // ignore returned error as we cannot do anything about it anyway
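
The copy above is the usual snapshot-under-lock pattern: hold the mutex only long enough to duplicate the shared slice, never across the blocking Consume() call. Expressed as a generic helper (illustrative only, not part of the plugin):

```go
// snapshotTopics returns a private copy of a shared slice, taken under
// its mutex, so the caller can use the copy in a long-running call
// without holding the lock or racing a concurrent refresh.
func snapshotTopics(mu *sync.Mutex, shared []string) []string {
	mu.Lock()
	defer mu.Unlock()
	out := make([]string, len(shared))
	copy(out, shared)
	return out
}
```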
@@ -221,6 +339,15 @@ func (k *KafkaConsumer) Gather(_ telegraf.Accumulator) error {
}

func (k *KafkaConsumer) Stop() {
if k.ticker != nil {
k.ticker.Stop()
}
// Lock so that a topic refresh cannot start while we are stopping.
k.topicLock.Lock()
defer k.topicLock.Unlock()
if k.topicClient != nil {
k.topicClient.Close()
}
k.cancel()
k.wg.Wait()
}
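
The hunks shown here never start k.ticker; presumably the collapsed part of Start() launches a periodic refresh driven by the topic_refresh_interval seen in the test table below. A sketch under that assumption (names and wiring are guesses, not the PR's exact code):

```go
// Assumed shape of the refresh loop: re-run refreshTopics on each tick
// until the context is cancelled; Stop() halts the ticker.
func (k *KafkaConsumer) startTopicRefresh(ctx context.Context, interval time.Duration) {
	k.ticker = time.NewTicker(interval)
	k.wg.Add(1)
	go func() {
		defer k.wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case <-k.ticker.C:
				if err := k.refreshTopics(); err != nil {
					k.Log.Errorf("refreshing topics failed: %v", err)
				}
			}
		}
	}()
}
```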
20 changes: 12 additions & 8 deletions plugins/inputs/kafka_consumer/kafka_consumer_test.go
@@ -476,11 +476,15 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
}

var tests = []struct {
- name string
- connectionStrategy string
+ name string
+ connectionStrategy string
+ topics []string
+ topicRegexps []string
+ topicRefreshInterval config.Duration
}{
{"connection strategy startup", "startup"},
{"connection strategy defer", "defer"},
{"connection strategy startup", "startup", []string{"Test"}, nil, config.Duration(0)},
{"connection strategy defer", "defer", []string{"Test"}, nil, config.Duration(0)},
{"topic regexp", "startup", nil, []string{"T*"}, config.Duration(5 * time.Second)},
}

for _, tt := range tests {
@@ -513,7 +517,6 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
defer zookeeper.Terminate()

t.Logf("rt: starting broker")
- topic := "Test"
container := testutil.Container{
Name: "telegraf-test-kafka-consumer",
Image: "wurstmeister/kafka",
@@ -522,7 +525,7 @@
"KAFKA_ADVERTISED_HOST_NAME": "localhost",
"KAFKA_ADVERTISED_PORT": "9092",
"KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("%s:%s", zookeeperName, zookeeper.Ports["2181"]),
"KAFKA_CREATE_TOPICS": fmt.Sprintf("%s:1:1", topic),
"KAFKA_CREATE_TOPICS": fmt.Sprintf("%s:1:1", "Test"),
},
Networks: []string{networkName},
WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"),
@@ -544,7 +547,7 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
require.NoError(t, s.Init())
output.SetSerializer(s)
output.Brokers = brokers
- output.Topic = topic
+ output.Topic = "Test"
output.Log = testutil.Logger{}

require.NoError(t, output.Init())
@@ -555,7 +558,8 @@ func TestKafkaRoundTripIntegration(t *testing.T) {
input := KafkaConsumer{
Brokers: brokers,
Log: testutil.Logger{},
- Topics: []string{topic},
+ Topics: tt.topics,
+ TopicRegexps: tt.topicRegexps,
MaxUndeliveredMessages: 1,
ConnectionStrategy: tt.connectionStrategy,
}
4 changes: 4 additions & 0 deletions plugins/inputs/kafka_consumer/sample.conf
@@ -6,6 +6,10 @@
## Topics to consume.
topics = ["telegraf"]

## Topic regular expressions to consume. Matches will be added to topics.
## Example: topic_regexps = [ ".*test", "metric[0-9A-Za-z]*" ]
# topic_regexps = [ ]

## When set this tag will be added to all metrics with the topic as the value.
# topic_tag = ""
