Skip to content

Commit

Permalink
Add SASL mechanism to kafkaexporter: "AWS_MSK_IAM_OAUTHBEARER"
Browse files Browse the repository at this point in the history
  • Loading branch information
donald-cheung committed Apr 18, 2024
1 parent 11d2988 commit c14de4e
Show file tree
Hide file tree
Showing 21 changed files with 238 additions and 12 deletions.
27 changes: 27 additions & 0 deletions .chloggen/kafka-exporter-aws-iam-oauth-bearer.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkaexporter, internal/kafka

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add a new mechanism "AWS_MSK_IAM_OAUTHBEARER" for kafka exporter and kafka receiver. This mechanism use the AWS MSK IAM SASL Signer for Go https://github.com/aws/aws-msk-iam-sasl-signer-go.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/19747]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
1 change: 1 addition & 0 deletions cmd/configschema/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ require (
github.com/apache/thrift v0.20.0 // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.51.22 // indirect
github.com/aws/aws-sdk-go-v2 v1.26.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions cmd/configschema/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/otelcontribcol/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ require (
github.com/apache/thrift v0.20.0 // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.51.22 // indirect
github.com/aws/aws-sdk-go-v2 v1.26.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ The following settings can be optionally configured:
- `sasl`
- `username`: The username to use.
- `password`: The password to use
- `mechanism`: The SASL mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM or PLAIN)
- `mechanism`: The SASL mechanism to use (SCRAM-SHA-256, SCRAM-SHA-512, AWS_MSK_IAM, AWS_MSK_IAM_OAUTHBEARER or PLAIN)
- `version` (default = 0): The SASL protocol version to use (0 or 1)
- `aws_msk.region`: AWS Region in case of AWS_MSK_IAM mechanism
- `aws_msk.region`: AWS Region in case of AWS_MSK_IAM or AWS_MSK_IAM_OAUTHBEARER mechanism
- `aws_msk.broker_addr`: MSK Broker address in case of AWS_MSK_IAM mechanism
- `tls`
- `ca_file`: path to the CA cert. For a client this verifies the server certificate. Should
Expand Down
4 changes: 2 additions & 2 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,10 @@ func validateSASLConfig(c *kafka.SASLConfig) error {
}

switch c.Mechanism {
case "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256", "SCRAM-SHA-512":
case "PLAIN", "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER", "SCRAM-SHA-256", "SCRAM-SHA-512":
// Do nothing, valid mechanism
default:
return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism)
return fmt.Errorf("auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value %v", c.Mechanism)
}

if c.Version < 0 || c.Version > 1 {
Expand Down
2 changes: 1 addition & 1 deletion exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func TestValidate_sasl_mechanism(t *testing.T) {
}

err := config.Validate()
assert.EqualError(t, err, "auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value FAKE")
assert.EqualError(t, err, "auth.sasl.mechanism should be one of 'PLAIN', 'AWS_MSK_IAM', 'AWS_MSK_IAM_OAUTHBEARER', 'SCRAM-SHA-256' or 'SCRAM-SHA-512'. configured value FAKE")
}

