diff --git a/.chloggen/kafka-exporter-aws-iam-oauth-bearer.yaml b/.chloggen/kafka-exporter-aws-iam-oauth-bearer.yaml new file mode 100644 index 000000000000..43cb89e7fbdd --- /dev/null +++ b/.chloggen/kafka-exporter-aws-iam-oauth-bearer.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkaexporter, kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add a new mechanism "AWS_MSK_IAM_OAUTHBEARER" for kafka exporter and kafka receiver. This mechanism use the AWS MSK IAM SASL Signer for Go https://github.com/aws/aws-msk-iam-sasl-signer-go. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [19747] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/kafkaexporter/README.md b/exporter/kafkaexporter/README.md index 1ae82a924204..1fcb128a8a52 100644 --- a/exporter/kafkaexporter/README.md +++ b/exporter/kafkaexporter/README.md @@ -46,9 +46,9 @@ The following settings can be optionally configured: - `sasl` - `username`: The username to use. - `password`: The password to use - - `mechanism`: The SASL mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM or PLAIN) + - `mechanism`: The SASL mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM, AWS_MSK_IAM_OAUTHBEARER or PLAIN) - `version` (default = 0): The SASL protocol version to use (0 or 1) - - `aws_msk.region`: AWS Region in case of AWS_MSK_IAM mechanism + - `aws_msk.region`: AWS Region in case of AWS_MSK_IAM or AWS_MSK_IAM_OAUTHBEARER mechanism - `aws_msk.broker_addr`: MSK Broker address in case of AWS_MSK_IAM mechanism - `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) for the full set of available options. - `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should diff --git a/exporter/kafkaexporter/config.go b/exporter/kafkaexporter/config.go index 09968dd6a7b7..aac2cf77c097 100644 --- a/exporter/kafkaexporter/config.go +++ b/exporter/kafkaexporter/config.go @@ -144,10 +144,10 @@ func validateSASLConfig(c *kafka.SASLConfig) error { } switch c.Mechanism { - case "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256", "SCRAM-SHA-512": + case "PLAIN", "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER", "SCRAM-SHA-256", "SCRAM-SHA-512": // Do nothing, valid mechanism default: - return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism) + return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism) } if c.Version < 0 || c.Version > 1 { diff --git a/exporter/kafkaexporter/config_test.go b/exporter/kafkaexporter/config_test.go index d019a9dab62b..3264e996cbe5 100644 --- a/exporter/kafkaexporter/config_test.go +++ b/exporter/kafkaexporter/config_test.go @@ -272,7 +272,7 @@ func TestValidate_sasl_mechanism(t *testing.T) { } err := config.Validate() - assert.EqualError(t, err, "auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value FAKE") + assert.EqualError(t, err, "auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value FAKE") } func TestValidate_sasl_version(t *testing.T) { diff --git a/exporter/kafkaexporter/go.mod b/exporter/kafkaexporter/go.mod index 18687942cf34..3f485b88bdd2 100644 --- a/exporter/kafkaexporter/go.mod +++ b/exporter/kafkaexporter/go.mod @@ -35,7 +35,20 @@ require ( require ( github.com/apache/thrift v0.21.0 // indirect + github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect github.com/aws/aws-sdk-go v1.55.5 // indirect + github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect + github.com/aws/aws-sdk-go-v2/config v1.18.28 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect + github.com/aws/smithy-go v1.13.5 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/eapache/go-resiliency v1.7.0 // indirect diff --git a/exporter/kafkaexporter/go.sum b/exporter/kafkaexporter/go.sum index 9e6dc75cbefa..a95ccacbcdf6 100644 --- a/exporter/kafkaexporter/go.sum +++ b/exporter/kafkaexporter/go.sum @@ -2,8 +2,34 @@ github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 h1:UyjtGmO0Uwl/K+zpzPwLoXzMhcN9xmnR2nrqJoBrg3c= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0/go.mod h1:TJAXuFs2HcMib3sN5L0gUC+Q01Qvy3DemvA55WuC+iA= github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/aws/aws-sdk-go-v2 v1.19.0 h1:klAT+y3pGFBU/qVf1uzwttpBbiuozJYWzNLHioyDJ+k= +github.com/aws/aws-sdk-go-v2 v1.19.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2/config v1.18.28 h1:TINEaKyh1Td64tqFvn09iYpKiWjmHYrG1fa91q2gnqw= +github.com/aws/aws-sdk-go-v2/config v1.18.28/go.mod h1:nIL+4/8JdAuNHEjn/gPEXqtnS02Q3NXB/9Z7o5xE4+A= +github.com/aws/aws-sdk-go-v2/credentials v1.13.27 h1:dz0yr/yR1jweAnsCx+BmjerUILVPQ6FS5AwF/OyG1kA= +github.com/aws/aws-sdk-go-v2/credentials v1.13.27/go.mod h1:syOqAek45ZXZp29HlnRS/BNgMIW6uiRmeuQsz4Qh2UE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 h1:kP3Me6Fy3vdi+9uHd7YLr6ewPxRL+PU6y15urfTaamU= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5/go.mod h1:Gj7tm95r+QsDoN2Fhuz/3npQvcZbkEf5mL70n3Xfluc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 h1:hMUCiE3Zi5AHrRNGf5j985u0WyqI6r2NULhUfo0N/No= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35/go.mod h1:ipR5PvpSPqIqL5Mi82BxLnfMkHVbmco8kUwO2xrCi0M= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 h1:yOpYx+FTBdpk/g+sBU6Cb1H0U/TLEcYYp66mYqsPpcc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29/go.mod h1:M/eUABlDbw2uVrdAn+UsI6M727qp2fxkp8K0ejcBDUY= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 h1:8r5m1BoAWkn0TDC34lUculryf7nUF25EgIMdjvGCkgo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36/go.mod h1:Rmw2M1hMVTwiUhjwMoIBFWFJMhvJbct06sSidxInkhY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 h1:IiDolu/eLmuB18DRZibj77n1hHQT7z12jnGO7Ze3pLc= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29/go.mod h1:fDbkK4o7fpPXWn8YAPmTieAMuB9mk/VgvW64uaUqxd4= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 h1:sWDv7cMITPcZ21QdreULwxOOAmE05JjEsT6fCDtDA9k= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.13/go.mod h1:DfX0sWuT46KpcqbMhJ9QWtxAIP1VozkDWf8VAkByjYY= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 h1:BFubHS/xN5bjl818QaroN6mQdjneYQ+AOx44KNXlyH4= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13/go.mod h1:BzqsVVFduubEmzrVtUFQQIQdFqvUItF8XUq2EnS8Wog= +github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 h1:e5mnydVdCVWxP+5rPAGi2PYxC7u2OZgH1ypC114H04U= +github.com/aws/aws-sdk-go-v2/service/sts v1.19.3/go.mod h1:yVGZA1CPkmUhBdA039jXNJJG7/6t+G+EBWmFq23xqnY= +github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= +github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -33,6 +59,7 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= diff --git a/exporter/kafkaexporter/kafka_exporter.go b/exporter/kafkaexporter/kafka_exporter.go index fe3130cecc9c..6720237fabe2 100644 --- a/exporter/kafkaexporter/kafka_exporter.go +++ b/exporter/kafkaexporter/kafka_exporter.go @@ -66,7 +66,7 @@ func (e *kafkaTracesProducer) Close(context.Context) error { return e.producer.Close() } -func (e *kafkaTracesProducer) start(_ context.Context, host component.Host) error { +func (e *kafkaTracesProducer) start(ctx context.Context, host component.Host) error { // extensions take precedence over internal encodings if marshaler, errExt := loadEncodingExtension[ptrace.Marshaler]( host, @@ -83,7 +83,7 @@ func (e *kafkaTracesProducer) start(_ context.Context, host component.Host) erro if e.marshaler == nil { return errUnrecognizedEncoding } - producer, err := newSaramaProducer(e.cfg) + producer, err := newSaramaProducer(ctx, e.cfg) if err != nil { return err } @@ -124,7 +124,7 @@ func (e *kafkaMetricsProducer) Close(context.Context) error { return e.producer.Close() } -func (e *kafkaMetricsProducer) start(_ context.Context, host component.Host) error { +func (e *kafkaMetricsProducer) start(ctx context.Context, host component.Host) error { // extensions take precedence over internal encodings if marshaler, errExt := loadEncodingExtension[pmetric.Marshaler]( host, @@ -141,7 +141,7 @@ func (e *kafkaMetricsProducer) start(_ context.Context, host component.Host) err if e.marshaler == nil { return errUnrecognizedEncoding } - producer, err := newSaramaProducer(e.cfg) + producer, err := newSaramaProducer(ctx, e.cfg) if err != nil { return err } @@ -182,7 +182,7 @@ func (e *kafkaLogsProducer) Close(context.Context) error { return e.producer.Close() } -func (e *kafkaLogsProducer) start(_ context.Context, host component.Host) error { +func (e *kafkaLogsProducer) start(ctx context.Context, host component.Host) error { // extensions take precedence over internal encodings if marshaler, errExt := loadEncodingExtension[plog.Marshaler]( host, @@ -199,7 +199,7 @@ func (e *kafkaLogsProducer) start(_ context.Context, host component.Host) error if e.marshaler == nil { return errUnrecognizedEncoding } - producer, err := newSaramaProducer(e.cfg) + producer, err := newSaramaProducer(ctx, e.cfg) if err != nil { return err } @@ -207,7 +207,7 @@ func (e *kafkaLogsProducer) start(_ context.Context, host component.Host) error return nil } -func newSaramaProducer(config Config) (sarama.SyncProducer, error) { +func newSaramaProducer(ctx context.Context, config Config) (sarama.SyncProducer, error) { c := sarama.NewConfig() c.ClientID = config.ClientID @@ -236,7 +236,7 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) { c.Version = version } - if err := kafka.ConfigureAuthentication(config.Authentication, c); err != nil { + if err := kafka.ConfigureAuthentication(ctx, config.Authentication, c); err != nil { return nil, err } diff --git a/internal/kafka/authentication.go b/internal/kafka/authentication.go index 491f02985a65..ca6ce4c3519e 100644 --- a/internal/kafka/authentication.go +++ b/internal/kafka/authentication.go @@ -7,9 +7,11 @@ import ( "context" "crypto/sha256" "crypto/sha512" + "crypto/tls" "fmt" "github.com/IBM/sarama" + "github.com/aws/aws-msk-iam-sasl-signer-go/signer" "go.opentelemetry.io/collector/config/configtls" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/awsmsk" @@ -35,7 +37,7 @@ type SASLConfig struct { Username string `mapstructure:"username"` // Password to be used on authentication Password string `mapstructure:"password"` - // SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM, SCRAM-SHA-256 or SCRAM-SHA-512). + // SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM, AWS_MSK_IAM_OAUTHBEARER, SCRAM-SHA-256 or SCRAM-SHA-512). Mechanism string `mapstructure:"mechanism"` // SASL Protocol Version to be used, possible values are: (0, 1). Defaults to 0. Version int `mapstructure:"version"` @@ -44,12 +46,21 @@ type SASLConfig struct { } // AWSMSKConfig defines the additional SASL authentication -// measures needed to use AWS_MSK_IAM mechanism +// measures needed to use AWS_MSK_IAM and AWS_MSK_IAM_OAUTHBEARER mechanism type AWSMSKConfig struct { // Region is the AWS region the MSK cluster is based in Region string `mapstructure:"region"` // BrokerAddr is the client is connecting to in order to perform the auth required BrokerAddr string `mapstructure:"broker_addr"` + // Context + ctx context.Context +} + +// Token return the AWS session token for the AWS_MSK_IAM_OAUTHBEARER mechanism +func (c *AWSMSKConfig) Token() (*sarama.AccessToken, error) { + token, _, err := signer.GenerateAuthToken(c.ctx, c.Region) + + return &sarama.AccessToken{Token: token}, err } // KerberosConfig defines kerberos configuration. @@ -65,7 +76,7 @@ type KerberosConfig struct { } // ConfigureAuthentication configures authentication in sarama.Config. -func ConfigureAuthentication(config Authentication, saramaConfig *sarama.Config) error { +func ConfigureAuthentication(ctx context.Context, config Authentication, saramaConfig *sarama.Config) error { if config.PlainText != nil { configurePlaintext(*config.PlainText, saramaConfig) } @@ -75,7 +86,7 @@ func ConfigureAuthentication(config Authentication, saramaConfig *sarama.Config) } } if config.SASL != nil { - if err := configureSASL(*config.SASL, saramaConfig); err != nil { + if err := configureSASL(ctx, *config.SASL, saramaConfig); err != nil { return err } } @@ -92,12 +103,12 @@ func configurePlaintext(config PlainTextConfig, saramaConfig *sarama.Config) { saramaConfig.Net.SASL.Password = config.Password } -func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error { - if config.Username == "" { +func configureSASL(ctx context.Context, config SASLConfig, saramaConfig *sarama.Config) error { + if config.Username == "" && config.Mechanism != "AWS_MSK_IAM_OAUTHBEARER" { return fmt.Errorf("username have to be provided") } - if config.Password == "" { + if config.Password == "" && config.Mechanism != "AWS_MSK_IAM_OAUTHBEARER" { return fmt.Errorf("password have to be provided") } @@ -119,8 +130,15 @@ func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error { return awsmsk.NewIAMSASLClient(config.AWSMSK.BrokerAddr, config.AWSMSK.Region, saramaConfig.ClientID) } saramaConfig.Net.SASL.Mechanism = awsmsk.Mechanism + case "AWS_MSK_IAM_OAUTHBEARER": + config.AWSMSK.ctx = ctx + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth + saramaConfig.Net.SASL.TokenProvider = &config.AWSMSK + tlsConfig := tls.Config{} + saramaConfig.Net.TLS.Enable = true + saramaConfig.Net.TLS.Config = &tlsConfig default: - return fmt.Errorf(`invalid SASL Mechanism %q: can be either "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256" or "SCRAM-SHA-512"`, config.Mechanism) + return fmt.Errorf(`invalid SASL Mechanism %q: can be either "PLAIN", "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER", "SCRAM-SHA-256" or "SCRAM-SHA-512"`, config.Mechanism) } switch config.Version { diff --git a/internal/kafka/authentication_test.go b/internal/kafka/authentication_test.go index 1f797e3019b7..47cbc0989684 100644 --- a/internal/kafka/authentication_test.go +++ b/internal/kafka/authentication_test.go @@ -5,6 +5,7 @@ package kafka import ( "context" + "crypto/tls" "testing" "github.com/IBM/sarama" @@ -51,6 +52,16 @@ func TestAuthentication(t *testing.T) { require.NoError(t, err) saramaTLSCfg.Net.TLS.Config = tlscfg + ctx := context.Background() + saramaSASLAWSIAMOATUHConfig := &sarama.Config{} + saramaSASLAWSIAMOATUHConfig.Net.SASL.Enable = true + saramaSASLAWSIAMOATUHConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth + saramaSASLAWSIAMOATUHConfig.Net.SASL.TokenProvider = &AWSMSKConfig{Region: "region", ctx: ctx} + + tlsConfig := tls.Config{} + saramaSASLAWSIAMOATUHConfig.Net.TLS.Enable = true + saramaSASLAWSIAMOATUHConfig.Net.TLS.Config = &tlsConfig + saramaKerberosCfg := &sarama.Config{} saramaKerberosCfg.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI saramaKerberosCfg.Net.SASL.Enable = true @@ -129,6 +140,10 @@ func TestAuthentication(t *testing.T) { auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "PLAIN"}}, saramaConfig: saramaSASLPLAINConfig, }, + { + auth: Authentication{SASL: &SASLConfig{Username: "", Password: "", Mechanism: "AWS_MSK_IAM_OAUTHBEARER", AWSMSK: AWSMSKConfig{Region: "region"}}}, + saramaConfig: saramaSASLAWSIAMOATUHConfig, + }, { auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-222"}}, saramaConfig: saramaSASLSCRAM512Config, @@ -153,7 +168,7 @@ func TestAuthentication(t *testing.T) { for _, test := range tests { t.Run("", func(t *testing.T) { config := &sarama.Config{} - err := ConfigureAuthentication(test.auth, config) + err := ConfigureAuthentication(context.Background(), test.auth, config) if test.err != "" { assert.ErrorContains(t, err, test.err) } else { diff --git a/internal/kafka/go.mod b/internal/kafka/go.mod index 971d4d75ec98..0fe9e25f0c89 100644 --- a/internal/kafka/go.mod +++ b/internal/kafka/go.mod @@ -4,6 +4,7 @@ go 1.22.0 require ( github.com/IBM/sarama v1.43.3 + github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 github.com/aws/aws-sdk-go v1.55.5 github.com/stretchr/testify v1.10.0 github.com/xdg-go/scram v1.1.2 @@ -13,6 +14,18 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect + github.com/aws/aws-sdk-go-v2/config v1.18.28 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect + github.com/aws/smithy-go v1.13.5 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect diff --git a/internal/kafka/go.sum b/internal/kafka/go.sum index 4f90b773c18a..cd602a5daf8e 100644 --- a/internal/kafka/go.sum +++ b/internal/kafka/go.sum @@ -1,7 +1,33 @@ github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 h1:UyjtGmO0Uwl/K+zpzPwLoXzMhcN9xmnR2nrqJoBrg3c= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0/go.mod h1:TJAXuFs2HcMib3sN5L0gUC+Q01Qvy3DemvA55WuC+iA= github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/aws/aws-sdk-go-v2 v1.19.0 h1:klAT+y3pGFBU/qVf1uzwttpBbiuozJYWzNLHioyDJ+k= +github.com/aws/aws-sdk-go-v2 v1.19.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2/config v1.18.28 h1:TINEaKyh1Td64tqFvn09iYpKiWjmHYrG1fa91q2gnqw= +github.com/aws/aws-sdk-go-v2/config v1.18.28/go.mod h1:nIL+4/8JdAuNHEjn/gPEXqtnS02Q3NXB/9Z7o5xE4+A= +github.com/aws/aws-sdk-go-v2/credentials v1.13.27 h1:dz0yr/yR1jweAnsCx+BmjerUILVPQ6FS5AwF/OyG1kA= +github.com/aws/aws-sdk-go-v2/credentials v1.13.27/go.mod h1:syOqAek45ZXZp29HlnRS/BNgMIW6uiRmeuQsz4Qh2UE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 h1:kP3Me6Fy3vdi+9uHd7YLr6ewPxRL+PU6y15urfTaamU= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5/go.mod h1:Gj7tm95r+QsDoN2Fhuz/3npQvcZbkEf5mL70n3Xfluc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 h1:hMUCiE3Zi5AHrRNGf5j985u0WyqI6r2NULhUfo0N/No= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35/go.mod h1:ipR5PvpSPqIqL5Mi82BxLnfMkHVbmco8kUwO2xrCi0M= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 h1:yOpYx+FTBdpk/g+sBU6Cb1H0U/TLEcYYp66mYqsPpcc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29/go.mod h1:M/eUABlDbw2uVrdAn+UsI6M727qp2fxkp8K0ejcBDUY= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 h1:8r5m1BoAWkn0TDC34lUculryf7nUF25EgIMdjvGCkgo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36/go.mod h1:Rmw2M1hMVTwiUhjwMoIBFWFJMhvJbct06sSidxInkhY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 h1:IiDolu/eLmuB18DRZibj77n1hHQT7z12jnGO7Ze3pLc= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29/go.mod h1:fDbkK4o7fpPXWn8YAPmTieAMuB9mk/VgvW64uaUqxd4= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 h1:sWDv7cMITPcZ21QdreULwxOOAmE05JjEsT6fCDtDA9k= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.13/go.mod h1:DfX0sWuT46KpcqbMhJ9QWtxAIP1VozkDWf8VAkByjYY= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 h1:BFubHS/xN5bjl818QaroN6mQdjneYQ+AOx44KNXlyH4= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13/go.mod h1:BzqsVVFduubEmzrVtUFQQIQdFqvUItF8XUq2EnS8Wog= +github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 h1:e5mnydVdCVWxP+5rPAGi2PYxC7u2OZgH1ypC114H04U= +github.com/aws/aws-sdk-go-v2/service/sts v1.19.3/go.mod h1:yVGZA1CPkmUhBdA039jXNJJG7/6t+G+EBWmFq23xqnY= +github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= +github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -18,6 +44,8 @@ github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/ github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= diff --git a/receiver/kafkametricsreceiver/go.mod b/receiver/kafkametricsreceiver/go.mod index 15af5af066ab..b15de7f22f18 100644 --- a/receiver/kafkametricsreceiver/go.mod +++ b/receiver/kafkametricsreceiver/go.mod @@ -24,7 +24,20 @@ require ( ) require ( + github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect github.com/aws/aws-sdk-go v1.55.5 // indirect + github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect + github.com/aws/aws-sdk-go-v2/config v1.18.28 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect + github.com/aws/smithy-go v1.13.5 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect diff --git a/receiver/kafkametricsreceiver/go.sum b/receiver/kafkametricsreceiver/go.sum index c3a9d94a07fc..68d155a459f3 100644 --- a/receiver/kafkametricsreceiver/go.sum +++ b/receiver/kafkametricsreceiver/go.sum @@ -1,7 +1,33 @@ github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 h1:UyjtGmO0Uwl/K+zpzPwLoXzMhcN9xmnR2nrqJoBrg3c= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0/go.mod h1:TJAXuFs2HcMib3sN5L0gUC+Q01Qvy3DemvA55WuC+iA= github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/aws/aws-sdk-go-v2 v1.19.0 h1:klAT+y3pGFBU/qVf1uzwttpBbiuozJYWzNLHioyDJ+k= +github.com/aws/aws-sdk-go-v2 v1.19.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2/config v1.18.28 h1:TINEaKyh1Td64tqFvn09iYpKiWjmHYrG1fa91q2gnqw= +github.com/aws/aws-sdk-go-v2/config v1.18.28/go.mod h1:nIL+4/8JdAuNHEjn/gPEXqtnS02Q3NXB/9Z7o5xE4+A= +github.com/aws/aws-sdk-go-v2/credentials v1.13.27 h1:dz0yr/yR1jweAnsCx+BmjerUILVPQ6FS5AwF/OyG1kA= +github.com/aws/aws-sdk-go-v2/credentials v1.13.27/go.mod h1:syOqAek45ZXZp29HlnRS/BNgMIW6uiRmeuQsz4Qh2UE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 h1:kP3Me6Fy3vdi+9uHd7YLr6ewPxRL+PU6y15urfTaamU= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5/go.mod h1:Gj7tm95r+QsDoN2Fhuz/3npQvcZbkEf5mL70n3Xfluc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 h1:hMUCiE3Zi5AHrRNGf5j985u0WyqI6r2NULhUfo0N/No= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35/go.mod h1:ipR5PvpSPqIqL5Mi82BxLnfMkHVbmco8kUwO2xrCi0M= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 h1:yOpYx+FTBdpk/g+sBU6Cb1H0U/TLEcYYp66mYqsPpcc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29/go.mod h1:M/eUABlDbw2uVrdAn+UsI6M727qp2fxkp8K0ejcBDUY= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 h1:8r5m1BoAWkn0TDC34lUculryf7nUF25EgIMdjvGCkgo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36/go.mod h1:Rmw2M1hMVTwiUhjwMoIBFWFJMhvJbct06sSidxInkhY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 h1:IiDolu/eLmuB18DRZibj77n1hHQT7z12jnGO7Ze3pLc= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29/go.mod h1:fDbkK4o7fpPXWn8YAPmTieAMuB9mk/VgvW64uaUqxd4= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 h1:sWDv7cMITPcZ21QdreULwxOOAmE05JjEsT6fCDtDA9k= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.13/go.mod h1:DfX0sWuT46KpcqbMhJ9QWtxAIP1VozkDWf8VAkByjYY= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 h1:BFubHS/xN5bjl818QaroN6mQdjneYQ+AOx44KNXlyH4= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13/go.mod h1:BzqsVVFduubEmzrVtUFQQIQdFqvUItF8XUq2EnS8Wog= +github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 h1:e5mnydVdCVWxP+5rPAGi2PYxC7u2OZgH1ypC114H04U= +github.com/aws/aws-sdk-go-v2/service/sts v1.19.3/go.mod h1:yVGZA1CPkmUhBdA039jXNJJG7/6t+G+EBWmFq23xqnY= +github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= +github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -27,6 +53,7 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= diff --git a/receiver/kafkametricsreceiver/receiver.go b/receiver/kafkametricsreceiver/receiver.go index a2f79f6231e1..0032c762591a 100644 --- a/receiver/kafkametricsreceiver/receiver.go +++ b/receiver/kafkametricsreceiver/receiver.go @@ -49,7 +49,7 @@ var newMetricsReceiver = func( } sc.Version = version } - if err := kafka.ConfigureAuthentication(config.Authentication, sc); err != nil { + if err := kafka.ConfigureAuthentication(ctx, config.Authentication, sc); err != nil { return nil, err } scraperControllerOptions := make([]scraperhelper.ScraperControllerOption, 0, len(config.Scrapers)) diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index c136e6279c20..b7b25c4eadb6 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -56,8 +56,8 @@ The following settings can be optionally configured: - `sasl` - `username`: The username to use. - `password`: The password to use - - `mechanism`: The sasl mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM or PLAIN) - - `aws_msk.region`: AWS Region in case of AWS_MSK_IAM mechanism + - `mechanism`: The sasl mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM, AWS_MSK_IAM_OAUTHBEARER or PLAIN) + - `aws_msk.region`: AWS Region in case of AWS_MSK_IAM or AWS_MSK_IAM_OAUTHBEARER mechanism - `aws_msk.broker_addr`: MSK Broker address in case of AWS_MSK_IAM mechanism - `tls` - `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod index 914695e631b2..f50bcac35bce 100644 --- a/receiver/kafkareceiver/go.mod +++ b/receiver/kafkareceiver/go.mod @@ -37,7 +37,20 @@ require ( ) require ( + github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect github.com/aws/aws-sdk-go v1.55.5 // indirect + github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect + github.com/aws/aws-sdk-go-v2/config v1.18.28 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect + github.com/aws/smithy-go v1.13.5 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect diff --git a/receiver/kafkareceiver/go.sum b/receiver/kafkareceiver/go.sum index 3ec03355f910..d1893738a734 100644 --- a/receiver/kafkareceiver/go.sum +++ b/receiver/kafkareceiver/go.sum @@ -2,8 +2,34 @@ github.com/IBM/sarama v1.43.3 h1:Yj6L2IaNvb2mRBop39N7mmJAHBVY3dTPncr3qGVkxPA= github.com/IBM/sarama v1.43.3/go.mod h1:FVIRaLrhK3Cla/9FfRF5X9Zua2KpS3SYIXxhac1H+FQ= github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE= github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 h1:UyjtGmO0Uwl/K+zpzPwLoXzMhcN9xmnR2nrqJoBrg3c= +github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0/go.mod h1:TJAXuFs2HcMib3sN5L0gUC+Q01Qvy3DemvA55WuC+iA= github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/aws/aws-sdk-go-v2 v1.19.0 h1:klAT+y3pGFBU/qVf1uzwttpBbiuozJYWzNLHioyDJ+k= +github.com/aws/aws-sdk-go-v2 v1.19.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= +github.com/aws/aws-sdk-go-v2/config v1.18.28 h1:TINEaKyh1Td64tqFvn09iYpKiWjmHYrG1fa91q2gnqw= +github.com/aws/aws-sdk-go-v2/config v1.18.28/go.mod h1:nIL+4/8JdAuNHEjn/gPEXqtnS02Q3NXB/9Z7o5xE4+A= +github.com/aws/aws-sdk-go-v2/credentials v1.13.27 h1:dz0yr/yR1jweAnsCx+BmjerUILVPQ6FS5AwF/OyG1kA= +github.com/aws/aws-sdk-go-v2/credentials v1.13.27/go.mod h1:syOqAek45ZXZp29HlnRS/BNgMIW6uiRmeuQsz4Qh2UE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 h1:kP3Me6Fy3vdi+9uHd7YLr6ewPxRL+PU6y15urfTaamU= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5/go.mod h1:Gj7tm95r+QsDoN2Fhuz/3npQvcZbkEf5mL70n3Xfluc= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 h1:hMUCiE3Zi5AHrRNGf5j985u0WyqI6r2NULhUfo0N/No= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35/go.mod h1:ipR5PvpSPqIqL5Mi82BxLnfMkHVbmco8kUwO2xrCi0M= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 h1:yOpYx+FTBdpk/g+sBU6Cb1H0U/TLEcYYp66mYqsPpcc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29/go.mod h1:M/eUABlDbw2uVrdAn+UsI6M727qp2fxkp8K0ejcBDUY= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 h1:8r5m1BoAWkn0TDC34lUculryf7nUF25EgIMdjvGCkgo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36/go.mod h1:Rmw2M1hMVTwiUhjwMoIBFWFJMhvJbct06sSidxInkhY= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 h1:IiDolu/eLmuB18DRZibj77n1hHQT7z12jnGO7Ze3pLc= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29/go.mod h1:fDbkK4o7fpPXWn8YAPmTieAMuB9mk/VgvW64uaUqxd4= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 h1:sWDv7cMITPcZ21QdreULwxOOAmE05JjEsT6fCDtDA9k= +github.com/aws/aws-sdk-go-v2/service/sso v1.12.13/go.mod h1:DfX0sWuT46KpcqbMhJ9QWtxAIP1VozkDWf8VAkByjYY= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 h1:BFubHS/xN5bjl818QaroN6mQdjneYQ+AOx44KNXlyH4= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13/go.mod h1:BzqsVVFduubEmzrVtUFQQIQdFqvUItF8XUq2EnS8Wog= +github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 h1:e5mnydVdCVWxP+5rPAGi2PYxC7u2OZgH1ypC114H04U= +github.com/aws/aws-sdk-go-v2/service/sts v1.19.3/go.mod h1:yVGZA1CPkmUhBdA039jXNJJG7/6t+G+EBWmFq23xqnY= +github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= +github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -33,6 +59,7 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 6d2140b39aae..1ec2d5aca6e9 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -130,7 +130,7 @@ func newTracesReceiver(config Config, set receiver.Settings, nextConsumer consum }, nil } -func createKafkaClient(config Config) (sarama.ConsumerGroup, error) { +func createKafkaClient(ctx context.Context, config Config) (sarama.ConsumerGroup, error) { saramaConfig := sarama.NewConfig() saramaConfig.ClientID = config.ClientID saramaConfig.Metadata.Full = config.Metadata.Full @@ -156,7 +156,7 @@ func createKafkaClient(config Config) (sarama.ConsumerGroup, error) { return nil, err } } - if err := kafka.ConfigureAuthentication(config.Authentication, saramaConfig); err != nil { + if err := kafka.ConfigureAuthentication(ctx, config.Authentication, saramaConfig); err != nil { return nil, err } return sarama.NewConsumerGroup(config.Brokers, config.GroupID, saramaConfig) @@ -191,7 +191,7 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro } // consumerGroup may be set in tests to inject fake implementation. if c.consumerGroup == nil { - if c.consumerGroup, err = createKafkaClient(c.config); err != nil { + if c.consumerGroup, err = createKafkaClient(ctx, c.config); err != nil { return err } } @@ -299,7 +299,7 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err } // consumerGroup may be set in tests to inject fake implementation. if c.consumerGroup == nil { - if c.consumerGroup, err = createKafkaClient(c.config); err != nil { + if c.consumerGroup, err = createKafkaClient(ctx, c.config); err != nil { return err } } @@ -410,7 +410,7 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error } // consumerGroup may be set in tests to inject fake implementation. if c.consumerGroup == nil { - if c.consumerGroup, err = createKafkaClient(c.config); err != nil { + if c.consumerGroup, err = createKafkaClient(ctx, c.config); err != nil { return err } }