Skip to content

Commit

Permalink
feat(inputs.kafka_consumer): Add message headers as metric tags. (#13924
Browse files Browse the repository at this point in the history
)
  • Loading branch information
BelousovAntonV authored Sep 20, 2023
1 parent a542899 commit 5ab2468
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 12 deletions.
20 changes: 15 additions & 5 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ to use them.
## Kafka brokers.
brokers = ["localhost:9092"]

## Set the minimal supported Kafka version. Should be a string contains
## 4 digits in case if it is 0 version and 3 digits for versions starting
## from 1.0.0 separated by dot. This setting enables the use of new
## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater.
## Please, check the list of supported versions at
## https://pkg.go.dev/github.com/Shopify/sarama#SupportedVersions
## ex: kafka_version = "2.6.0"
## ex: kafka_version = "0.10.2.0"
# kafka_version = "0.10.2.0"

## Topics to consume.
topics = ["telegraf"]

Expand All @@ -53,14 +63,14 @@ to use them.
## When set this tag will be added to all metrics with the topic as the value.
# topic_tag = ""

## The list of Kafka message headers that should be pass as metric tags
## works only for Kafka version 0.11+, on lower versions the message headers
## are not available
# msg_headers_to_tags = []

## Optional Client id
# client_id = "Telegraf"

## Set the minimal supported Kafka version. Setting this enables the use of new
## Kafka features and APIs. Must be 0.10.2.0 or greater.
## ex: version = "1.1.0"
# version = ""

## Optional TLS Config
# enable_tls = false
# tls_ca = "/etc/telegraf/ca.pem"
Expand Down
40 changes: 38 additions & 2 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type semaphore chan empty

type KafkaConsumer struct {
Brokers []string `toml:"brokers"`
Version string `toml:"kafka_version"`
ConsumerGroup string `toml:"consumer_group"`
MaxMessageLen int `toml:"max_message_len"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
Expand All @@ -44,6 +45,7 @@ type KafkaConsumer struct {
Topics []string `toml:"topics"`
TopicRegexps []string `toml:"topic_regexps"`
TopicTag string `toml:"topic_tag"`
MsgHeadersAsTags []string `toml:"msg_headers_as_tags"`
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
ConnectionStrategy string `toml:"connection_strategy"`

Expand Down Expand Up @@ -109,7 +111,15 @@ func (k *KafkaConsumer) Init() error {
cfg := sarama.NewConfig()

// Kafka version 0.10.2.0 is required for consumer groups.
// Try to parse version from config. If can not, set default
cfg.Version = sarama.V0_10_2_0
if k.Version != "" {
version, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
return fmt.Errorf("invalid version: %w", err)
}
cfg.Version = version
}

if err := k.SetConfig(cfg, k.Log); err != nil {
return fmt.Errorf("SetConfig: %w", err)
Expand Down Expand Up @@ -311,6 +321,15 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parserFunc, k.Log)
handler.MaxMessageLen = k.MaxMessageLen
handler.TopicTag = k.TopicTag
//if message headers list specified, put it as map to handler
msgHeadersMap := make(map[string]bool, len(k.MsgHeadersAsTags))
if len(k.MsgHeadersAsTags) > 0 {
for _, header := range k.MsgHeadersAsTags {
msgHeadersMap[header] = true
}
}
handler.MsgHeadersToTags = msgHeadersMap

// We need to copy allWantedTopics; the Consume() is
// long-running and we can easily deadlock if our
// topic-update-checker fires.
Expand Down Expand Up @@ -371,8 +390,9 @@ func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, fn te

// ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.
type ConsumerGroupHandler struct {
MaxMessageLen int
TopicTag string
MaxMessageLen int
TopicTag string
MsgHeadersToTags map[string]bool

acc telegraf.TrackingAccumulator
sem semaphore
Expand Down Expand Up @@ -467,6 +487,22 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
return err
}

// Check if any message header should be pass as tag
header_key := ""
if len(h.MsgHeadersToTags) > 0 {
for _, header := range msg.Headers {
//convert to a string as the header and value are byte arrays.
header_key = string(header.Key)
if _, exists := h.MsgHeadersToTags[header_key]; exists {
// If message header should be pass as tag then add it to the metrics
for _, metric := range metrics {
metric.AddTag(header_key, string(header.Value))
}
}
}
}

// Add topic name as tag with TopicTag name specified in the config
if len(h.TopicTag) > 0 {
for _, metric := range metrics {
metric.AddTag(h.TopicTag, msg.Topic)
Expand Down
20 changes: 15 additions & 5 deletions plugins/inputs/kafka_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
## Kafka brokers.
brokers = ["localhost:9092"]

## Set the minimal supported Kafka version. Should be a string contains
## 4 digits in case if it is 0 version and 3 digits for versions starting
## from 1.0.0 separated by dot. This setting enables the use of new
## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater.
## Please, check the list of supported versions at
## https://pkg.go.dev/github.com/Shopify/sarama#SupportedVersions
## ex: kafka_version = "2.6.0"
## ex: kafka_version = "0.10.2.0"
# kafka_version = "0.10.2.0"

## Topics to consume.
topics = ["telegraf"]

Expand All @@ -13,14 +23,14 @@
## When set this tag will be added to all metrics with the topic as the value.
# topic_tag = ""

## The list of Kafka message headers that should be pass as metric tags
## works only for Kafka version 0.11+, on lower versions the message headers
## are not available
# msg_headers_to_tags = []

## Optional Client id
# client_id = "Telegraf"

## Set the minimal supported Kafka version. Setting this enables the use of new
## Kafka features and APIs. Must be 0.10.2.0 or greater.
## ex: version = "1.1.0"
# version = ""

## Optional TLS Config
# enable_tls = false
# tls_ca = "/etc/telegraf/ca.pem"
Expand Down

0 comments on commit 5ab2468

Please sign in to comment.