From e6622e06ebbfa701804ad7ed52352bc619595362 Mon Sep 17 00:00:00 2001 From: Anton Belousov Date: Tue, 12 Sep 2023 23:12:04 +0400 Subject: [PATCH 1/7] Pass message headers as tags --- plugins/inputs/kafka_consumer/README.md | 20 +++++++--- .../inputs/kafka_consumer/kafka_consumer.go | 38 +++++++++++++++++-- plugins/inputs/kafka_consumer/sample.conf | 20 +++++++--- 3 files changed, 65 insertions(+), 13 deletions(-) diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index 0198d2b97574a..b182cb2107db2 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -43,6 +43,16 @@ to use them. ## Kafka brokers. brokers = ["localhost:9092"] + ## Set the minimal supported Kafka version. Should be a string contains + ## 4 digits in case if it is 0 version and 3 digits for versions starting + ## from 1.0.0 separated by dot. This setting enables the use of new + ## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater. + ## Please, check the list of supported versions at + ## https://pkg.go.dev/github.com/Shopify/sarama#SupportedVersions + ## ex: kafka_version = "2.6.0" + ## ex: kafka_version = "0.10.2.0" + # kafka_version = "0.10.2.0" + ## Topics to consume. topics = ["telegraf"] @@ -53,14 +63,14 @@ to use them. ## When set this tag will be added to all metrics with the topic as the value. # topic_tag = "" + ## The list of Kafka message headers that should be pass as metric tags + ## works only for Kafka version 0.11+, on lower versions the message headers + ## are not available + # msg_headers_to_tags = ['Country','City'] + ## Optional Client id # client_id = "Telegraf" - ## Set the minimal supported Kafka version. Setting this enables the use of new - ## Kafka features and APIs. Must be 0.10.2.0 or greater. - ## ex: version = "1.1.0" - # version = "" - ## Optional TLS Config # enable_tls = false # tls_ca = "/etc/telegraf/ca.pem" diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 695c7a6017c8a..9d15db220dbae 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -35,6 +35,7 @@ type semaphore chan empty type KafkaConsumer struct { Brokers []string `toml:"brokers"` + Version string `toml:"kafka_version"` ConsumerGroup string `toml:"consumer_group"` MaxMessageLen int `toml:"max_message_len"` MaxUndeliveredMessages int `toml:"max_undelivered_messages"` @@ -44,6 +45,7 @@ type KafkaConsumer struct { Topics []string `toml:"topics"` TopicRegexps []string `toml:"topic_regexps"` TopicTag string `toml:"topic_tag"` + MsgHeadersAsTags []string `toml:"msg_headers_as_tags"` ConsumerFetchDefault config.Size `toml:"consumer_fetch_default"` ConnectionStrategy string `toml:"connection_strategy"` @@ -109,7 +111,12 @@ func (k *KafkaConsumer) Init() error { cfg := sarama.NewConfig() // Kafka version 0.10.2.0 is required for consumer groups. - cfg.Version = sarama.V0_10_2_0 + // Try to parse version from config. If can not, set default + if KafkaVersion, err := sarama.ParseKafkaVersion(k.Version); err == nil { + cfg.Version = KafkaVersion + } else { + cfg.Version = sarama.V0_10_2_0 + } if err := k.SetConfig(cfg, k.Log); err != nil { return fmt.Errorf("SetConfig: %w", err) @@ -311,6 +318,15 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error { handler := NewConsumerGroupHandler(acc, k.MaxUndeliveredMessages, k.parserFunc, k.Log) handler.MaxMessageLen = k.MaxMessageLen handler.TopicTag = k.TopicTag + //if message headers list specified, put it as map to handler + MsgHeadersMap := make(map[string]bool) + if len(k.MsgHeadersAsTags) > 0 { + for _, header := range k.MsgHeadersAsTags { + MsgHeadersMap[header] = true + } + } + handler.MsgHeadersToTags = MsgHeadersMap + // We need to copy allWantedTopics; the Consume() is // long-running and we can easily deadlock if our // topic-update-checker fires. @@ -371,8 +387,9 @@ func NewConsumerGroupHandler(acc telegraf.Accumulator, maxUndelivered int, fn te // ConsumerGroupHandler is a sarama.ConsumerGroupHandler implementation. type ConsumerGroupHandler struct { - MaxMessageLen int - TopicTag string + MaxMessageLen int + TopicTag string + MsgHeadersToTags map[string]bool acc telegraf.TrackingAccumulator sem semaphore @@ -467,6 +484,21 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg * return err } + // Check if any message header should be pass as tag + header_key := "" + if len(h.MsgHeadersToTags) > 0 { + for _, header := range msg.Headers { + header_key = string(header.Key) + if _, exists := h.MsgHeadersToTags[header_key]; exists { + // If message header should be pass as tags, add it to metrics + for _, metric := range metrics { + metric.AddTag(header_key, string(header.Value)) + } + } + } + } + + // Add topic name as tag with TopicTag name specified in the config if len(h.TopicTag) > 0 { for _, metric := range metrics { metric.AddTag(h.TopicTag, msg.Topic) diff --git a/plugins/inputs/kafka_consumer/sample.conf b/plugins/inputs/kafka_consumer/sample.conf index c618d16107917..054916a11ea72 100644 --- a/plugins/inputs/kafka_consumer/sample.conf +++ b/plugins/inputs/kafka_consumer/sample.conf @@ -3,6 +3,16 @@ ## Kafka brokers. brokers = ["localhost:9092"] + ## Set the minimal supported Kafka version. Should be a string contains + ## 4 digits in case if it is 0 version and 3 digits for versions starting + ## from 1.0.0 separated by dot. This setting enables the use of new + ## Kafka features and APIs. Must be 0.10.2.0(used as default) or greater. + ## Please, check the list of supported versions at + ## https://pkg.go.dev/github.com/Shopify/sarama#SupportedVersions + ## ex: kafka_version = "2.6.0" + ## ex: kafka_version = "0.10.2.0" + # kafka_version = "0.10.2.0" + ## Topics to consume. topics = ["telegraf"] @@ -13,14 +23,14 @@ ## When set this tag will be added to all metrics with the topic as the value. # topic_tag = "" + ## The list of Kafka message headers that should be pass as metric tags + ## works only for Kafka version 0.11+, on lower versions the message headers + ## are not available + # msg_headers_to_tags = ['Country','City'] + ## Optional Client id # client_id = "Telegraf" - ## Set the minimal supported Kafka version. Setting this enables the use of new - ## Kafka features and APIs. Must be 0.10.2.0 or greater. - ## ex: version = "1.1.0" - # version = "" - ## Optional TLS Config # enable_tls = false # tls_ca = "/etc/telegraf/ca.pem" From f36f0b7582b126893c0f01b0f2f903a500badc32 Mon Sep 17 00:00:00 2001 From: Anton Belousov Date: Thu, 14 Sep 2023 19:56:46 +0400 Subject: [PATCH 2/7] fix: update comments and commit message --- plugins/inputs/kafka_consumer/kafka_consumer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 9d15db220dbae..c563aaf9d9037 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -488,9 +488,10 @@ func (h *ConsumerGroupHandler) Handle(session sarama.ConsumerGroupSession, msg * header_key := "" if len(h.MsgHeadersToTags) > 0 { for _, header := range msg.Headers { + //convert to a string as the header and value are byte arrays. header_key = string(header.Key) if _, exists := h.MsgHeadersToTags[header_key]; exists { - // If message header should be pass as tags, add it to metrics + // If message header should be pass as tag then add it to the metrics for _, metric := range metrics { metric.AddTag(header_key, string(header.Value)) } From 2081acf0bd772aa6dc5405c8b8fb9cf79970a165 Mon Sep 17 00:00:00 2001 From: Anton Belousov <133366121+BelousovAntonV@users.noreply.github.com> Date: Fri, 15 Sep 2023 20:27:48 +0400 Subject: [PATCH 3/7] fix: set default value for option Co-authored-by: Joshua Powers --- plugins/inputs/kafka_consumer/sample.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/kafka_consumer/sample.conf b/plugins/inputs/kafka_consumer/sample.conf index 054916a11ea72..28fdb215a2680 100644 --- a/plugins/inputs/kafka_consumer/sample.conf +++ b/plugins/inputs/kafka_consumer/sample.conf @@ -26,7 +26,7 @@ ## The list of Kafka message headers that should be pass as metric tags ## works only for Kafka version 0.11+, on lower versions the message headers ## are not available - # msg_headers_to_tags = ['Country','City'] + # msg_headers_to_tags = [] ## Optional Client id # client_id = "Telegraf" From 32150e78d36cf5ecb2e0af5a0fbb389a06ed1bc1 Mon Sep 17 00:00:00 2001 From: Anton Belousov <133366121+BelousovAntonV@users.noreply.github.com> Date: Fri, 15 Sep 2023 20:29:58 +0400 Subject: [PATCH 4/7] Update plugins/inputs/kafka_consumer/README.md fix: set default value for option in readme Co-authored-by: Joshua Powers --- plugins/inputs/kafka_consumer/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/kafka_consumer/README.md b/plugins/inputs/kafka_consumer/README.md index b182cb2107db2..6b5edd348b358 100644 --- a/plugins/inputs/kafka_consumer/README.md +++ b/plugins/inputs/kafka_consumer/README.md @@ -66,7 +66,7 @@ to use them. ## The list of Kafka message headers that should be pass as metric tags ## works only for Kafka version 0.11+, on lower versions the message headers ## are not available - # msg_headers_to_tags = ['Country','City'] + # msg_headers_to_tags = [] ## Optional Client id # client_id = "Telegraf" From 7ca86cdc5c53cd8cd992928f3e3cc19db991b502 Mon Sep 17 00:00:00 2001 From: Anton Belousov <133366121+BelousovAntonV@users.noreply.github.com> Date: Sat, 16 Sep 2023 22:11:16 +0400 Subject: [PATCH 5/7] fix: preallocate the size of the map Co-authored-by: Joshua Powers --- plugins/inputs/kafka_consumer/kafka_consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index c563aaf9d9037..20d2cce4c7441 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -319,7 +319,7 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error { handler.MaxMessageLen = k.MaxMessageLen handler.TopicTag = k.TopicTag //if message headers list specified, put it as map to handler - MsgHeadersMap := make(map[string]bool) + MsgHeadersMap := make(map[string]bool, len(k.MsgHeadersAsTags)) if len(k.MsgHeadersAsTags) > 0 { for _, header := range k.MsgHeadersAsTags { MsgHeadersMap[header] = true From 668a3e969342462a0545d6335fc0b75b6c0fa349 Mon Sep 17 00:00:00 2001 From: Anton Belousov <133366121+BelousovAntonV@users.noreply.github.com> Date: Tue, 19 Sep 2023 23:02:31 +0400 Subject: [PATCH 6/7] fix: return error if incorrect version specified Co-authored-by: Sven Rebhan <36194019+srebhan@users.noreply.github.com> --- plugins/inputs/kafka_consumer/kafka_consumer.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 20d2cce4c7441..00290ea922468 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -112,10 +112,13 @@ func (k *KafkaConsumer) Init() error { // Kafka version 0.10.2.0 is required for consumer groups. // Try to parse version from config. If can not, set default - if KafkaVersion, err := sarama.ParseKafkaVersion(k.Version); err == nil { - cfg.Version = KafkaVersion - } else { - cfg.Version = sarama.V0_10_2_0 + cfg.Version = sarama.V0_10_2_0 + if k.Version != "" { + version, err := sarama.ParseKafkaVersion(k.Version) + if err != nil { + return fmt.Errorf("invalid version: %w", err) + } + cfg.Version = version } if err := k.SetConfig(cfg, k.Log); err != nil { From 3efead97cc01283c3ded2f1cd740b6d0eeec1c2e Mon Sep 17 00:00:00 2001 From: Anton Belousov <133366121+BelousovAntonV@users.noreply.github.com> Date: Tue, 19 Sep 2023 23:03:07 +0400 Subject: [PATCH 7/7] fix: use unexported local variable Co-authored-by: Sven Rebhan <36194019+srebhan@users.noreply.github.com> --- plugins/inputs/kafka_consumer/kafka_consumer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index 00290ea922468..23aed960914f8 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -322,13 +322,13 @@ func (k *KafkaConsumer) Start(acc telegraf.Accumulator) error { handler.MaxMessageLen = k.MaxMessageLen handler.TopicTag = k.TopicTag //if message headers list specified, put it as map to handler - MsgHeadersMap := make(map[string]bool, len(k.MsgHeadersAsTags)) + msgHeadersMap := make(map[string]bool, len(k.MsgHeadersAsTags)) if len(k.MsgHeadersAsTags) > 0 { for _, header := range k.MsgHeadersAsTags { - MsgHeadersMap[header] = true + msgHeadersMap[header] = true } } - handler.MsgHeadersToTags = MsgHeadersMap + handler.MsgHeadersToTags = msgHeadersMap // We need to copy allWantedTopics; the Consume() is // long-running and we can easily deadlock if our