Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kafka/internal, kafkaexporter, kafkareceiver] Add SASL mechanism "AWS_MSK_IAM_OAUTHBEARER" to kafkaexporter #32500

Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
c14de4e
Add SASL mechanism to kafkaexporter: "AWS_MSK_IAM_OAUTHBEARER"
donald-cheung Apr 18, 2024
e9ecbc1
Merge branch 'main' into kafka-exporter-aws-iam-oauth-bearer
donald-cheung May 7, 2024
7955f9d
Merge branch 'main' into kafka-exporter-aws-iam-oauth-bearer
donald-cheung Jul 23, 2024
b4c0c73
Run `make checks`
donald-cheung Jul 23, 2024
f6148a6
Run make checks again
donald-cheung Jul 23, 2024
bad9c2e
Merge branch 'main' into kafka-exporter-aws-iam-oauth-bearer
donald-cheung Aug 6, 2024
c25c058
fix chloggen error
donald-cheung Aug 6, 2024
e0f68d7
Revert the toolchain to 1.21.12
donald-cheung Aug 6, 2024
e934bc0
Merge branch 'main' into kafka-exporter-aws-iam-oauth-bearer
donald-cheung Aug 7, 2024
596c814
Merge branch 'main' into kafka-exporter-aws-iam-oauth-bearer
donald-cheung Aug 21, 2024
897d705
Update changelog
donald-cheung Aug 23, 2024
ddda641
update changelog
donald-cheung Aug 23, 2024
1c502fd
Pass context from parents
donald-cheung Sep 5, 2024
a35c776
fix unit test of authentication.go
donald-cheung Sep 6, 2024
fd12dd2
fix context
donald-cheung Sep 6, 2024
b5ffb5a
Merge branch 'main' into kafka-exporter-aws-iam-oauth-bearer
donald-cheung Sep 13, 2024
fa288ba
Merge branch 'main' into kafka-exporter-aws-iam-oauth-bearer
donald-cheung Sep 23, 2024
2c803f1
Merge branch 'main' into kafka-exporter-aws-iam-oauth-bearer
donald-cheung Sep 25, 2024
154dfe8
Merge branch 'main' into kafka-exporter-aws-iam-oauth-bearer
donald-cheung Oct 7, 2024
daf479b
Merge branch 'main' into kafka-exporter-aws-iam-oauth-bearer
donald-cheung Oct 9, 2024
f3c9057
Merge branch 'main' into kafka-exporter-aws-iam-oauth-bearer
donald-cheung Nov 13, 2024
15bd9f9
Merge branch 'main' into kafka-exporter-aws-iam-oauth-bearer
MovieStoreGuy Dec 3, 2024
f3f2c39
remove extra newline
donald-cheung Dec 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/kafka-exporter-aws-iam-oauth-bearer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter, kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add a new mechanism "AWS_MSK_IAM_OAUTHBEARER" for kafka exporter and kafka receiver. This mechanism use the AWS MSK IAM SASL Signer for Go https://github.com/aws/aws-msk-iam-sasl-signer-go.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [19747]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
4 changes: 2 additions & 2 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ The following settings can be optionally configured:
- `sasl`
- `username`: The username to use.
- `password`: The password to use
- `mechanism`: The SASL mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM or PLAIN)
- `mechanism`: The SASL mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM, AWS_MSK_IAM_OAUTHBEARER or PLAIN)
- `version` (default = 0): The SASL protocol version to use (0 or 1)
- `aws_msk.region`: AWS Region in case of AWS_MSK_IAM mechanism
- `aws_msk.region`: AWS Region in case of AWS_MSK_IAM or AWS_MSK_IAM_OAUTHBEARER mechanism
- `aws_msk.broker_addr`: MSK Broker address in case of AWS_MSK_IAM mechanism
- `tls`: see [TLS Configuration Settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/configtls/README.md) for the full set of available options.
- `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should
Expand Down
4 changes: 2 additions & 2 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ func validateSASLConfig(c *kafka.SASLConfig) error {
}

switch c.Mechanism {
case "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256", "SCRAM-SHA-512":
case "PLAIN", "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER", "SCRAM-SHA-256", "SCRAM-SHA-512":
// Do nothing, valid mechanism
default:
return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism)
return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism)
}

if c.Version < 0 || c.Version > 1 {
Expand Down
2 changes: 1 addition & 1 deletion exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func TestValidate_sasl_mechanism(t *testing.T) {
}

err := config.Validate()
assert.EqualError(t, err, "auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value FAKE")
assert.EqualError(t, err, "auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value FAKE")
}

func TestValidate_sasl_version(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions exporter/kafkaexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,20 @@ require (

require (
github.com/apache/thrift v0.21.0 // indirect
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.55.5 // indirect
github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.28 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
Expand Down
27 changes: 27 additions & 0 deletions exporter/kafkaexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (e *kafkaTracesProducer) Close(context.Context) error {
return e.producer.Close()
}

func (e *kafkaTracesProducer) start(_ context.Context, host component.Host) error {
func (e *kafkaTracesProducer) start(ctx context.Context, host component.Host) error {
// extensions take precedence over internal encodings
if marshaler, errExt := loadEncodingExtension[ptrace.Marshaler](
host,
Expand All @@ -83,7 +83,7 @@ func (e *kafkaTracesProducer) start(_ context.Context, host component.Host) erro
if e.marshaler == nil {
return errUnrecognizedEncoding
}
producer, err := newSaramaProducer(e.cfg)
producer, err := newSaramaProducer(ctx, e.cfg)
if err != nil {
return err
}
Expand Down Expand Up @@ -124,7 +124,7 @@ func (e *kafkaMetricsProducer) Close(context.Context) error {
return e.producer.Close()
}

func (e *kafkaMetricsProducer) start(_ context.Context, host component.Host) error {
func (e *kafkaMetricsProducer) start(ctx context.Context, host component.Host) error {
// extensions take precedence over internal encodings
if marshaler, errExt := loadEncodingExtension[pmetric.Marshaler](
host,
Expand All @@ -141,7 +141,7 @@ func (e *kafkaMetricsProducer) start(_ context.Context, host component.Host) err
if e.marshaler == nil {
return errUnrecognizedEncoding
}
producer, err := newSaramaProducer(e.cfg)
producer, err := newSaramaProducer(ctx, e.cfg)
if err != nil {
return err
}
Expand Down Expand Up @@ -182,7 +182,7 @@ func (e *kafkaLogsProducer) Close(context.Context) error {
return e.producer.Close()
}

func (e *kafkaLogsProducer) start(_ context.Context, host component.Host) error {
func (e *kafkaLogsProducer) start(ctx context.Context, host component.Host) error {
// extensions take precedence over internal encodings
if marshaler, errExt := loadEncodingExtension[plog.Marshaler](
host,
Expand All @@ -199,15 +199,15 @@ func (e *kafkaLogsProducer) start(_ context.Context, host component.Host) error
if e.marshaler == nil {
return errUnrecognizedEncoding
}
producer, err := newSaramaProducer(e.cfg)
producer, err := newSaramaProducer(ctx, e.cfg)
if err != nil {
return err
}
e.producer = producer
return nil
}

func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
func newSaramaProducer(ctx context.Context, config Config) (sarama.SyncProducer, error) {
c := sarama.NewConfig()

c.ClientID = config.ClientID
Expand Down Expand Up @@ -236,7 +236,7 @@ func newSaramaProducer(config Config) (sarama.SyncProducer, error) {
c.Version = version
}

if err := kafka.ConfigureAuthentication(config.Authentication, c); err != nil {
if err := kafka.ConfigureAuthentication(ctx, config.Authentication, c); err != nil {
return nil, err
}

Expand Down
35 changes: 27 additions & 8 deletions internal/kafka/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"context"
"crypto/sha256"
"crypto/sha512"
"crypto/tls"
"fmt"

"github.com/IBM/sarama"
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
"go.opentelemetry.io/collector/config/configtls"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/awsmsk"
Expand All @@ -35,7 +37,7 @@ type SASLConfig struct {
Username string `mapstructure:"username"`
// Password to be used on authentication
Password string `mapstructure:"password"`
// SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM, SCRAM-SHA-256 or SCRAM-SHA-512).
// SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM, AWS_MSK_IAM_OAUTHBEARER, SCRAM-SHA-256 or SCRAM-SHA-512).
Mechanism string `mapstructure:"mechanism"`
// SASL Protocol Version to be used, possible values are: (0, 1). Defaults to 0.
Version int `mapstructure:"version"`
Expand All @@ -44,12 +46,21 @@ type SASLConfig struct {
}

// AWSMSKConfig defines the additional SASL authentication
// measures needed to use AWS_MSK_IAM mechanism
// measures needed to use AWS_MSK_IAM and AWS_MSK_IAM_OAUTHBEARER mechanism
type AWSMSKConfig struct {
// Region is the AWS region the MSK cluster is based in
Region string `mapstructure:"region"`
// BrokerAddr is the client is connecting to in order to perform the auth required
BrokerAddr string `mapstructure:"broker_addr"`
// Context
ctx context.Context
}

// Token return the AWS session token for the AWS_MSK_IAM_OAUTHBEARER mechanism
func (c *AWSMSKConfig) Token() (*sarama.AccessToken, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please pass in the context to avoid using context.TODO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have changed the code to pass the context from parent's function. PTAL. Thank you.

token, _, err := signer.GenerateAuthToken(c.ctx, c.Region)

return &sarama.AccessToken{Token: token}, err
}

// KerberosConfig defines kerberos configuration.
Expand All @@ -65,7 +76,7 @@ type KerberosConfig struct {
}

// ConfigureAuthentication configures authentication in sarama.Config.
func ConfigureAuthentication(config Authentication, saramaConfig *sarama.Config) error {
func ConfigureAuthentication(ctx context.Context, config Authentication, saramaConfig *sarama.Config) error {
if config.PlainText != nil {
configurePlaintext(*config.PlainText, saramaConfig)
}
Expand All @@ -75,7 +86,7 @@ func ConfigureAuthentication(config Authentication, saramaConfig *sarama.Config)
}
}
if config.SASL != nil {
if err := configureSASL(*config.SASL, saramaConfig); err != nil {
if err := configureSASL(ctx, *config.SASL, saramaConfig); err != nil {
return err
}
}
Expand All @@ -92,12 +103,13 @@ func configurePlaintext(config PlainTextConfig, saramaConfig *sarama.Config) {
saramaConfig.Net.SASL.Password = config.Password
}

func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error {
if config.Username == "" {
func configureSASL(ctx context.Context, config SASLConfig, saramaConfig *sarama.Config) error {

if config.Username == "" && config.Mechanism != "AWS_MSK_IAM_OAUTHBEARER" {
return fmt.Errorf("username have to be provided")
}

if config.Password == "" {
if config.Password == "" && config.Mechanism != "AWS_MSK_IAM_OAUTHBEARER" {
return fmt.Errorf("password have to be provided")
}

Expand All @@ -119,8 +131,15 @@ func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error {
return awsmsk.NewIAMSASLClient(config.AWSMSK.BrokerAddr, config.AWSMSK.Region, saramaConfig.ClientID)
}
saramaConfig.Net.SASL.Mechanism = awsmsk.Mechanism
case "AWS_MSK_IAM_OAUTHBEARER":
config.AWSMSK.ctx = ctx
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
saramaConfig.Net.SASL.TokenProvider = &config.AWSMSK
tlsConfig := tls.Config{}
saramaConfig.Net.TLS.Enable = true
saramaConfig.Net.TLS.Config = &tlsConfig
default:
return fmt.Errorf(`invalid SASL Mechanism %q: can be either "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256" or "SCRAM-SHA-512"`, config.Mechanism)
return fmt.Errorf(`invalid SASL Mechanism %q: can be either "PLAIN", "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER", "SCRAM-SHA-256" or "SCRAM-SHA-512"`, config.Mechanism)
}

switch config.Version {
Expand Down
17 changes: 16 additions & 1 deletion internal/kafka/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package kafka

import (
"context"
"crypto/tls"
"testing"

"github.com/IBM/sarama"
Expand Down Expand Up @@ -51,6 +52,16 @@ func TestAuthentication(t *testing.T) {
require.NoError(t, err)
saramaTLSCfg.Net.TLS.Config = tlscfg

ctx := context.Background()
saramaSASLAWSIAMOATUHConfig := &sarama.Config{}
saramaSASLAWSIAMOATUHConfig.Net.SASL.Enable = true
saramaSASLAWSIAMOATUHConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
saramaSASLAWSIAMOATUHConfig.Net.SASL.TokenProvider = &AWSMSKConfig{Region: "region", ctx: ctx}

tlsConfig := tls.Config{}
saramaSASLAWSIAMOATUHConfig.Net.TLS.Enable = true
saramaSASLAWSIAMOATUHConfig.Net.TLS.Config = &tlsConfig

saramaKerberosCfg := &sarama.Config{}
saramaKerberosCfg.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
saramaKerberosCfg.Net.SASL.Enable = true
Expand Down Expand Up @@ -129,6 +140,10 @@ func TestAuthentication(t *testing.T) {
auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "PLAIN"}},
saramaConfig: saramaSASLPLAINConfig,
},
{
auth: Authentication{SASL: &SASLConfig{Username: "", Password: "", Mechanism: "AWS_MSK_IAM_OAUTHBEARER", AWSMSK: AWSMSKConfig{Region: "region"}}},
saramaConfig: saramaSASLAWSIAMOATUHConfig,
},
{
auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-222"}},
saramaConfig: saramaSASLSCRAM512Config,
Expand All @@ -153,7 +168,7 @@ func TestAuthentication(t *testing.T) {
for _, test := range tests {
t.Run("", func(t *testing.T) {
config := &sarama.Config{}
err := ConfigureAuthentication(test.auth, config)
err := ConfigureAuthentication(context.Background(), test.auth, config)
if test.err != "" {
assert.ErrorContains(t, err, test.err)
} else {
Expand Down
13 changes: 13 additions & 0 deletions internal/kafka/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.22.0

require (
github.com/IBM/sarama v1.43.3
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0
github.com/aws/aws-sdk-go v1.55.5
github.com/stretchr/testify v1.10.0
github.com/xdg-go/scram v1.1.2
Expand All @@ -13,6 +14,18 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.28 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
Expand Down
Loading
Loading