diff --git a/config.go b/config.go index ec8f368ea37f..a71e668c5ac3 100644 --- a/config.go +++ b/config.go @@ -21,7 +21,7 @@ import ( // Config defines configuration for Kafka receiver. type Config struct { - config.ReceiverSettings `mapstructure:",squash"` + config.ReceiverSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct // The list of kafka brokers (default localhost:9092) Brokers []string `mapstructure:"brokers"` // Kafka protocol version diff --git a/config_test.go b/config_test.go index 4d09dca56253..436e533d329f 100644 --- a/config_test.go +++ b/config_test.go @@ -39,17 +39,14 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(cfg.Receivers)) - r := cfg.Receivers[typeStr].(*Config) + r := cfg.Receivers[config.NewID(typeStr)].(*Config) assert.Equal(t, &Config{ - ReceiverSettings: config.ReceiverSettings{ - NameVal: typeStr, - TypeVal: typeStr, - }, - Topic: "spans", - Encoding: "otlp_proto", - Brokers: []string{"foo:123", "bar:456"}, - ClientID: "otel-collector", - GroupID: "otel-collector", + ReceiverSettings: config.NewReceiverSettings(config.NewID(typeStr)), + Topic: "spans", + Encoding: "otlp_proto", + Brokers: []string{"foo:123", "bar:456"}, + ClientID: "otel-collector", + GroupID: "otel-collector", Authentication: kafkaexporter.Authentication{ TLS: &configtls.TLSClientSetting{ TLSSetting: configtls.TLSSetting{ diff --git a/factory.go b/factory.go index 684a9d85551a..a9ab9b6ca0ca 100644 --- a/factory.go +++ b/factory.go @@ -26,7 +26,8 @@ import ( ) const ( - typeStr = "kafka" + typeStr = "kafka" + defaultTopic = "otlp_spans" defaultEncoding = "otlp_proto" defaultBroker = "localhost:9092" @@ -41,7 +42,7 @@ const ( defaultMetadataFull = true ) -// FactoryOption applies changes to kafkaReceiverFactory. +// FactoryOption applies changes to kafkaExporterFactory. type FactoryOption func(factory *kafkaReceiverFactory) // WithTracesUnmarshalers adds Unmarshalers. @@ -81,15 +82,12 @@ func NewFactory(options ...FactoryOption) component.ReceiverFactory { func createDefaultConfig() config.Receiver { return &Config{ - ReceiverSettings: config.ReceiverSettings{ - TypeVal: typeStr, - NameVal: typeStr, - }, - Topic: defaultTopic, - Encoding: defaultEncoding, - Brokers: []string{defaultBroker}, - ClientID: defaultClientID, - GroupID: defaultGroupID, + ReceiverSettings: config.NewReceiverSettings(config.NewID(typeStr)), + Topic: defaultTopic, + Encoding: defaultEncoding, + Brokers: []string{defaultBroker}, + ClientID: defaultClientID, + GroupID: defaultGroupID, Metadata: kafkaexporter.Metadata{ Full: defaultMetadataFull, Retry: kafkaexporter.MetadataRetry{ diff --git a/kafka_receiver.go b/kafka_receiver.go index de473c0f652a..59a087c35e76 100644 --- a/kafka_receiver.go +++ b/kafka_receiver.go @@ -89,7 +89,7 @@ func newTracesReceiver(config Config, params component.ReceiverCreateParams, unm return nil, err } return &kafkaTracesConsumer{ - name: config.Name(), + name: config.ID().String(), consumerGroup: client, topics: []string{config.Topic}, nextConsumer: nextConsumer, @@ -160,7 +160,7 @@ func newLogsReceiver(config Config, params component.ReceiverCreateParams, unmar return nil, err } return &kafkaLogsConsumer{ - name: config.Name(), + name: config.ID().String(), consumerGroup: client, topics: []string{config.Topic}, nextConsumer: nextConsumer,