func TestValidate_sasl_version(t *testing.T) {
Expand Down
13 changes: 13 additions & 0 deletions exporter/kafkaexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,20 @@ require (

require (
github.com/apache/thrift v0.19.0 // indirect
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.51.22 // indirect
github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.28 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
Expand Down
27 changes: 27 additions & 0 deletions exporter/kafkaexporter/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ require (
github.com/apache/thrift v0.20.0 // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.51.22 // indirect
github.com/aws/aws-sdk-go-v2 v1.26.1 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 20 additions & 5 deletions internal/kafka/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"context"
"crypto/sha256"
"crypto/sha512"
"crypto/tls"
"fmt"

"github.com/IBM/sarama"
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
"go.opentelemetry.io/collector/config/configtls"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka/awsmsk"
Expand All @@ -35,7 +37,7 @@ type SASLConfig struct {
Username string `mapstructure:"username"`
// Password to be used on authentication
Password string `mapstructure:"password"`
// SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM, SCRAM-SHA-256 or SCRAM-SHA-512).
// SASL Mechanism to be used, possible values are: (PLAIN, AWS_MSK_IAM, AWS_MSK_IAM_OAUTHBEARER, SCRAM-SHA-256 or SCRAM-SHA-512).
Mechanism string `mapstructure:"mechanism"`
// SASL Protocol Version to be used, possible values are: (0, 1). Defaults to 0.
Version int `mapstructure:"version"`
Expand All @@ -44,14 +46,21 @@ type SASLConfig struct {
}

// AWSMSKConfig defines the additional SASL authentication
// measures needed to use AWS_MSK_IAM mechanism
// measures needed to use AWS_MSK_IAM and AWS_MSK_IAM_OAUTHBEARER mechanism
type AWSMSKConfig struct {
// Region is the AWS region the MSK cluster is based in
Region string `mapstructure:"region"`
// BrokerAddr is the client is connecting to in order to perform the auth required
BrokerAddr string `mapstructure:"broker_addr"`
}

// Token return the AWS session token for the AWS_MSK_IAM_OAUTHBEARER mechanism
func (c *AWSMSKConfig) Token() (*sarama.AccessToken, error) {
token, _, err := signer.GenerateAuthToken(context.TODO(), c.Region)

return &sarama.AccessToken{Token: token}, err
}

// KerberosConfig defines kereros configuration.
type KerberosConfig struct {
ServiceName string `mapstructure:"service_name"`
Expand Down Expand Up @@ -93,11 +102,11 @@ func configurePlaintext(config PlainTextConfig, saramaConfig *sarama.Config) {

func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error {

if config.Username == "" {
if config.Username == "" && config.Mechanism != "AWS_MSK_IAM_OAUTHBEARER" {
return fmt.Errorf("username have to be provided")
}

if config.Password == "" {
if config.Password == "" && config.Mechanism != "AWS_MSK_IAM_OAUTHBEARER" {
return fmt.Errorf("password have to be provided")
}

Expand All @@ -119,8 +128,14 @@ func configureSASL(config SASLConfig, saramaConfig *sarama.Config) error {
return awsmsk.NewIAMSASLClient(config.AWSMSK.BrokerAddr, config.AWSMSK.Region, saramaConfig.ClientID)
}
saramaConfig.Net.SASL.Mechanism = awsmsk.Mechanism
case "AWS_MSK_IAM_OAUTHBEARER":
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
saramaConfig.Net.SASL.TokenProvider = &config.AWSMSK
tlsConfig := tls.Config{}
saramaConfig.Net.TLS.Enable = true
saramaConfig.Net.TLS.Config = &tlsConfig
default:
return fmt.Errorf(`invalid SASL Mechanism %q: can be either "PLAIN", "AWS_MSK_IAM", "SCRAM-SHA-256" or "SCRAM-SHA-512"`, config.Mechanism)
return fmt.Errorf(`invalid SASL Mechanism %q: can be either "PLAIN", "AWS_MSK_IAM", "AWS_MSK_IAM_OAUTHBEARER", "SCRAM-SHA-256" or "SCRAM-SHA-512"`, config.Mechanism)
}

switch config.Version {
Expand Down
14 changes: 14 additions & 0 deletions internal/kafka/authentication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package kafka

import (
"context"
"crypto/tls"
"testing"

"github.com/IBM/sarama"
Expand Down Expand Up @@ -52,6 +53,15 @@ func TestAuthentication(t *testing.T) {
require.NoError(t, err)
saramaTLSCfg.Net.TLS.Config = tlscfg

saramaSASLAWSIAMOATUHConfig := &sarama.Config{}
saramaSASLAWSIAMOATUHConfig.Net.SASL.Enable = true
saramaSASLAWSIAMOATUHConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
saramaSASLAWSIAMOATUHConfig.Net.SASL.TokenProvider = &AWSMSKConfig{Region: "region"}

tlsConfig := tls.Config{}
saramaSASLAWSIAMOATUHConfig.Net.TLS.Enable = true
saramaSASLAWSIAMOATUHConfig.Net.TLS.Config = &tlsConfig

saramaKerberosCfg := &sarama.Config{}
saramaKerberosCfg.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
saramaKerberosCfg.Net.SASL.Enable = true
Expand Down Expand Up @@ -108,6 +118,10 @@ func TestAuthentication(t *testing.T) {
auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "PLAIN"}},
saramaConfig: saramaSASLPLAINConfig,
},
{
auth: Authentication{SASL: &SASLConfig{Username: "", Password: "", Mechanism: "AWS_MSK_IAM_OAUTHBEARER", AWSMSK: AWSMSKConfig{Region: "region"}}},
saramaConfig: saramaSASLAWSIAMOATUHConfig,
},
{
auth: Authentication{SASL: &SASLConfig{Username: "jdoe", Password: "pass", Mechanism: "SCRAM-SHA-222"}},
saramaConfig: saramaSASLSCRAM512Config,
Expand Down
13 changes: 13 additions & 0 deletions internal/kafka/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.21

require (
github.com/IBM/sarama v1.43.1
github.com/aws/aws-msk-iam-sasl-signer-go v1.0.0
github.com/aws/aws-sdk-go v1.51.22
github.com/stretchr/testify v1.9.0
github.com/xdg-go/scram v1.1.2
Expand All @@ -13,6 +14,18 @@ require (
)

require (
github.com/aws/aws-sdk-go-v2 v1.19.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.28 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.27 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.35 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.29 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.36 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.29 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.13 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.13 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.19.3 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
Expand Down
Loading

0 comments on commit c14de4e

Please sign in to comment.