From 6af33d6229a6c4b70f45a4a6bdccfb0e1e9a48f5 Mon Sep 17 00:00:00 2001 From: George Brighton Date: Tue, 14 Nov 2023 08:48:34 +0000 Subject: [PATCH] [kafka] Expose resolve_canonical_bootstrap_servers_only (#26022) **Description:** The latest release of IBM/sarama includes the `resolve_canonical_bootstrap_servers_only` setting for `client.dns.lookup`. In a GSSAPI context, this enables replacing a list of brokers with a single name that resolves to all of the underlying IPs. Previously this would fail with a Kerberos error. This PR exposes the new option in collector config. **Testing:** Added new cases to check the option is set correctly. **Documentation:** Added a description of the parameter to the relevant READMEs. --- .chloggen/kafka-bootstrap.yaml | 27 ++++++++++++ exporter/kafkaexporter/README.md | 5 ++- exporter/kafkaexporter/config.go | 8 ++++ exporter/kafkaexporter/config_test.go | 46 +++++++++++++++++++++ exporter/kafkaexporter/kafka_exporter.go | 4 ++ receiver/kafkametricsreceiver/README.md | 1 + receiver/kafkametricsreceiver/config.go | 6 +++ receiver/kafkametricsreceiver/receiver.go | 3 ++ receiver/kafkareceiver/README.md | 1 + receiver/kafkareceiver/config.go | 5 +++ receiver/kafkareceiver/config_test.go | 13 +++--- receiver/kafkareceiver/kafka_receiver.go | 3 ++ receiver/kafkareceiver/testdata/config.yaml | 1 + 13 files changed, 115 insertions(+), 8 deletions(-) create mode 100755 .chloggen/kafka-bootstrap.yaml diff --git a/.chloggen/kafka-bootstrap.yaml b/.chloggen/kafka-bootstrap.yaml new file mode 100755 index 000000000000..2a2d05a81bdb --- /dev/null +++ b/.chloggen/kafka-bootstrap.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafka + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Expose resolve_canonical_bootstrap_servers_only + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [26022] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index 4e80b5916397..d916a4246327 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -21,10 +21,11 @@ that blocks and does not batch messages, therefore it should be used with batch processors for higher throughput and resiliency. Message payload encoding is configurable. The following settings are required: -- `protocol_version` (no default): Kafka protocol version e.g. 2.0.0 +- `protocol_version` (no default): Kafka protocol version e.g. `2.0.0`. The following settings can be optionally configured: -- `brokers` (default = localhost:9092): The list of kafka brokers +- `brokers` (default = localhost:9092): The list of kafka brokers. +- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup. - `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to. - `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings: - `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs. diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index a4a4c1591e99..186441f33d7c 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -22,8 +22,16 @@ type Config struct { // The list of kafka brokers (default localhost:9092) Brokers []string `mapstructure:"brokers"` + + // ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for + // each of the provided brokers. It will then do a PTR lookup for each + // returned IP, and that set of names becomes the broker list. This can be + // required in SASL environments. + ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"` + // Kafka protocol version ProtocolVersion string `mapstructure:"protocol_version"` + // The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics) Topic string `mapstructure:"topic"` diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index a80383febce9..ac080ae9ee9d 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -135,6 +135,52 @@ func TestLoadConfig(t *testing.T) { }, }, }, + { + id: component.NewIDWithName(metadata.Type, ""), + option: func(conf *Config) { + conf.ResolveCanonicalBootstrapServersOnly = true + }, + expected: &Config{ + TimeoutSettings: exporterhelper.TimeoutSettings{ + Timeout: 10 * time.Second, + }, + RetrySettings: exporterhelper.RetrySettings{ + Enabled: true, + InitialInterval: 10 * time.Second, + MaxInterval: 1 * time.Minute, + MaxElapsedTime: 10 * time.Minute, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + }, + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: 2, + QueueSize: 10, + }, + Topic: "spans", + Encoding: "otlp_proto", + Brokers: []string{"foo:123", "bar:456"}, + ResolveCanonicalBootstrapServersOnly: true, + Authentication: kafka.Authentication{ + PlainText: &kafka.PlainTextConfig{ + Username: "jdoe", + Password: "pass", + }, + }, + Metadata: Metadata{ + Full: false, + Retry: MetadataRetry{ + Max: 15, + Backoff: defaultMetadataRetryBackoff, + }, + }, + Producer: Producer{ + MaxMessageBytes: 10000000, + RequiredAcks: sarama.WaitForAll, + Compression: "none", + }, + }, + }, } for _, tt := range tests { diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index 2c3670554c97..52ef71c48be3 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -134,6 +134,10 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) { c.Producer.MaxMessageBytes = config.Producer.MaxMessageBytes c.Producer.Flush.MaxMessages = config.Producer.FlushMaxMessages + if config.ResolveCanonicalBootstrapServersOnly { + c.Net.ResolveCanonicalBootstrapServers = true + } + if config.ProtocolVersion != "" { version, err := sarama.ParseKafkaVersion(config.ProtocolVersion) if err != nil { diff --git a/receiver/kafkametricsreceiver/README.md b/receiver/kafkametricsreceiver/README.md index d5d41e22cf6b..aff3c534e6a8 100644 --- a/receiver/kafkametricsreceiver/README.md +++ b/receiver/kafkametricsreceiver/README.md @@ -32,6 +32,7 @@ Metrics collected by the associated scraper are listed [here](metadata.yaml) Optional Settings (with defaults): - `brokers` (default = localhost:9092): the list of brokers to read from. +- `resolve_canonical_bootstrap_servers_only` (default = false): whether to resolve then reverse-lookup broker IPs during startup. - `topic_match` (default = ^[^_].*$): regex pattern of topics to filter on metrics collection. The default filter excludes internal topics (starting with `_`). - `group_match` (default = .*): regex pattern of consumer groups to filter on for metrics. - `client_id` (default = otel-metrics-receiver): consumer client id diff --git a/receiver/kafkametricsreceiver/config.go b/receiver/kafkametricsreceiver/config.go index 5623d392329f..a63e0fc119a6 100644 --- a/receiver/kafkametricsreceiver/config.go +++ b/receiver/kafkametricsreceiver/config.go @@ -17,6 +17,12 @@ type Config struct { // The list of kafka brokers (default localhost:9092) Brokers []string `mapstructure:"brokers"` + // ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for + // each of the provided brokers. It will then do a PTR lookup for each + // returned IP, and that set of names becomes the broker list. This can be + // required in SASL environments. + ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"` + // ProtocolVersion Kafka protocol version ProtocolVersion string `mapstructure:"protocol_version"` diff --git a/receiver/kafkametricsreceiver/receiver.go b/receiver/kafkametricsreceiver/receiver.go index 1de4c19be483..499beac9d1f5 100644 --- a/receiver/kafkametricsreceiver/receiver.go +++ b/receiver/kafkametricsreceiver/receiver.go @@ -39,6 +39,9 @@ var newMetricsReceiver = func( ) (receiver.Metrics, error) { sc := sarama.NewConfig() sc.ClientID = config.ClientID + if config.ResolveCanonicalBootstrapServersOnly { + sc.Net.ResolveCanonicalBootstrapServers = true + } if config.ProtocolVersion != "" { version, err := sarama.ParseKafkaVersion(config.ProtocolVersion) if err != nil { diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index 9ca33533ba87..bd970d5caaf1 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -30,6 +30,7 @@ The following settings are required: The following settings can be optionally configured: - `brokers` (default = localhost:9092): The list of kafka brokers +- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup - `topic` (default = otlp_spans): The name of the kafka topic to read from - `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Available encodings: - `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively. diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 185e4010d85c..fce6c0641254 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -41,6 +41,11 @@ type HeaderExtraction struct { type Config struct { // The list of kafka brokers (default localhost:9092) Brokers []string `mapstructure:"brokers"` + // ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for + // each of the provided brokers. It will then do a PTR lookup for each + // returned IP, and that set of names becomes the broker list. This can be + // required in SASL environments. + ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"` // Kafka protocol version ProtocolVersion string `mapstructure:"protocol_version"` // The name of the kafka topic to consume from (default "otlp_spans") diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index b97b093b59c5..c59f70d92e74 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -33,12 +33,13 @@ func TestLoadConfig(t *testing.T) { { id: component.NewIDWithName(metadata.Type, ""), expected: &Config{ - Topic: "spans", - Encoding: "otlp_proto", - Brokers: []string{"foo:123", "bar:456"}, - ClientID: "otel-collector", - GroupID: "otel-collector", - InitialOffset: "latest", + Topic: "spans", + Encoding: "otlp_proto", + Brokers: []string{"foo:123", "bar:456"}, + ResolveCanonicalBootstrapServersOnly: true, + ClientID: "otel-collector", + GroupID: "otel-collector", + InitialOffset: "latest", Authentication: kafka.Authentication{ TLS: &configtls.TLSClientSetting{ TLSSetting: configtls.TLSSetting{ diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index aa0ad617f557..5f10d5478783 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -98,6 +98,9 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers } else { return nil, err } + if config.ResolveCanonicalBootstrapServersOnly { + c.Net.ResolveCanonicalBootstrapServers = true + } if config.ProtocolVersion != "" { version, err := sarama.ParseKafkaVersion(config.ProtocolVersion) if err != nil { diff --git a/receiver/kafkareceiver/testdata/config.yaml b/receiver/kafkareceiver/testdata/config.yaml index f4b99b780391..3cddc9dc1652 100644 --- a/receiver/kafkareceiver/testdata/config.yaml +++ b/receiver/kafkareceiver/testdata/config.yaml @@ -3,6 +3,7 @@ kafka: brokers: - "foo:123" - "bar:456" + resolve_canonical_bootstrap_servers_only: true client_id: otel-collector group_id: otel-collector auth: