Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(inputs.kafka_consumer): Add message headers as metric tags. #13924

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions plugins/inputs/kafka_consumer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,16 @@ to use them.
## Kafka brokers.
brokers = ["localhost:9092"]

## Set the minimal supported Kafka version. Should be a string contains
## 4 digits in case if it is 0 version and 3 digits for versions starting
## from 1.0.0 separated by dot. This setting enables the use of new
## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater.
## Please, check the list of supported versions at
## https://pkg.go.dev/github.com/Shopify/sarama#SupportedVersions
## ex: kafka_version = "2.6.0"
## ex: kafka_version = "0.10.2.0"
# kafka_version = "0.10.2.0"

## Topics to consume.
topics = ["telegraf"]

Expand All @@ -53,14 +63,14 @@ to use them.
## When set this tag will be added to all metrics with the topic as the value.
# topic_tag = ""

## The list of Kafka message headers that should be pass as metric tags
## works only for Kafka version 0.11+, on lower versions the message headers
## are not available
# msg_headers_to_tags = []

## Optional Client id
# client_id = "Telegraf"

## Set the minimal supported Kafka version. Setting this enables the use of new
## Kafka features and APIs. Must be 0.10.2.0 or greater.
## ex: version = "1.1.0"
# version = ""

## Optional TLS Config
# enable_tls = false
# tls_ca = "/etc/telegraf/ca.pem"
Expand Down
40 changes: 38 additions & 2 deletions plugins/inputs/kafka_consumer/kafka_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type semaphore chan empty

type KafkaConsumer struct {
Brokers []string `toml:"brokers"`
Version string `toml:"kafka_version"`
ConsumerGroup string `toml:"consumer_group"`
MaxMessageLen int `toml:"max_message_len"`
MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
Expand All @@ -44,6 +45,7 @@ type KafkaConsumer struct {
Topics []string `toml:"topics"`
TopicRegexps []string `toml:"topic_regexps"`
TopicTag string `toml:"topic_tag"`
MsgHeadersAsTags []string `toml:"msg_headers_as_tags"`
ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"`
ConnectionStrategy string `toml:"connection_strategy"`

Expand Down Expand Up @@ -109,7 +111,15 @@ func (k *KafkaConsumer) Init() error {
cfg := sarama.NewConfig()

// Kafka version 0.10.2.0 is required for consumer groups.
// Try to parse version from config. If can not, set default
cfg.Version = sarama.V0_10_2_0
if k.Version != "" {
version, err := sarama.ParseKafkaVersion(k.Version)
if err != nil {
return fmt.Errorf("invalid version: %w", err)
}
cfg.Version = version
}

if err := k.SetConfig(cfg, k.Log); err != nil {
return fmt.Errorf("SetConfig: %w", err)
Expand Down Expand Up @@ -311,6 +321,15 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error {
handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parserFunc, k.Log)
handler.MaxMessageLen = k.MaxMessageLen
handler.TopicTag = k.TopicTag
//if message headers list specified, put it as map to handler
msgHeadersMap := make(map[string]bool, len(k.MsgHeadersAsTags))
if len(k.MsgHeadersAsTags) > 0 {
for _, header := range k.MsgHeadersAsTags {
msgHeadersMap[header] = true
}
}
handler.MsgHeadersToTags = msgHeadersMap

// We need to copy allWantedTopics; the Consume() is
// long-running and we can easily deadlock if our
// topic-update-checker fires.
Expand Down Expand Up @@ -371,8 +390,9 @@ func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, fn te

// ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation.
type ConsumerGroupHandler struct {
MaxMessageLen int
TopicTag string
MaxMessageLen int
TopicTag string
MsgHeadersToTags map[string]bool

acc telegraf.TrackingAccumulator
sem semaphore
Expand Down Expand Up @@ -467,6 +487,22 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg *
return err
}

// Check if any message header should be pass as tag
header_key := ""
if len(h.MsgHeadersToTags) > 0 {
for _, header := range msg.Headers {
//convert to a string as the header and value are byte arrays.
header_key = string(header.Key)
if _, exists := h.MsgHeadersToTags[header_key]; exists {
// If message header should be pass as tag then add it to the metrics
for _, metric := range metrics {
metric.AddTag(header_key, string(header.Value))
}
}
}
}

// Add topic name as tag with TopicTag name specified in the config
if len(h.TopicTag) > 0 {
for _, metric := range metrics {
metric.AddTag(h.TopicTag, msg.Topic)
Expand Down
20 changes: 15 additions & 5 deletions plugins/inputs/kafka_consumer/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,16 @@
## Kafka brokers.
brokers = ["localhost:9092"]

## Set the minimal supported Kafka version. Should be a string contains
## 4 digits in case if it is 0 version and 3 digits for versions starting
## from 1.0.0 separated by dot. This setting enables the use of new
## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater.
## Please, check the list of supported versions at
## https://pkg.go.dev/github.com/Shopify/sarama#SupportedVersions
## ex: kafka_version = "2.6.0"
## ex: kafka_version = "0.10.2.0"
# kafka_version = "0.10.2.0"

## Topics to consume.
topics = ["telegraf"]

Expand All @@ -13,14 +23,14 @@
## When set this tag will be added to all metrics with the topic as the value.
# topic_tag = ""

## The list of Kafka message headers that should be pass as metric tags
## works only for Kafka version 0.11+, on lower versions the message headers
## are not available
# msg_headers_to_tags = []

## Optional Client id
# client_id = "Telegraf"

## Set the minimal supported Kafka version. Setting this enables the use of new
## Kafka features and APIs. Must be 0.10.2.0 or greater.
## ex: version = "1.1.0"
# version = ""

## Optional TLS Config
# enable_tls = false
# tls_ca = "/etc/telegraf/ca.pem"
Expand Down
Loading