Skip to content

Commit

Permalink
Use config.ComponentID for Receivers config (open-telemetry#2870)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Apr 30, 2021
1 parent 24e90ed commit 5f95c6c
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 24 deletions.
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

// Config defines configuration for Kafka receiver.
type Config struct {
config.ReceiverSettings `mapstructure:",squash"`
config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct
// The list of kafka brokers (default localhost:9092)
Brokers []string `mapstructure:"brokers"`
// Kafka protocol version
Expand Down
17 changes: 7 additions & 10 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,17 +39,14 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, len(cfg.Receivers))

r := cfg.Receivers[typeStr].(*Config)
r := cfg.Receivers[config.NewID(typeStr)].(*Config)
assert.Equal(t, &Config{
ReceiverSettings: config.ReceiverSettings{
NameVal: typeStr,
TypeVal: typeStr,
},
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
ClientID: "otel-collector",
GroupID: "otel-collector",
ReceiverSettings: config.NewReceiverSettings(config.NewID(typeStr)),
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
ClientID: "otel-collector",
GroupID: "otel-collector",
Authentication: kafkaexporter.Authentication{
TLS: &configtls.TLSClientSetting{
TLSSetting: configtls.TLSSetting{
Expand Down
20 changes: 9 additions & 11 deletions factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import (
)

const (
typeStr = "kafka"
typeStr = "kafka"

defaultTopic = "otlp_spans"
defaultEncoding = "otlp_proto"
defaultBroker = "localhost:9092"
Expand All @@ -41,7 +42,7 @@ const (
defaultMetadataFull = true
)

// FactoryOption applies changes to kafkaReceiverFactory.
// FactoryOption applies changes to kafkaExporterFactory.
type FactoryOption func(factory *kafkaReceiverFactory)

// WithTracesUnmarshalers adds Unmarshalers.
Expand Down Expand Up @@ -81,15 +82,12 @@ func NewFactory(options ...FactoryOption) component.ReceiverFactory {

func createDefaultConfig() config.Receiver {
return &Config{
ReceiverSettings: config.ReceiverSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
Topic: defaultTopic,
Encoding: defaultEncoding,
Brokers: []string{defaultBroker},
ClientID: defaultClientID,
GroupID: defaultGroupID,
ReceiverSettings: config.NewReceiverSettings(config.NewID(typeStr)),
Topic: defaultTopic,
Encoding: defaultEncoding,
Brokers: []string{defaultBroker},
ClientID: defaultClientID,
GroupID: defaultGroupID,
Metadata: kafkaexporter.Metadata{
Full: defaultMetadataFull,
Retry: kafkaexporter.MetadataRetry{
Expand Down
4 changes: 2 additions & 2 deletions kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func newTracesReceiver(config Config, params component.ReceiverCreateParams, unm
return nil, err
}
return &kafkaTracesConsumer{
name: config.Name(),
name: config.ID().String(),
consumerGroup: client,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
Expand Down Expand Up @@ -160,7 +160,7 @@ func newLogsReceiver(config Config, params component.ReceiverCreateParams, unmar
return nil, err
}
return &kafkaLogsConsumer{
name: config.Name(),
name: config.ID().String(),
consumerGroup: client,
topics: []string{config.Topic},
nextConsumer: nextConsumer,
Expand Down

0 comments on commit 5f95c6c

Please sign in to comment.