Skip to content

Commit

Permalink
[kafka] Expose resolve_canonical_bootstrap_servers_only (#26022)
Browse files Browse the repository at this point in the history
**Description:** The latest release of IBM/sarama includes the
`resolve_canonical_bootstrap_servers_only` setting for
`client.dns.lookup`. In a GSSAPI context, this enables replacing a list
of brokers with a single name that resolves to all of the underlying
IPs. Previously this would fail with a Kerberos error. This PR exposes
the new option in collector config.

**Testing:** Added new cases to check the option is set correctly.

**Documentation:** Added a description of the parameter to the relevant
READMEs.
  • Loading branch information
gebn authored Nov 14, 2023
1 parent 482cc7d commit 6af33d6
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 8 deletions.
27 changes: 27 additions & 0 deletions .chloggen/kafka-bootstrap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafka

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Expose resolve_canonical_bootstrap_servers_only

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [26022]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
5 changes: 3 additions & 2 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ that blocks and does not batch messages, therefore it should be used with batch
processors for higher throughput and resiliency. Message payload encoding is configurable.

The following settings are required:
- `protocol_version` (no default): Kafka protocol version e.g. 2.0.0
- `protocol_version` (no default): Kafka protocol version e.g. `2.0.0`.

The following settings can be optionally configured:
- `brokers` (default = localhost:9092): The list of kafka brokers
- `brokers` (default = localhost:9092): The list of kafka brokers.
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup.
- `topic` (default = otlp_spans for traces, otlp_metrics for metrics, otlp_logs for logs): The name of the kafka topic to export to.
- `encoding` (default = otlp_proto): The encoding of the traces sent to kafka. All available encodings:
- `otlp_proto`: payload is Protobuf serialized from `ExportTraceServiceRequest` if set as a traces exporter or `ExportMetricsServiceRequest` for metrics or `ExportLogsServiceRequest` for logs.
Expand Down
8 changes: 8 additions & 0 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,16 @@ type Config struct {

// The list of kafka brokers (default localhost:9092)
Brokers []string `mapstructure:"brokers"`

// ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for
// each of the provided brokers. It will then do a PTR lookup for each
// returned IP, and that set of names becomes the broker list. This can be
// required in SASL environments.
ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`

// Kafka protocol version
ProtocolVersion string `mapstructure:"protocol_version"`

// The name of the kafka topic to export to (default otlp_spans for traces, otlp_metrics for metrics)
Topic string `mapstructure:"topic"`

Expand Down
46 changes: 46 additions & 0 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,52 @@ func TestLoadConfig(t *testing.T) {
},
},
},
{
id: component.NewIDWithName(metadata.Type, ""),
option: func(conf *Config) {
conf.ResolveCanonicalBootstrapServersOnly = true
},
expected: &Config{
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 10 * time.Second,
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
},
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
ResolveCanonicalBootstrapServersOnly: true,
Authentication: kafka.Authentication{
PlainText: &kafka.PlainTextConfig{
Username: "jdoe",
Password: "pass",
},
},
Metadata: Metadata{
Full: false,
Retry: MetadataRetry{
Max: 15,
Backoff: defaultMetadataRetryBackoff,
},
},
Producer: Producer{
MaxMessageBytes: 10000000,
RequiredAcks: sarama.WaitForAll,
Compression: "none",
},
},
},
}

for _, tt := range tests {
Expand Down
4 changes: 4 additions & 0 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
c.Producer.MaxMessageBytes = config.Producer.MaxMessageBytes
c.Producer.Flush.MaxMessages = config.Producer.FlushMaxMessages

if config.ResolveCanonicalBootstrapServersOnly {
c.Net.ResolveCanonicalBootstrapServers = true
}

if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions receiver/kafkametricsreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Metrics collected by the associated scraper are listed [here](metadata.yaml)
Optional Settings (with defaults):

- `brokers` (default = localhost:9092): the list of brokers to read from.
- `resolve_canonical_bootstrap_servers_only` (default = false): whether to resolve then reverse-lookup broker IPs during startup.
- `topic_match` (default = ^[^_].*$): regex pattern of topics to filter on metrics collection. The default filter excludes internal topics (starting with `_`).
- `group_match` (default = .*): regex pattern of consumer groups to filter on for metrics.
- `client_id` (default = otel-metrics-receiver): consumer client id
Expand Down
6 changes: 6 additions & 0 deletions receiver/kafkametricsreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ type Config struct {
// The list of kafka brokers (default localhost:9092)
Brokers []string `mapstructure:"brokers"`

// ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for
// each of the provided brokers. It will then do a PTR lookup for each
// returned IP, and that set of names becomes the broker list. This can be
// required in SASL environments.
ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`

// ProtocolVersion Kafka protocol version
ProtocolVersion string `mapstructure:"protocol_version"`

Expand Down
3 changes: 3 additions & 0 deletions receiver/kafkametricsreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ var newMetricsReceiver = func(
) (receiver.Metrics, error) {
sc := sarama.NewConfig()
sc.ClientID = config.ClientID
if config.ResolveCanonicalBootstrapServersOnly {
sc.Net.ResolveCanonicalBootstrapServers = true
}
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The following settings are required:
The following settings can be optionally configured:

- `brokers` (default = localhost:9092): The list of kafka brokers
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
- `topic` (default = otlp_spans): The name of the kafka topic to read from
- `encoding` (default = otlp_proto): The encoding of the payload received from kafka. Available encodings:
- `otlp_proto`: the payload is deserialized to `ExportTraceServiceRequest`, `ExportLogsServiceRequest` or `ExportMetricsServiceRequest` respectively.
Expand Down
5 changes: 5 additions & 0 deletions receiver/kafkareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ type HeaderExtraction struct {
type Config struct {
// The list of kafka brokers (default localhost:9092)
Brokers []string `mapstructure:"brokers"`
// ResolveCanonicalBootstrapServersOnly makes Sarama do a DNS lookup for
// each of the provided brokers. It will then do a PTR lookup for each
// returned IP, and that set of names becomes the broker list. This can be
// required in SASL environments.
ResolveCanonicalBootstrapServersOnly bool `mapstructure:"resolve_canonical_bootstrap_servers_only"`
// Kafka protocol version
ProtocolVersion string `mapstructure:"protocol_version"`
// The name of the kafka topic to consume from (default "otlp_spans")
Expand Down
13 changes: 7 additions & 6 deletions receiver/kafkareceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, ""),
expected: &Config{
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
ClientID: "otel-collector",
GroupID: "otel-collector",
InitialOffset: "latest",
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
ResolveCanonicalBootstrapServersOnly: true,
ClientID: "otel-collector",
GroupID: "otel-collector",
InitialOffset: "latest",
Authentication: kafka.Authentication{
TLS: &configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
Expand Down
3 changes: 3 additions & 0 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers
} else {
return nil, err
}
if config.ResolveCanonicalBootstrapServersOnly {
c.Net.ResolveCanonicalBootstrapServers = true
}
if config.ProtocolVersion != "" {
version, err := sarama.ParseKafkaVersion(config.ProtocolVersion)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions receiver/kafkareceiver/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ kafka:
brokers:
- "foo:123"
- "bar:456"
resolve_canonical_bootstrap_servers_only: true
client_id: otel-collector
group_id: otel-collector
auth:
Expand Down

0 comments on commit 6af33d6

Please sign in to comment.