Skip to content

Commit

Permalink
Work on locking
Browse files Browse the repository at this point in the history
  • Loading branch information
athornton committed May 19, 2023
1 parent 43cc6bd commit 030e013
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,13 +196,6 @@ func (k *KafkaConsumer) refreshTopics() error {
// topics, and if we find a match, add that topic to
// out topic set, which then we turn back into a list at the end.

// We lock our internal mutex at the start of this function and
// defer its unlock until we leave; this provides synchronization
// so if we close the client, we aren't refreshing while closing.

k.topicLock.Lock()
defer k.topicLock.Unlock()

if len(k.regexps) == 0 {
return nil
}
Expand Down Expand Up @@ -251,8 +244,10 @@ func (k *KafkaConsumer) refreshTopics() error {
if fingerprint != k.fingerprint {
k.Log.Infof("updating topics: replacing '%v' with '%v'", k.allWantedTopics, topicList)
}
k.topicLock.Lock()
k.fingerprint = fingerprint
k.allWantedTopics = topicList
k.topicLock.Unlock()
return nil
}

Expand Down Expand Up @@ -332,7 +327,24 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parser, k.Log)
handler.MaxMessageLen = k.MaxMessageLen
handler.TopicTag = k.TopicTag
err := k.consumer.Consume(ctx, k.allWantedTopics, handler)
// We need to copy allWantedTopics; the Consume() is
// long-running and we can easily deadlock if our
// topic-update-checker fires.
//
// This means we just get all the topics in play
// at the time Consume() starts. That's better than
// nothing, but it means that we don't actually
// dynamically update what we're listening to.
//
// We probably need to signal this goroutine when we
// refresh our topics, and when we get that signal,
// stop the consumer, and then restart it with the
// new topic list.
topics := make([]string, len(k.allWantedTopics))
k.topicLock.Lock()
copy(topics, k.allWantedTopics)
k.topicLock.Unlock()
err := k.consumer.Consume(ctx, topics, handler)
if err != nil {
acc.AddError(fmt.Errorf("consume: %w", err))
internal.SleepContext(ctx, reconnectDelay) //nolint:errcheck // ignore returned error as we cannot do anything about it anyway
Expand Down

0 comments on commit 030e013

Please sign in to comment.