From a93114292f55af42bc1b261d5611c6630596e7e9 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Mon, 18 Nov 2024 15:49:56 -0600 Subject: [PATCH 01/14] feat(kafka): add iam roles anywhere auth profile + mv assume role to new profile Signed-off-by: Samantha Coyle --- .../builtin-authentication-profiles.yaml | 23 ++++ .../builtin-authentication-profiles.go | 77 ++++++++++--- bindings/kafka/metadata.yaml | 102 +++++++++--------- common/authentication/aws/aws.go | 11 +- common/authentication/aws/client.go | 41 +++++++ common/authentication/aws/static.go | 76 +++++++++++-- common/authentication/aws/x509.go | 30 ++++++ common/component/kafka/auth.go | 59 ---------- common/component/kafka/kafka.go | 79 ++++++++++++-- pubsub/kafka/metadata.yaml | 102 +++++++++--------- 10 files changed, 409 insertions(+), 191 deletions(-) diff --git a/.build-tools/builtin-authentication-profiles.yaml b/.build-tools/builtin-authentication-profiles.yaml index cccb195a44..f07a12abcd 100644 --- a/.build-tools/builtin-authentication-profiles.yaml +++ b/.build-tools/builtin-authentication-profiles.yaml @@ -25,8 +25,31 @@ aws: description: | AWS session token to use. A session token is only required if you are using temporary security credentials. + - title: "AWS: Assume specific IAM Role" + description: | + Assume a specific IAM role. Note: This is only supported on Kafka and PostgreSQL components. + metadata: + - name: sessionToken + required: false + sensitive: true + description: | + AWS session token to use. A session token is only required if you are using + temporary security credentials. example: '"TOKEN"' type: string + - name: assumeRoleArn + type: string + required: false + description: | + IAM role that has access to AWS resource. + This is another option to authenticate with MSK and RDS Aurora aside from the AWS Credentials. + example: '"arn:aws:iam::123456789:role/mskRole"' + - name: sessionName + type: string + description: | + The session name for assuming a role. + example: '"MyAppSession"' + default: '"MSKSASLDefaultSession"' - title: "AWS: Credentials from Environment Variables" description: Use AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment - title: "AWS: IAM Roles Anywhere" diff --git a/.build-tools/pkg/metadataschema/builtin-authentication-profiles.go b/.build-tools/pkg/metadataschema/builtin-authentication-profiles.go index 28d6659188..49d18803ab 100644 --- a/.build-tools/pkg/metadataschema/builtin-authentication-profiles.go +++ b/.build-tools/pkg/metadataschema/builtin-authentication-profiles.go @@ -34,9 +34,25 @@ func ParseBuiltinAuthenticationProfile(bi BuiltinAuthenticationProfile, componen res[i].Metadata = mergedMetadata(bi.Metadata, res[i].Metadata...) - // If component is PostgreSQL, filter out duplicated aws profile fields - if strings.ToLower(componentTitle) == "postgresql" && bi.Name == "aws" { - res[i].Metadata = filterOutDuplicateFields(res[i].Metadata) + switch profile.Title { + case "AWS: Access Key ID and Secret Access Key": + // If component is PostgreSQL, handle deprecation of aws profile fields that we will remove in Dapr 1.17 + // Postgres has awsAccessKey and accessKey and awsSecretKey and secretKey. + // Therefore, we mark the non aws prefixed ones as not required as we deprecate the aws prefixed ones. 
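The intent of the title-based handling above is easier to see in isolation. Below is a minimal, self-contained sketch of the "mark the standardized duplicates as optional" step, using a stripped-down stand-in for the metadataschema.Metadata type; the type and function names here are illustrative, not the real ones.

```go
package main

import "fmt"

// metadataField is a stripped-down stand-in for metadataschema.Metadata.
type metadataField struct {
	Name     string
	Required bool
}

// relaxStandardizedAWSFields mirrors the intent described above: while the deprecated
// aws-prefixed duplicates still exist, accessKey/secretKey cannot be marked required
// without breaking components that only set the old field names.
func relaxStandardizedAWSFields(fields []metadataField) []metadataField {
	out := make([]metadataField, 0, len(fields))
	for _, f := range fields {
		if f.Name == "accessKey" || f.Name == "secretKey" {
			f.Required = false
		}
		out = append(out, f)
	}
	return out
}

func main() {
	fields := []metadataField{
		{Name: "region", Required: true},
		{Name: "accessKey", Required: true},
		{Name: "secretKey", Required: true},
	}
	fmt.Printf("%+v\n", relaxStandardizedAWSFields(fields))
}
```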
+ if strings.ToLower(componentTitle) == "postgresql" && bi.Name == "aws" { + res[i].Metadata = removeRequiredOnSomeAWSFields(res[i].Metadata) + } + // If component is Kafka, handle deprecation of aws profile fields with aws prefix that we will remove in Dapr 1.17 + if strings.ToLower(componentTitle) == "Apache Kafka" { + res[i].Metadata = removeRequiredOnSomeAWSFields(res[i].Metadata) + } + + case "AWS: Credentials from Environment Variables", "AWS: IAM Roles Anywhere": + // These two auth profiles do not use the fields that we are deprecated, so we can manually remove the unrelated fields + res[i].Metadata = removeAllDeprecatedFieldsOnUnrelatedAuthProfiles(res[i].Metadata) + case "AWS: Assume specific IAM Role": + // This is needed bc to assume a specific IAM role, we must allow for the field of awsStsSessionName to deprecate to sessionName + res[i].Metadata = removeSomeDeprecatedFieldsOnUnrelatedAuthProfiles(res[i].Metadata) } } @@ -54,10 +70,14 @@ func mergedMetadata(base []Metadata, add ...Metadata) []Metadata { return res } -// filterOutDuplicateFields removes specific duplicated fields from the metadata -func filterOutDuplicateFields(metadata []Metadata) []Metadata { +// removeRequiredOnSomeAWSFields needs to be removed in Dapr 1.17 as duplicated AWS IAM fields get removed, +// and we standardize on these fields. +// Currently, there are: awsAccessKey, accessKey and awsSecretKey, secretKey fields. +// We normally have accessKey and secretKey fields marked required as it is part of the builtin AWS auth profile fields. +// However, as we rm the aws prefixed ones, we need to then mark the normally required ones as not required only for postgres and kafka. +// This way we do not break existing users, and transition them to the standardized fields. 
+func removeRequiredOnSomeAWSFields(metadata []Metadata) []Metadata { duplicateFields := map[string]int{ - "awsRegion": 0, "accessKey": 0, "secretKey": 0, } @@ -65,17 +85,46 @@ func filterOutDuplicateFields(metadata []Metadata) []Metadata { filteredMetadata := []Metadata{} for _, field := range metadata { - if _, exists := duplicateFields[field.Name]; !exists { + if field.Name == "accessKey" && duplicateFields["accessKey"] == 0 { + field.Required = false + filteredMetadata = append(filteredMetadata, field) + } else if field.Name == "secretKey" && duplicateFields["secretKey"] == 0 { + field.Required = false filteredMetadata = append(filteredMetadata, field) - } else { - if field.Name == "awsRegion" && duplicateFields["awsRegion"] == 0 { - filteredMetadata = append(filteredMetadata, field) - duplicateFields["awsRegion"]++ - } else if field.Name != "awsRegion" { - continue - } } } return filteredMetadata } + +func removeAllDeprecatedFieldsOnUnrelatedAuthProfiles(metadata []Metadata) []Metadata { + filteredMetadata := []Metadata{} + + for _, field := range metadata { + switch field.Name { + case "awsAccessKey", "awsSecretKey", "awsSessionToken", "awsIamRoleArn", "awsStsSessionName": + continue + default: + filteredMetadata = append(filteredMetadata, field) + } + + } + + return filteredMetadata +} + +func removeSomeDeprecatedFieldsOnUnrelatedAuthProfiles(metadata []Metadata) []Metadata { + filteredMetadata := []Metadata{} + + for _, field := range metadata { + switch field.Name { + case "awsAccessKey", "awsSecretKey", "awsSessionToken": + continue + default: + filteredMetadata = append(filteredMetadata, field) + } + + } + + return filteredMetadata +} diff --git a/bindings/kafka/metadata.yaml b/bindings/kafka/metadata.yaml index ab24f3e8fe..bb52b69f18 100644 --- a/bindings/kafka/metadata.yaml +++ b/bindings/kafka/metadata.yaml @@ -14,6 +14,59 @@ binding: operations: - name: create description: "Publish a new message in the topic." +builtinAuthenticationProfiles: + - name: "aws" + metadata: + - name: authType + type: string + required: true + description: | + Authentication type. + This must be set to "awsiam" for this authentication profile. + example: '"awsiam"' + allowedValues: + - "awsiam" + - name: awsAccessKey + type: string + required: false + description: | + Deprecated as of Dapr 1.17. Use 'accessKey' instead. + If both fields are set, then 'accessKey' value will be used. + AWS access key associated with an IAM account. + example: '"AKIAIOSFODNN7EXAMPLE"' + - name: awsSecretKey + type: string + required: false + sensitive: true + description: | + Deprecated as of Dapr 1.17. Use 'secretKey' instead. + If both fields are set, then 'secretKey' value will be used. + The secret key associated with the access key. + example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"' + - name: awsSessionToken + type: string + sensitive: true + description: | + Deprecated as of Dapr 1.17. Use 'sessionToken' instead. + If both fields are set, then 'sessionToken' value will be used. + AWS session token to use. A session token is only required if you are using temporary security credentials. + example: '"TOKEN"' + - name: awsIamRoleArn + type: string + required: false + description: | + Deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead. + If both fields are set, then 'assumeRoleArn' value will be used. + IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials. 
+ example: '"arn:aws:iam::123456789:role/mskRole"' + - name: awsStsSessionName + type: string + description: | + Deprecated as of Dapr 1.17. Use 'sessionName' instead. + If both fields are set, then 'sessionName' value will be used. + Represents the session name for assuming a role. + example: '"MyAppSession"' + default: '"MSKSASLDefaultSession"' authenticationProfiles: - title: "OIDC Authentication" description: | @@ -139,55 +192,6 @@ authenticationProfiles: example: '"none"' allowedValues: - "none" - - title: "AWS IAM" - description: "Authenticate using AWS IAM credentials or role for AWS MSK" - metadata: - - name: authType - type: string - required: true - description: | - Authentication type. - This must be set to "awsiam" for this authentication profile. - example: '"awsiam"' - allowedValues: - - "awsiam" - - name: awsRegion - type: string - required: true - description: | - The AWS Region where the MSK Kafka broker is deployed to. - example: '"us-east-1"' - - name: awsAccessKey - type: string - required: true - description: | - AWS access key associated with an IAM account. - example: '"AKIAIOSFODNN7EXAMPLE"' - - name: awsSecretKey - type: string - required: true - sensitive: true - description: | - The secret key associated with the access key. - example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"' - - name: awsSessionToken - type: string - sensitive: true - description: | - AWS session token to use. A session token is only required if you are using temporary security credentials. - example: '"TOKEN"' - - name: awsIamRoleArn - type: string - required: true - description: | - IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials. - example: '"arn:aws:iam::123456789:role/mskRole"' - - name: awsStsSessionName - type: string - description: | - Represents the session name for assuming a role. 
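All of the aws-prefixed fields above (and their standardized replacements) ultimately feed Sarama's SASL/OAUTHBEARER support. The wiring is roughly the sketch below, shown with a stub token provider rather than the component's real MSK token provider; the TLS line reflects that MSK IAM listeners require TLS, which the component configures separately.

```go
package main

import "github.com/IBM/sarama"

// stubTokenProvider stands in for the MSK IAM token provider; anything that
// implements sarama.AccessTokenProvider can be plugged in the same way.
type stubTokenProvider struct{}

func (stubTokenProvider) Token() (*sarama.AccessToken, error) {
	return &sarama.AccessToken{Token: "signed-msk-token"}, nil
}

func newIAMSaramaConfig() *sarama.Config {
	cfg := sarama.NewConfig()
	// MSK IAM auth is SASL/OAUTHBEARER under the hood: enable SASL and hand
	// Sarama a token provider that mints a fresh signed token on demand.
	cfg.Net.SASL.Enable = true
	cfg.Net.SASL.Mechanism = sarama.SASLTypeOAuth
	cfg.Net.SASL.TokenProvider = stubTokenProvider{}
	// MSK IAM listeners also require TLS.
	cfg.Net.TLS.Enable = true
	return cfg
}

func main() {
	_ = newIAMSaramaConfig()
}
```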
- example: '"MyAppSession"' - default: '"MSKSASLDefaultSession"' metadata: - name: topics type: string diff --git a/common/authentication/aws/aws.go b/common/authentication/aws/aws.go index a45eb48277..1b81b92234 100644 --- a/common/authentication/aws/aws.go +++ b/common/authentication/aws/aws.go @@ -20,6 +20,7 @@ import ( "strconv" "time" + "github.com/IBM/sarama" "github.com/aws/aws-sdk-go-v2/config" v2creds "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/feature/rds/auth" @@ -59,9 +60,11 @@ type Options struct { PoolConfig *pgxpool.Config `json:"poolConfig" mapstructure:"poolConfig"` ConnectionString string `json:"connectionString" mapstructure:"connectionString"` - Region string `json:"region" mapstructure:"region"` - AccessKey string `json:"accessKey" mapstructure:"accessKey"` - SecretKey string `json:"secretKey" mapstructure:"secretKey"` + Region string `json:"region" mapstructure:"region"` + AccessKey string `json:"accessKey" mapstructure:"accessKey"` + SecretKey string `json:"secretKey" mapstructure:"secretKey"` + SessionName string `mapstructure:"sessionName"` + AssumeRoleARN string `mapstructure:"assumeRoleArn"` Endpoint string SessionToken string @@ -91,6 +94,8 @@ type Provider interface { Kinesis() *KinesisClients Ses() *SesClients + UpdateKafka(*sarama.Config) error + Close() error } diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index 8d0e9de20b..25b479272a 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -17,7 +17,11 @@ import ( "context" "errors" "sync" + "time" + "github.com/IBM/sarama" + "github.com/aws/aws-msk-iam-sasl-signer-go/signer" + aws2 "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" @@ -207,3 +211,40 @@ func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode s func (c *SesClients) New(session *session.Session) { c.Ses = ses.New(session, session.Config) } + +// Kafka specific +type mskTokenProvider struct { + generateTokenTimeout time.Duration + accessKey string + secretKey string + sessionToken string + awsIamRoleArn string + awsStsSessionName string + region string +} + +func (m *mskTokenProvider) Token() (*sarama.AccessToken, error) { + // this function can't use the context passed on Init because that context would be cancelled right after Init + ctx, cancel := context.WithTimeout(context.Background(), m.generateTokenTimeout) + defer cancel() + + switch { + // we must first check if we are using the assume role auth profile + case m.awsIamRoleArn != "" && m.awsStsSessionName != "": + token, _, err := signer.GenerateAuthTokenFromRole(ctx, m.region, m.awsIamRoleArn, m.awsStsSessionName) + return &sarama.AccessToken{Token: token}, err + case m.accessKey != "" && m.secretKey != "": + token, _, err := signer.GenerateAuthTokenFromCredentialsProvider(ctx, m.region, aws2.CredentialsProviderFunc(func(ctx context.Context) (aws2.Credentials, error) { + return aws2.Credentials{ + AccessKeyID: m.accessKey, + SecretAccessKey: m.secretKey, + SessionToken: m.sessionToken, + }, nil + })) + return &sarama.AccessToken{Token: token}, err + + default: // load default aws creds + token, _, err := signer.GenerateAuthToken(ctx, m.region) + return &sarama.AccessToken{Token: token}, err + } +} diff --git a/common/authentication/aws/static.go b/common/authentication/aws/static.go index a66ef86e1e..4b1dad390f 100644 --- 
a/common/authentication/aws/static.go +++ b/common/authentication/aws/static.go @@ -17,7 +17,9 @@ import ( "context" "fmt" "sync" + "time" + "github.com/IBM/sarama" awsv2 "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" v2creds "github.com/aws/aws-sdk-go-v2/credentials" @@ -39,6 +41,9 @@ type StaticAuth struct { secretKey *string sessionToken *string + assumeRoleARN *string + sessionName *string + session *session.Session cfg *aws.Config clients *Clients @@ -46,12 +51,15 @@ type StaticAuth struct { func newStaticIAM(_ context.Context, opts Options, cfg *aws.Config) (*StaticAuth, error) { auth := &StaticAuth{ - logger: opts.Logger, - region: &opts.Region, - endpoint: &opts.Endpoint, - accessKey: &opts.AccessKey, - secretKey: &opts.SecretKey, - sessionToken: &opts.SessionToken, + logger: opts.Logger, + region: &opts.Region, + endpoint: &opts.Endpoint, + accessKey: &opts.AccessKey, + secretKey: &opts.SecretKey, + sessionToken: &opts.SessionToken, + assumeRoleARN: &opts.AssumeRoleARN, + sessionName: &opts.SessionName, + cfg: func() *aws.Config { // if nil is passed or it's just a default cfg, // then we use the options to build the aws cfg. @@ -206,6 +214,62 @@ func (a *StaticAuth) Ses() *SesClients { return a.clients.ses } +func (a *StaticAuth) UpdateKafka(config *sarama.Config) error { + a.mu.Lock() + defer a.mu.Unlock() + + var ( + accessKey, secretKey, sessionToken string + ) + creds, err := a.session.Config.Credentials.Get() + if err != nil { + return fmt.Errorf("failed to get credentials from session: %w", err) + } + if a.accessKey == nil { + accessKey = creds.AccessKeyID + } else { + accessKey = *a.accessKey + } + + if a.secretKey == nil { + secretKey = creds.SecretAccessKey + } else { + secretKey = *a.secretKey + } + + if a.sessionToken == nil { + secretKey = creds.SessionToken + } else { + sessionToken = *a.sessionToken + } + + config.Net.SASL.Enable = true + config.Net.SASL.Mechanism = sarama.SASLTypeOAuth + tokenProvider := mskTokenProvider{ + generateTokenTimeout: 10 * time.Second, + region: *a.region, + accessKey: accessKey, + secretKey: secretKey, + sessionToken: sessionToken, + } + + if a.assumeRoleARN != nil { + tokenProvider.awsIamRoleArn = *a.assumeRoleARN + } + + if a.sessionName != nil { + tokenProvider.awsStsSessionName = *a.sessionName + } + + config.Net.SASL.TokenProvider = &tokenProvider + + _, err = config.Net.SASL.TokenProvider.Token() + if err != nil { + return fmt.Errorf("error validating iam credentials %v", err) + } + return nil +} + func (a *StaticAuth) getTokenClient() (*session.Session, error) { var awsConfig *aws.Config if a.cfg == nil { diff --git a/common/authentication/aws/x509.go b/common/authentication/aws/x509.go index cb1bafdeb3..ccd67bb056 100644 --- a/common/authentication/aws/x509.go +++ b/common/authentication/aws/x509.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/IBM/sarama" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/arn" "github.com/aws/aws-sdk-go/aws/credentials" @@ -65,6 +66,11 @@ type x509 struct { trustProfileArn *string trustAnchorArn *string assumeRoleArn *string + sessionName *string + + accessKey *string + secretKey *string + sessionToken *string } func newX509(ctx context.Context, opts Options, cfg *aws.Config) (*x509, error) { @@ -274,6 +280,27 @@ func (a *x509) Ses() *SesClients { return a.clients.ses } +func (a *x509) UpdateKafka(config *sarama.Config) error { + a.mu.Lock() + defer a.mu.Unlock() + + config.Net.SASL.Enable = true + config.Net.SASL.Mechanism = sarama.SASLTypeOAuth + 
config.Net.SASL.TokenProvider = &mskTokenProvider{ + generateTokenTimeout: 10 * time.Second, + region: *a.region, + accessKey: *a.accessKey, + secretKey: *a.secretKey, + sessionToken: *a.sessionToken, + } + + _, err := config.Net.SASL.TokenProvider.Token() + if err != nil { + return fmt.Errorf("error validating iam credentials %v", err) + } + return nil +} + func (a *x509) initializeTrustAnchors() error { var ( trustAnchor arn.ARN @@ -401,6 +428,9 @@ func (a *x509) createOrRefreshSession(ctx context.Context) (*session.Session, er if sess == nil { return nil, errors.New("session is nil") } + a.accessKey = accessKey + a.secretKey = secretKey + a.sessionToken = sessionToken return sess, nil } diff --git a/common/component/kafka/auth.go b/common/component/kafka/auth.go index bd61690c05..ea8cc43fac 100644 --- a/common/component/kafka/auth.go +++ b/common/component/kafka/auth.go @@ -14,16 +14,12 @@ limitations under the License. package kafka import ( - "context" "crypto/tls" "crypto/x509" "errors" "fmt" - "time" "github.com/IBM/sarama" - "github.com/aws/aws-msk-iam-sasl-signer-go/signer" - aws2 "github.com/aws/aws-sdk-go-v2/aws" ) func updatePasswordAuthInfo(config *sarama.Config, metadata *KafkaMetadata, saslUsername, saslPassword string) { @@ -92,58 +88,3 @@ func updateOidcAuthInfo(config *sarama.Config, metadata *KafkaMetadata) error { return nil } - -func updateAWSIAMAuthInfo(ctx context.Context, config *sarama.Config, metadata *KafkaMetadata) error { - config.Net.SASL.Enable = true - config.Net.SASL.Mechanism = sarama.SASLTypeOAuth - config.Net.SASL.TokenProvider = &mskAccessTokenProvider{ - ctx: ctx, - generateTokenTimeout: 10 * time.Second, - region: metadata.AWSRegion, - accessKey: metadata.AWSAccessKey, - secretKey: metadata.AWSSecretKey, - sessionToken: metadata.AWSSessionToken, - awsIamRoleArn: metadata.AWSIamRoleArn, - awsStsSessionName: metadata.AWSStsSessionName, - } - - _, err := config.Net.SASL.TokenProvider.Token() - if err != nil { - return fmt.Errorf("error validating iam credentials %v", err) - } - return nil -} - -type mskAccessTokenProvider struct { - ctx context.Context - generateTokenTimeout time.Duration - accessKey string - secretKey string - sessionToken string - awsIamRoleArn string - awsStsSessionName string - region string -} - -func (m *mskAccessTokenProvider) Token() (*sarama.AccessToken, error) { - // this function can't use the context passed on Init because that context would be cancelled right after Init - ctx, cancel := context.WithTimeout(m.ctx, m.generateTokenTimeout) - defer cancel() - - if m.accessKey != "" && m.secretKey != "" { - token, _, err := signer.GenerateAuthTokenFromCredentialsProvider(ctx, m.region, aws2.CredentialsProviderFunc(func(ctx context.Context) (aws2.Credentials, error) { - return aws2.Credentials{ - AccessKeyID: m.accessKey, - SecretAccessKey: m.secretKey, - SessionToken: m.sessionToken, - }, nil - })) - return &sarama.AccessToken{Token: token}, err - } else if m.awsIamRoleArn != "" { - token, _, err := signer.GenerateAuthTokenFromRole(ctx, m.region, m.awsIamRoleArn, m.awsStsSessionName) - return &sarama.AccessToken{Token: token}, err - } - - token, _, err := signer.GenerateAuthToken(ctx, m.region) - return &sarama.AccessToken{Token: token}, err -} diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index 2f3b67be0d..946f76da02 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -28,6 +28,7 @@ import ( "github.com/linkedin/goavro/v2" "github.com/riferrei/srclient" + awsAuth 
"github.com/dapr/components-contrib/common/authentication/aws" "github.com/dapr/components-contrib/pubsub" "github.com/dapr/kit/logger" kitmd "github.com/dapr/kit/metadata" @@ -36,16 +37,17 @@ import ( // Kafka allows reading/writing to a Kafka consumer group. type Kafka struct { - producer sarama.SyncProducer - consumerGroup string - brokers []string - logger logger.Logger - authType string - saslUsername string - saslPassword string - initialOffset int64 - config *sarama.Config - escapeHeaders bool + producer sarama.SyncProducer + consumerGroup string + brokers []string + logger logger.Logger + authType string + saslUsername string + saslPassword string + initialOffset int64 + config *sarama.Config + escapeHeaders bool + awsAuthProvider awsAuth.Provider cg sarama.ConsumerGroup subscribeTopics TopicHandlerConfig @@ -182,10 +184,28 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { // already handled in updateTLSConfig case awsIAMAuthType: k.logger.Info("Configuring AWS IAM authentication") - err = updateAWSIAMAuthInfo(k.internalContext, config, meta) + region, accessKey, secretKey, assumeRole, sessionName, validateErr := k.ValidateAWS(metadata) + if validateErr != nil { + return fmt.Errorf("failed to validate AWS IAM authentication fields: %w", validateErr) + } + opts := awsAuth.Options{ + Logger: k.logger, + Properties: metadata, + Region: region, + Endpoint: "", + AccessKey: accessKey, + SecretKey: secretKey, + SessionToken: "", + AssumeRoleARN: assumeRole, + SessionName: sessionName, + } + var provider awsAuth.Provider + provider, err = awsAuth.NewProvider(ctx, opts, awsAuth.GetConfig(opts)) if err != nil { return err } + k.awsAuthProvider = provider + k.awsAuthProvider.UpdateKafka(config) } k.config = config @@ -229,6 +249,43 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { return nil } +func (k *Kafka) ValidateAWS(metadata map[string]string) (string, string, string, string, string, error) { + awsRegion, _ := metadata["awsRegion"] + if awsRegion == "" { + return "", "", "", "", "", errors.New("metadata property AWSRegion is missing") + } + + // Note: access key and secret keys can be optional + // in the event users are leveraging the credential files for an access token. + awsAccessKey, _ := metadata["awsAccessKey"] + // This is needed as we remove the awsAccessKey field to use the builtin AWS profile 'accessKey' field instead. + accessKey, _ := metadata["accessKey"] + if awsAccessKey == "" || accessKey != "" { + awsAccessKey = accessKey + } + awsSecretKey, _ := metadata["awsSecretKey"] + // This is needed as we remove the awsSecretKey field to use the builtin AWS profile 'secretKey' field instead. + secretKey, _ := metadata["secretKey"] + if awsSecretKey == "" || secretKey != "" { + awsSecretKey = secretKey + } + + awsRole, _ := metadata["awsIamRoleArn"] + // This is needed as we remove the awsIamRoleArn field to use the builtin AWS profile 'assumeRoleArn' field instead. + role, _ := metadata["assumeRoleArn"] + if awsRole == "" || role != "" { + awsRole = role + } + + awsSession, _ := metadata["awsStsSessionName"] + // This is needed as we remove the awsStsSessionName field to use the builtin AWS profile 'sessionName' field instead. 
+ session, _ := metadata["sessionName"] + if awsSession == "" || session != "" { + awsSession = session + } + return awsRegion, awsAccessKey, awsSecretKey, awsRole, awsSession, nil +} + func (k *Kafka) Close() error { defer k.wg.Wait() defer k.consumerWG.Wait() diff --git a/pubsub/kafka/metadata.yaml b/pubsub/kafka/metadata.yaml index b6536c2b43..394c93e4e3 100644 --- a/pubsub/kafka/metadata.yaml +++ b/pubsub/kafka/metadata.yaml @@ -8,6 +8,59 @@ title: "Apache Kafka" urls: - title: Reference url: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-apache-kafka/ +builtinAuthenticationProfiles: + - name: "aws" + metadata: + - name: authType + type: string + required: true + description: | + Authentication type. + This must be set to "awsiam" for this authentication profile. + example: '"awsiam"' + allowedValues: + - "awsiam" + - name: awsAccessKey + type: string + required: false + description: | + Deprecated as of Dapr 1.17. Use 'accessKey' instead. + If both fields are set, then 'accessKey' value will be used. + AWS access key associated with an IAM account. + example: '"AKIAIOSFODNN7EXAMPLE"' + - name: awsSecretKey + type: string + required: false + sensitive: true + description: | + Deprecated as of Dapr 1.17. Use 'secretKey' instead. + If both fields are set, then 'secretKey' value will be used. + The secret key associated with the access key. + example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"' + - name: awsSessionToken + type: string + sensitive: true + description: | + Deprecated as of Dapr 1.17. Use 'sessionToken' instead. + If both fields are set, then 'sessionToken' value will be used. + AWS session token to use. A session token is only required if you are using temporary security credentials. + example: '"TOKEN"' + - name: awsIamRoleArn + type: string + required: false + description: | + Deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead. + If both fields are set, then 'assumeRoleArn' value will be used. + IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials. + example: '"arn:aws:iam::123456789:role/mskRole"' + - name: awsStsSessionName + type: string + description: | + Deprecated as of Dapr 1.17. Use 'sessionName' instead. + If both fields are set, then 'sessionName' value will be used. + Represents the session name for assuming a role. + example: '"MyAppSession"' + default: '"MSKSASLDefaultSession"' authenticationProfiles: - title: "OIDC Authentication" description: | @@ -133,55 +186,6 @@ authenticationProfiles: example: '"none"' allowedValues: - "none" - - title: "AWS IAM" - description: "Authenticate using AWS IAM credentials or role for AWS MSK" - metadata: - - name: authType - type: string - required: true - description: | - Authentication type. - This must be set to "awsiam" for this authentication profile. - example: '"awsiam"' - allowedValues: - - "awsiam" - - name: awsRegion - type: string - required: true - description: | - The AWS Region where the MSK Kafka broker is deployed to. - example: '"us-east-1"' - - name: awsAccessKey - type: string - required: true - description: | - AWS access key associated with an IAM account. - example: '"AKIAIOSFODNN7EXAMPLE"' - - name: awsSecretKey - type: string - required: true - sensitive: true - description: | - The secret key associated with the access key. - example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"' - - name: awsSessionToken - type: string - sensitive: true - description: | - AWS session token to use. 
A session token is only required if you are using temporary security credentials. - example: '"TOKEN"' - - name: awsIamRoleArn - type: string - required: true - description: | - IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials. - example: '"arn:aws:iam::123456789:role/mskRole"' - - name: awsStsSessionName - type: string - description: | - Represents the session name for assuming a role. - example: '"MyAppSession"' - default: '"MSKSASLDefaultSession"' metadata: - name: brokers type: string From 25eade83189d77b63cc8f5fb57be54d00218682b Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Mon, 18 Nov 2024 16:12:25 -0600 Subject: [PATCH 02/14] fix: do some cleaning up of things Signed-off-by: Samantha Coyle --- common/component/kafka/kafka.go | 4 ++++ common/component/kafka/metadata.go | 11 ++++------- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index 946f76da02..67eb2bafc5 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -312,6 +312,10 @@ func (k *Kafka) Close() error { if k.cg != nil { errs[1] = k.cg.Close() } + + if k.awsAuthProvider != nil { + errs[2] = k.awsAuthProvider.Close() + } } return errors.Join(errs...) diff --git a/common/component/kafka/metadata.go b/common/component/kafka/metadata.go index c4d0a6bb56..31b11de307 100644 --- a/common/component/kafka/metadata.go +++ b/common/component/kafka/metadata.go @@ -98,13 +98,15 @@ type KafkaMetadata struct { ClientConnectionKeepAliveInterval time.Duration `mapstructure:"clientConnectionKeepAliveInterval"` // aws iam auth profile + // Note: these custom AWS specific fields will become deprecated and deleted in Dapr 1.17. + // This will move Kafka to leverage all of the common AWS options using the builtin AWS profile for authentication. 
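With the auth provider now owned by the Kafka struct, Close has three independent error sources (producer, consumer group, auth provider). The aggregation used above is, in spirit, the following; the helper name is made up for illustration.

```go
package main

import (
	"errors"
	"io"
)

// closeAll is an illustrative helper: close every non-nil resource and join the
// resulting errors, mirroring the errors.Join aggregation in Kafka.Close.
func closeAll(closers ...io.Closer) error {
	errs := make([]error, 0, len(closers))
	for _, c := range closers {
		if c == nil {
			continue
		}
		errs = append(errs, c.Close())
	}
	return errors.Join(errs...)
}

func main() {
	// With no resources to close, the joined error is nil.
	_ = closeAll()
}
```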
AWSAccessKey string `mapstructure:"awsAccessKey"` AWSSecretKey string `mapstructure:"awsSecretKey"` AWSSessionToken string `mapstructure:"awsSessionToken"` AWSIamRoleArn string `mapstructure:"awsIamRoleArn"` AWSStsSessionName string `mapstructure:"awsStsSessionName"` - AWSRegion string `mapstructure:"awsRegion"` - channelBufferSize int `mapstructure:"-"` + + channelBufferSize int `mapstructure:"-"` consumerFetchMin int32 `mapstructure:"-"` consumerFetchDefault int32 `mapstructure:"-"` @@ -264,11 +266,6 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error) return nil, errors.New("missing CA certificate property 'caCert' for authType 'certificate'") } k.logger.Debug("Configuring root certificate authentication.") - case awsIAMAuthType: - if m.AWSRegion == "" { - return nil, errors.New("missing AWS region property 'awsRegion' for authType 'awsiam'") - } - k.logger.Debug("Configuring AWS IAM authentication.") default: return nil, errors.New("kafka error: invalid value for 'authType' attribute") } From 24e71e276d4f0f4c250631a6adaa0d9b8a875cf2 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Mon, 18 Nov 2024 19:18:05 -0600 Subject: [PATCH 03/14] style: make linter happy Signed-off-by: Samantha Coyle --- common/authentication/aws/aws.go | 1 + common/authentication/aws/static.go | 4 +--- common/authentication/aws/x509.go | 1 - common/component/kafka/kafka.go | 2 +- 4 files changed, 3 insertions(+), 5 deletions(-) diff --git a/common/authentication/aws/aws.go b/common/authentication/aws/aws.go index 1b81b92234..d173738acd 100644 --- a/common/authentication/aws/aws.go +++ b/common/authentication/aws/aws.go @@ -83,6 +83,7 @@ func GetConfig(opts Options) *aws.Config { return cfg } +//nolint:interfacebloat type Provider interface { S3() *S3Clients DynamoDB() *DynamoDBClients diff --git a/common/authentication/aws/static.go b/common/authentication/aws/static.go index 4b1dad390f..3482eea397 100644 --- a/common/authentication/aws/static.go +++ b/common/authentication/aws/static.go @@ -218,9 +218,7 @@ func (a *StaticAuth) UpdateKafka(config *sarama.Config) error { a.mu.Lock() defer a.mu.Unlock() - var ( - accessKey, secretKey, sessionToken string - ) + var accessKey, secretKey, sessionToken string creds, err := a.session.Config.Credentials.Get() if err != nil { return fmt.Errorf("failed to get credentials from session: %w", err) diff --git a/common/authentication/aws/x509.go b/common/authentication/aws/x509.go index ccd67bb056..6a562ca8d1 100644 --- a/common/authentication/aws/x509.go +++ b/common/authentication/aws/x509.go @@ -66,7 +66,6 @@ type x509 struct { trustProfileArn *string trustAnchorArn *string assumeRoleArn *string - sessionName *string accessKey *string secretKey *string diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index 67eb2bafc5..8830d14888 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -290,7 +290,7 @@ func (k *Kafka) Close() error { defer k.wg.Wait() defer k.consumerWG.Wait() - errs := make([]error, 2) + errs := make([]error, 3) if k.closed.CompareAndSwap(false, true) { close(k.closeCh) From d6e5da07b77b20f7cd1dc303bdcc956f5140fc7e Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 19 Nov 2024 15:21:14 -0600 Subject: [PATCH 04/14] fix: address init feedback Signed-off-by: Samantha Coyle --- .../builtin-authentication-profiles.yaml | 14 ++-- bindings/aws/sns/sns.go | 1 + bindings/kafka/metadata.yaml | 26 ++++-- common/authentication/aws/aws.go | 26 +++++- 
common/component/kafka/kafka.go | 66 ++++++--------- common/component/kafka/kafka_test.go | 81 +++++++++++++++++++ common/component/kafka/metadata.go | 9 --- pubsub/aws/snssqs/metadata.go | 1 + pubsub/kafka/metadata.yaml | 26 ++++-- state/aws/dynamodb/dynamodb.go | 1 + 10 files changed, 183 insertions(+), 68 deletions(-) diff --git a/.build-tools/builtin-authentication-profiles.yaml b/.build-tools/builtin-authentication-profiles.yaml index f07a12abcd..f46d324192 100644 --- a/.build-tools/builtin-authentication-profiles.yaml +++ b/.build-tools/builtin-authentication-profiles.yaml @@ -3,7 +3,7 @@ aws: description: | Authenticate using an Access Key ID and Secret Access Key included in the metadata metadata: - - name: awsRegion + - name: region type: string required: true description: | @@ -29,14 +29,12 @@ aws: description: | Assume a specific IAM role. Note: This is only supported on Kafka and PostgreSQL components. metadata: - - name: sessionToken - required: false - sensitive: true - description: | - AWS session token to use. A session token is only required if you are using - temporary security credentials. - example: '"TOKEN"' + - name: region type: string + required: true + description: | + The AWS Region where the AWS resource is deployed to. + example: '"us-east-1"' - name: assumeRoleArn type: string required: false diff --git a/bindings/aws/sns/sns.go b/bindings/aws/sns/sns.go index 55e3ccefa5..18fc8fbaf9 100644 --- a/bindings/aws/sns/sns.go +++ b/bindings/aws/sns/sns.go @@ -43,6 +43,7 @@ type snsMetadata struct { SessionToken string `json:"sessionToken" mapstructure:"sessionToken" mdignore:"true"` TopicArn string `json:"topicArn"` + // TODO: in Dapr 1.17 rm the alias on region as we remove the aws prefix on these fields Region string `json:"region" mapstructure:"region" mapstructurealiases:"awsRegion" mdignore:"true"` Endpoint string `json:"endpoint"` } diff --git a/bindings/kafka/metadata.yaml b/bindings/kafka/metadata.yaml index bb52b69f18..435debf90e 100644 --- a/bindings/kafka/metadata.yaml +++ b/bindings/kafka/metadata.yaml @@ -14,6 +14,9 @@ binding: operations: - name: create description: "Publish a new message in the topic." +# This auth profile has duplicate fields intentionally as we maintain backwards compatibility, +# but also move Kafka to utilize the noramlized AWS fields in the builtin auth profiles. +# TODO: rm the duplicate aws prefixed fields in Dapr 1.17. builtinAuthenticationProfiles: - name: "aws" metadata: @@ -26,11 +29,20 @@ builtinAuthenticationProfiles: example: '"awsiam"' allowedValues: - "awsiam" + - name: awsRegion + type: string + required: false + description: | + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'region' instead. + The AWS Region where the AWS Relational Database Service is deployed to. + example: '"us-east-1"' - name: awsAccessKey type: string required: false description: | - Deprecated as of Dapr 1.17. Use 'accessKey' instead. + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'accessKey' instead. If both fields are set, then 'accessKey' value will be used. AWS access key associated with an IAM account. example: '"AKIAIOSFODNN7EXAMPLE"' @@ -39,7 +51,8 @@ builtinAuthenticationProfiles: required: false sensitive: true description: | - Deprecated as of Dapr 1.17. Use 'secretKey' instead. + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'secretKey' instead. 
If both fields are set, then 'secretKey' value will be used. The secret key associated with the access key. example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"' @@ -47,7 +60,8 @@ builtinAuthenticationProfiles: type: string sensitive: true description: | - Deprecated as of Dapr 1.17. Use 'sessionToken' instead. + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'sessionToken' instead. If both fields are set, then 'sessionToken' value will be used. AWS session token to use. A session token is only required if you are using temporary security credentials. example: '"TOKEN"' @@ -55,14 +69,16 @@ builtinAuthenticationProfiles: type: string required: false description: | - Deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead. + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead. If both fields are set, then 'assumeRoleArn' value will be used. IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials. example: '"arn:aws:iam::123456789:role/mskRole"' - name: awsStsSessionName type: string description: | - Deprecated as of Dapr 1.17. Use 'sessionName' instead. + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'sessionName' instead. If both fields are set, then 'sessionName' value will be used. Represents the session name for assuming a role. example: '"MyAppSession"' diff --git a/common/authentication/aws/aws.go b/common/authentication/aws/aws.go index d173738acd..53af7c1e92 100644 --- a/common/authentication/aws/aws.go +++ b/common/authentication/aws/aws.go @@ -45,6 +45,17 @@ type AWSIAM struct { AWSRegion string `json:"awsRegion" mapstructure:"awsRegion"` } +// TODO: Delete in Dapr 1.17 so we can move all IAM fields to use the defaults of: +// accessKey and secretKey and region as noted in the docs, and Options struct above. +type DeprecatedKafkaIAM struct { + Region string `json:"awsRegion" mapstructure:"awsRegion"` + AccessKey string `json:"awsAccessKey" mapstructure:"awsAccessKey"` + SecretKey string `json:"awsSecretKey" mapstructure:"awsSecretKey"` + SessionToken string `json:"awsSessionToken" mapstructure:"awsSessionToken"` + IamRoleArn string `json:"awsIamRoleArn" mapstructure:"awsIamRoleArn"` + StsSessionName string `json:"awsStsSessionName" mapstructure:"awsStsSessionName"` +} + type AWSIAMAuthOptions struct { PoolConfig *pgxpool.Config `json:"poolConfig" mapstructure:"poolConfig"` ConnectionString string `json:"connectionString" mapstructure:"connectionString"` @@ -60,7 +71,9 @@ type Options struct { PoolConfig *pgxpool.Config `json:"poolConfig" mapstructure:"poolConfig"` ConnectionString string `json:"connectionString" mapstructure:"connectionString"` - Region string `json:"region" mapstructure:"region"` + // TODO: in Dapr 1.17 rm the alias on regions as we rm the aws prefixed one. + // Docs have it just as region, but most metadata fields show the aws prefix... 
+ Region string `json:"region" mapstructure:"region" mapstructurealiases:"awsRegion"` AccessKey string `json:"accessKey" mapstructure:"accessKey"` SecretKey string `json:"secretKey" mapstructure:"secretKey"` SessionName string `mapstructure:"sessionName"` @@ -185,3 +198,14 @@ func (opts *Options) InitiateAWSIAMAuth() error { return nil } + +// Coalesce is a helper function to return the first non-empty string from the inputs +// This helps us to migrate away from the deprecated duplicate aws auth profile metadata fields in Dapr 1.17. +func Coalesce(values ...string) string { + for _, v := range values { + if v != "" { + return v + } + } + return "" +} diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index 8830d14888..f63106cd2a 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -184,20 +184,20 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { // already handled in updateTLSConfig case awsIAMAuthType: k.logger.Info("Configuring AWS IAM authentication") - region, accessKey, secretKey, assumeRole, sessionName, validateErr := k.ValidateAWS(metadata) + kafkaIAM, validateErr := k.ValidateAWS(metadata) if validateErr != nil { return fmt.Errorf("failed to validate AWS IAM authentication fields: %w", validateErr) } opts := awsAuth.Options{ Logger: k.logger, Properties: metadata, - Region: region, + Region: kafkaIAM.Region, Endpoint: "", - AccessKey: accessKey, - SecretKey: secretKey, - SessionToken: "", - AssumeRoleARN: assumeRole, - SessionName: sessionName, + AccessKey: kafkaIAM.AccessKey, + SecretKey: kafkaIAM.SecretKey, + SessionToken: kafkaIAM.SessionToken, + AssumeRoleARN: kafkaIAM.IamRoleArn, + SessionName: kafkaIAM.StsSessionName, } var provider awsAuth.Provider provider, err = awsAuth.NewProvider(ctx, opts, awsAuth.GetConfig(opts)) @@ -249,41 +249,27 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { return nil } -func (k *Kafka) ValidateAWS(metadata map[string]string) (string, string, string, string, string, error) { - awsRegion, _ := metadata["awsRegion"] - if awsRegion == "" { - return "", "", "", "", "", errors.New("metadata property AWSRegion is missing") +func (k *Kafka) ValidateAWS(metadata map[string]string) (*awsAuth.DeprecatedKafkaIAM, error) { + // This is needed as we remove the aws prefixed fields to use the builtin AWS profile fields instead. + region := awsAuth.Coalesce(metadata["region"], metadata["awsRegion"]) + accessKey := awsAuth.Coalesce(metadata["accessKey"], metadata["awsAccessKey"]) + secretKey := awsAuth.Coalesce(metadata["secretKey"], metadata["awsSecretKey"]) + role := awsAuth.Coalesce(metadata["assumeRoleArn"], metadata["awsIamRoleArn"]) + session := awsAuth.Coalesce(metadata["sessionName"], metadata["awsStsSessionName"]) + token := awsAuth.Coalesce(metadata["sessionToken"], metadata["awsSessionToken"]) + + if region == "" { + return nil, errors.New("metadata property AWSRegion is missing") } - // Note: access key and secret keys can be optional - // in the event users are leveraging the credential files for an access token. - awsAccessKey, _ := metadata["awsAccessKey"] - // This is needed as we remove the awsAccessKey field to use the builtin AWS profile 'accessKey' field instead. 
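Since ValidateAWS below leans entirely on it, here is a quick illustration of the Coalesce helper added above: the first non-empty argument wins, which is what lets the standardized field names take precedence over the deprecated aws-prefixed ones when both are set.

```go
package main

import (
	"fmt"

	awsAuth "github.com/dapr/components-contrib/common/authentication/aws"
)

func main() {
	// New field set: it wins over the deprecated fallback.
	fmt.Println(awsAuth.Coalesce("us-east-1", "us-west-2")) // us-east-1
	// New field empty: fall back to the deprecated aws-prefixed value.
	fmt.Println(awsAuth.Coalesce("", "us-west-2")) // us-west-2
	// Neither set: empty string, which ValidateAWS then rejects for the region.
	fmt.Println(awsAuth.Coalesce("", "") == "") // true
}
```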
- accessKey, _ := metadata["accessKey"] - if awsAccessKey == "" || accessKey != "" { - awsAccessKey = accessKey - } - awsSecretKey, _ := metadata["awsSecretKey"] - // This is needed as we remove the awsSecretKey field to use the builtin AWS profile 'secretKey' field instead. - secretKey, _ := metadata["secretKey"] - if awsSecretKey == "" || secretKey != "" { - awsSecretKey = secretKey - } - - awsRole, _ := metadata["awsIamRoleArn"] - // This is needed as we remove the awsIamRoleArn field to use the builtin AWS profile 'assumeRoleArn' field instead. - role, _ := metadata["assumeRoleArn"] - if awsRole == "" || role != "" { - awsRole = role - } - - awsSession, _ := metadata["awsStsSessionName"] - // This is needed as we remove the awsStsSessionName field to use the builtin AWS profile 'sessionName' field instead. - session, _ := metadata["sessionName"] - if awsSession == "" || session != "" { - awsSession = session - } - return awsRegion, awsAccessKey, awsSecretKey, awsRole, awsSession, nil + return &awsAuth.DeprecatedKafkaIAM{ + Region: region, + AccessKey: accessKey, + SecretKey: secretKey, + IamRoleArn: role, + StsSessionName: session, + SessionToken: token, + }, nil } func (k *Kafka) Close() error { diff --git a/common/component/kafka/kafka_test.go b/common/component/kafka/kafka_test.go index 3fbe8c7a2e..ea6834ba66 100644 --- a/common/component/kafka/kafka_test.go +++ b/common/component/kafka/kafka_test.go @@ -3,6 +3,7 @@ package kafka import ( "encoding/binary" "encoding/json" + "errors" "testing" "time" @@ -10,8 +11,10 @@ import ( gomock "github.com/golang/mock/gomock" "github.com/linkedin/goavro/v2" "github.com/riferrei/srclient" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + awsAuth "github.com/dapr/components-contrib/common/authentication/aws" mock_srclient "github.com/dapr/components-contrib/common/component/kafka/mocks" ) @@ -344,3 +347,81 @@ func TestLatestSchemaCaching(t *testing.T) { require.NoError(t, err) }) } + +func TestValidateAWS(t *testing.T) { + tests := []struct { + name string + metadata map[string]string + expected *awsAuth.DeprecatedKafkaIAM + err error + }{ + { + name: "Valid metadata with all fields without aws prefix", + metadata: map[string]string{ + "region": "us-east-1", + "accessKey": "testAccessKey", + "secretKey": "testSecretKey", + "assumeRoleArn": "testRoleArn", + "sessionName": "testSessionName", + "sessionToken": "testSessionToken", + }, + expected: &awsAuth.DeprecatedKafkaIAM{ + Region: "us-east-1", + AccessKey: "testAccessKey", + SecretKey: "testSecretKey", + IamRoleArn: "testRoleArn", + StsSessionName: "testSessionName", + SessionToken: "testSessionToken", + }, + err: nil, + }, + { + name: "Fallback to aws-prefixed fields with aws prefix", + metadata: map[string]string{ + "awsRegion": "us-west-2", + "awsAccessKey": "awsAccessKey", + "awsSecretKey": "awsSecretKey", + "awsIamRoleArn": "awsRoleArn", + "awsStsSessionName": "awsSessionName", + "awsSessionToken": "awsSessionToken", + }, + expected: &awsAuth.DeprecatedKafkaIAM{ + Region: "us-west-2", + AccessKey: "awsAccessKey", + SecretKey: "awsSecretKey", + IamRoleArn: "awsRoleArn", + StsSessionName: "awsSessionName", + SessionToken: "awsSessionToken", + }, + err: nil, + }, + { + name: "Missing region field", + metadata: map[string]string{ + "accessKey": "key", + "secretKey": "secret", + }, + expected: nil, + err: errors.New("metadata property AWSRegion is missing"), + }, + { + name: "Empty metadata", + metadata: map[string]string{}, + expected: nil, + err: 
errors.New("metadata property AWSRegion is missing"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + k := &Kafka{} + result, err := k.ValidateAWS(tt.metadata) + if tt.err != nil { + require.EqualError(t, err, tt.err.Error()) + } else { + require.NoError(t, err) + } + assert.Equal(t, result, tt.expected) + }) + } +} diff --git a/common/component/kafka/metadata.go b/common/component/kafka/metadata.go index 31b11de307..c911e0cf2c 100644 --- a/common/component/kafka/metadata.go +++ b/common/component/kafka/metadata.go @@ -97,15 +97,6 @@ type KafkaMetadata struct { ClientConnectionTopicMetadataRefreshInterval time.Duration `mapstructure:"clientConnectionTopicMetadataRefreshInterval"` ClientConnectionKeepAliveInterval time.Duration `mapstructure:"clientConnectionKeepAliveInterval"` - // aws iam auth profile - // Note: these custom AWS specific fields will become deprecated and deleted in Dapr 1.17. - // This will move Kafka to leverage all of the common AWS options using the builtin AWS profile for authentication. - AWSAccessKey string `mapstructure:"awsAccessKey"` - AWSSecretKey string `mapstructure:"awsSecretKey"` - AWSSessionToken string `mapstructure:"awsSessionToken"` - AWSIamRoleArn string `mapstructure:"awsIamRoleArn"` - AWSStsSessionName string `mapstructure:"awsStsSessionName"` - channelBufferSize int `mapstructure:"-"` consumerFetchMin int32 `mapstructure:"-"` diff --git a/pubsub/aws/snssqs/metadata.go b/pubsub/aws/snssqs/metadata.go index 4b469106b7..d3819034bb 100644 --- a/pubsub/aws/snssqs/metadata.go +++ b/pubsub/aws/snssqs/metadata.go @@ -22,6 +22,7 @@ type snsSqsMetadata struct { // aws endpoint for the component to use. Endpoint string `mapstructure:"endpoint"` // aws region in which SNS/SQS should create resources. + // TODO: rm the alias on region in Dapr 1.17. Region string `json:"region" mapstructure:"region" mapstructurealiases:"awsRegion" mdignore:"true"` // aws partition in which SNS/SQS should create resources. internalPartition string `mapstructure:"-"` diff --git a/pubsub/kafka/metadata.yaml b/pubsub/kafka/metadata.yaml index 394c93e4e3..9fd66e9444 100644 --- a/pubsub/kafka/metadata.yaml +++ b/pubsub/kafka/metadata.yaml @@ -8,6 +8,9 @@ title: "Apache Kafka" urls: - title: Reference url: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-apache-kafka/ +# This auth profile has duplicate fields intentionally as we maintain backwards compatibility, +# but also move Kafka to utilize the noramlized AWS fields in the builtin auth profiles. +# TODO: rm the duplicate aws prefixed fields in Dapr 1.17. builtinAuthenticationProfiles: - name: "aws" metadata: @@ -20,11 +23,20 @@ builtinAuthenticationProfiles: example: '"awsiam"' allowedValues: - "awsiam" + - name: awsRegion + type: string + required: false + description: | + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'region' instead. + The AWS Region where the AWS Relational Database Service is deployed to. + example: '"us-east-1"' - name: awsAccessKey type: string required: false description: | - Deprecated as of Dapr 1.17. Use 'accessKey' instead. + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'accessKey' instead. If both fields are set, then 'accessKey' value will be used. AWS access key associated with an IAM account. 
example: '"AKIAIOSFODNN7EXAMPLE"' @@ -33,7 +45,8 @@ builtinAuthenticationProfiles: required: false sensitive: true description: | - Deprecated as of Dapr 1.17. Use 'secretKey' instead. + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'secretKey' instead. If both fields are set, then 'secretKey' value will be used. The secret key associated with the access key. example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"' @@ -41,7 +54,8 @@ builtinAuthenticationProfiles: type: string sensitive: true description: | - Deprecated as of Dapr 1.17. Use 'sessionToken' instead. + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'sessionToken' instead. If both fields are set, then 'sessionToken' value will be used. AWS session token to use. A session token is only required if you are using temporary security credentials. example: '"TOKEN"' @@ -49,14 +63,16 @@ builtinAuthenticationProfiles: type: string required: false description: | - Deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead. + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead. If both fields are set, then 'assumeRoleArn' value will be used. IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials. example: '"arn:aws:iam::123456789:role/mskRole"' - name: awsStsSessionName type: string description: | - Deprecated as of Dapr 1.17. Use 'sessionName' instead. + This maintains backwards compatibility with existing fields. + It will be deprecated as of Dapr 1.17. Use 'sessionName' instead. If both fields are set, then 'sessionName' value will be used. Represents the session name for assuming a role. 
example: '"MyAppSession"' diff --git a/state/aws/dynamodb/dynamodb.go b/state/aws/dynamodb/dynamodb.go index ae4ba7c5e9..289fa00e8c 100644 --- a/state/aws/dynamodb/dynamodb.go +++ b/state/aws/dynamodb/dynamodb.go @@ -53,6 +53,7 @@ type dynamoDBMetadata struct { SecretKey string `json:"secretKey" mapstructure:"secretKey" mdignore:"true"` SessionToken string `json:"sessionToken" mapstructure:"sessionToken" mdignore:"true"` + // TODO: rm the alias in Dapr 1.17 Region string `json:"region" mapstructure:"region" mapstructurealiases:"awsRegion" mdignore:"true"` Endpoint string `json:"endpoint"` Table string `json:"table"` From 41f240fbcde4b181e262e0b18e2e797f42623451 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 19 Nov 2024 16:47:35 -0600 Subject: [PATCH 05/14] refactor(clients): mv clients to be dynamic based on config updates Signed-off-by: Samantha Coyle --- common/authentication/aws/aws.go | 3 +- common/authentication/aws/client.go | 93 ++++++++++++++++++++++- common/authentication/aws/static.go | 65 ++++++---------- common/authentication/aws/x509.go | 46 +++++------ common/component/kafka/kafka.go | 33 +++----- common/component/kafka/producer.go | 21 +++-- common/component/kafka/producer_test.go | 10 +-- common/component/kafka/subscriber.go | 9 ++- common/component/kafka/subscriber_test.go | 52 ++++++------- 9 files changed, 206 insertions(+), 126 deletions(-) diff --git a/common/authentication/aws/aws.go b/common/authentication/aws/aws.go index 53af7c1e92..0c03731509 100644 --- a/common/authentication/aws/aws.go +++ b/common/authentication/aws/aws.go @@ -20,7 +20,6 @@ import ( "strconv" "time" - "github.com/IBM/sarama" "github.com/aws/aws-sdk-go-v2/config" v2creds "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/feature/rds/auth" @@ -108,7 +107,7 @@ type Provider interface { Kinesis() *KinesisClients Ses() *SesClients - UpdateKafka(*sarama.Config) error + Kafka(KafkaOptions) (*KafkaClients, error) Close() error } diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index 25b479272a..b96ef89298 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -16,6 +16,7 @@ package aws import ( "context" "errors" + "fmt" "sync" "time" @@ -55,13 +56,14 @@ type Clients struct { ParameterStore *ParameterStoreClients kinesis *KinesisClients ses *SesClients + kafka *KafkaClients } func newClients() *Clients { return new(Clients) } -func (c *Clients) refresh(session *session.Session) { +func (c *Clients) refresh(session *session.Session) error { c.mu.Lock() defer c.mu.Unlock() switch { @@ -83,7 +85,16 @@ func (c *Clients) refresh(session *session.Session) { c.kinesis.New(session) case c.ses != nil: c.ses.New(session) + case c.kafka != nil: + // Note: we pass in nil for token provider + // as there are no special fields for x509 auth for it. + // Only static auth passes it in. 
+ err := c.kafka.New(session, nil) + if err != nil { + return fmt.Errorf("failed to refresh Kafka AWS IAM Config: %w", err) + } } + return nil } type S3Clients struct { @@ -128,6 +139,16 @@ type SesClients struct { Ses *ses.SES } +type KafkaClients struct { + config *sarama.Config + consumerGroup *string + brokers *[]string + maxMessageBytes *int + + ConsumerGroup sarama.ConsumerGroup + Producer sarama.SyncProducer +} + func (c *S3Clients) New(session *session.Session) { refreshedS3 := s3.New(session, session.Config) c.S3 = refreshedS3 @@ -212,6 +233,63 @@ func (c *SesClients) New(session *session.Session) { c.Ses = ses.New(session, session.Config) } +type KafkaOptions struct { + Config *sarama.Config + ConsumerGroup string + Brokers []string + MaxMessageBytes int +} + +func initKafkaClients(opts KafkaOptions) *KafkaClients { + return &KafkaClients{ + config: opts.Config, + consumerGroup: &opts.ConsumerGroup, + brokers: &opts.Brokers, + maxMessageBytes: &opts.MaxMessageBytes, + } +} + +func (c *KafkaClients) New(session *session.Session, tokenProvider *mskTokenProvider) error { + const timeout = 10 * time.Second + creds, err := session.Config.Credentials.Get() + if err != nil { + return fmt.Errorf("failed to get credentials from session: %w", err) + } + + // fill in token provider common fields across x509 and static auth + if tokenProvider == nil { + tokenProvider = &mskTokenProvider{} + } + tokenProvider.generateTokenTimeout = timeout + tokenProvider.region = *session.Config.Region + tokenProvider.accessKey = creds.AccessKeyID + tokenProvider.secretKey = creds.SecretAccessKey + tokenProvider.sessionToken = creds.SessionToken + + c.config.Net.SASL.Enable = true + c.config.Net.SASL.Mechanism = sarama.SASLTypeOAuth + c.config.Net.SASL.TokenProvider = tokenProvider + + _, err = c.config.Net.SASL.TokenProvider.Token() + if err != nil { + return fmt.Errorf("error validating iam credentials %v", err) + } + + consumerGroup, err := sarama.NewConsumerGroup(*c.brokers, *c.consumerGroup, c.config) + if err != nil { + return err + } + c.ConsumerGroup = consumerGroup + + producer, err := c.getSyncProducer() + if err != nil { + return err + } + c.Producer = producer + + return nil +} + // Kafka specific type mskTokenProvider struct { generateTokenTimeout time.Duration @@ -248,3 +326,16 @@ func (m *mskTokenProvider) Token() (*sarama.AccessToken, error) { return &sarama.AccessToken{Token: token}, err } } + +func (c *KafkaClients) getSyncProducer() (sarama.SyncProducer, error) { + // Add SyncProducer specific properties to copy of base config + c.config.Producer.RequiredAcks = sarama.WaitForAll + c.config.Producer.Retry.Max = 5 + c.config.Producer.Return.Successes = true + + if *c.maxMessageBytes > 0 { + c.config.Producer.MaxMessageBytes = *c.maxMessageBytes + } + + return sarama.NewSyncProducer(*c.brokers, c.config) +} diff --git a/common/authentication/aws/static.go b/common/authentication/aws/static.go index 3482eea397..9137ff9d8b 100644 --- a/common/authentication/aws/static.go +++ b/common/authentication/aws/static.go @@ -15,11 +15,10 @@ package aws import ( "context" + "errors" "fmt" "sync" - "time" - "github.com/IBM/sarama" awsv2 "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" v2creds "github.com/aws/aws-sdk-go-v2/credentials" @@ -214,58 +213,34 @@ func (a *StaticAuth) Ses() *SesClients { return a.clients.ses } -func (a *StaticAuth) UpdateKafka(config *sarama.Config) error { +func (a *StaticAuth) Kafka(opts KafkaOptions) (*KafkaClients, error) { a.mu.Lock() defer 
a.mu.Unlock() - var accessKey, secretKey, sessionToken string - creds, err := a.session.Config.Credentials.Get() - if err != nil { - return fmt.Errorf("failed to get credentials from session: %w", err) - } - if a.accessKey == nil { - accessKey = creds.AccessKeyID - } else { - accessKey = *a.accessKey - } - - if a.secretKey == nil { - secretKey = creds.SecretAccessKey - } else { - secretKey = *a.secretKey - } - - if a.sessionToken == nil { - secretKey = creds.SessionToken - } else { - sessionToken = *a.sessionToken - } - - config.Net.SASL.Enable = true - config.Net.SASL.Mechanism = sarama.SASLTypeOAuth - tokenProvider := mskTokenProvider{ - generateTokenTimeout: 10 * time.Second, - region: *a.region, - accessKey: accessKey, - secretKey: secretKey, - sessionToken: sessionToken, + // This means we've already set the config in our New function + // to use the SASL token provider. + if a.clients.kafka != nil { + return a.clients.kafka, nil } + a.clients.kafka = initKafkaClients(opts) + // static auth has additional fields we need added, + // so we add those static auth specific fields here, + // and the rest of the token provider fields are added in New() + tokenProvider := mskTokenProvider{} if a.assumeRoleARN != nil { tokenProvider.awsIamRoleArn = *a.assumeRoleARN } - if a.sessionName != nil { tokenProvider.awsStsSessionName = *a.sessionName } - config.Net.SASL.TokenProvider = &tokenProvider - - _, err = config.Net.SASL.TokenProvider.Token() + err := a.clients.kafka.New(a.session, &tokenProvider) if err != nil { - return fmt.Errorf("error validating iam credentials %v", err) + return nil, fmt.Errorf("failed to create AWS IAM Kafka config: %w", err) } - return nil + + return a.clients.kafka, nil } func (a *StaticAuth) getTokenClient() (*session.Session, error) { @@ -307,7 +282,15 @@ func (a *StaticAuth) getTokenClient() (*session.Session, error) { } func (a *StaticAuth) Close() error { - return nil + a.mu.Lock() + defer a.mu.Unlock() + + errs := make([]error, 2) + if a.clients.kafka != nil { + errs[0] = a.clients.kafka.Producer.Close() + errs[1] = a.clients.kafka.ConsumerGroup.Close() + } + return errors.Join(errs...) } func GetConfigV2(accessKey string, secretKey string, sessionToken string, region string, endpoint string) (awsv2.Config, error) { diff --git a/common/authentication/aws/x509.go b/common/authentication/aws/x509.go index 6a562ca8d1..25c615bd42 100644 --- a/common/authentication/aws/x509.go +++ b/common/authentication/aws/x509.go @@ -25,7 +25,6 @@ import ( "sync" "time" - "github.com/IBM/sarama" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/arn" "github.com/aws/aws-sdk-go/aws/credentials" @@ -66,10 +65,6 @@ type x509 struct { trustProfileArn *string trustAnchorArn *string assumeRoleArn *string - - accessKey *string - secretKey *string - sessionToken *string } func newX509(ctx context.Context, opts Options, cfg *aws.Config) (*x509, error) { @@ -123,9 +118,17 @@ func newX509(ctx context.Context, opts Options, cfg *aws.Config) (*x509, error) } func (a *x509) Close() error { + a.mu.Lock() + defer a.mu.Unlock() close(a.closeCh) a.wg.Wait() - return nil + + errs := make([]error, 2) + if a.clients.kafka != nil { + errs[0] = a.clients.kafka.Producer.Close() + errs[1] = a.clients.kafka.ConsumerGroup.Close() + } + return errors.Join(errs...) 
} func (a *x509) getCertPEM(ctx context.Context) error { @@ -279,25 +282,24 @@ func (a *x509) Ses() *SesClients { return a.clients.ses } -func (a *x509) UpdateKafka(config *sarama.Config) error { +func (a *x509) Kafka(opts KafkaOptions) (*KafkaClients, error) { a.mu.Lock() defer a.mu.Unlock() - config.Net.SASL.Enable = true - config.Net.SASL.Mechanism = sarama.SASLTypeOAuth - config.Net.SASL.TokenProvider = &mskTokenProvider{ - generateTokenTimeout: 10 * time.Second, - region: *a.region, - accessKey: *a.accessKey, - secretKey: *a.secretKey, - sessionToken: *a.sessionToken, + // This means we've already set the config in our New function + // to use the SASL token provider. + if a.clients.kafka != nil { + return a.clients.kafka, nil } - _, err := config.Net.SASL.TokenProvider.Token() + a.clients.kafka = initKafkaClients(opts) + // Note: we pass in nil for token provider, + // as there are no special fields for x509 auth for it. + err := a.clients.kafka.New(a.session, nil) if err != nil { - return fmt.Errorf("error validating iam credentials %v", err) + return nil, fmt.Errorf("failed to create AWS IAM Kafka config: %w", err) } - return nil + return a.clients.kafka, nil } func (a *x509) initializeTrustAnchors() error { @@ -427,9 +429,6 @@ func (a *x509) createOrRefreshSession(ctx context.Context) (*session.Session, er if sess == nil { return nil, errors.New("session is nil") } - a.accessKey = accessKey - a.secretKey = secretKey - a.sessionToken = sessionToken return sess, nil } @@ -464,7 +463,10 @@ func (a *x509) refreshClient() { for { newSession, err := a.createOrRefreshSession(context.Background()) if err == nil { - a.clients.refresh(newSession) + err := a.clients.refresh(newSession) + if err != nil { + a.logger.Errorf("Failed to refresh client, retrying in 5 seconds: %w", err) + } a.logger.Debugf("AWS IAM Roles Anywhere session credentials refreshed successfully") return } diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index f63106cd2a..88aef85ac7 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -37,7 +37,12 @@ import ( // Kafka allows reading/writing to a Kafka consumer group. type Kafka struct { - producer sarama.SyncProducer + // These are used to inject mocked clients for tests + mockConsumerGroup sarama.ConsumerGroup + mockProducer sarama.SyncProducer + clients *clients + + maxMessageBytes int consumerGroup string brokers []string logger logger.Logger @@ -49,7 +54,6 @@ type Kafka struct { escapeHeaders bool awsAuthProvider awsAuth.Provider - cg sarama.ConsumerGroup subscribeTopics TopicHandlerConfig subscribeLock sync.Mutex consumerCancel context.CancelFunc @@ -120,6 +124,7 @@ func NewKafka(logger logger.Logger) *Kafka { } } + // Init does metadata parsing and connection establishment. func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { upgradedMetadata, err := k.upgradeMetadata(metadata) @@ -205,16 +210,11 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { return err } k.awsAuthProvider = provider - k.awsAuthProvider.UpdateKafka(config) } k.config = config sarama.Logger = SaramaLogBridge{daprLogger: k.logger} - - k.producer, err = getSyncProducer(*k.config, k.brokers, meta.MaxMessageBytes) - if err != nil { - return err - } + k.maxMessageBytes = meta.MaxMessageBytes // Default retry configuration is used if no // backOff properties are set. 
@@ -241,11 +241,6 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { } k.logger.Debug("Kafka message bus initialization complete") - k.cg, err = sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config) - if err != nil { - return err - } - return nil } @@ -279,12 +274,6 @@ func (k *Kafka) Close() error { errs := make([]error, 3) if k.closed.CompareAndSwap(false, true) { close(k.closeCh) - - if k.producer != nil { - errs[0] = k.producer.Close() - k.producer = nil - } - if k.internalContext != nil { k.internalContextCancel() } @@ -295,10 +284,10 @@ func (k *Kafka) Close() error { } k.subscribeLock.Unlock() - if k.cg != nil { - errs[1] = k.cg.Close() + if k.clients != nil { + errs[0] = k.clients.producer.Close() + errs[1] = k.clients.consumerGroup.Close() } - if k.awsAuthProvider != nil { errs[2] = k.awsAuthProvider.Close() } diff --git a/common/component/kafka/producer.go b/common/component/kafka/producer.go index 0083cb86a8..33e5dd4fa3 100644 --- a/common/component/kafka/producer.go +++ b/common/component/kafka/producer.go @@ -16,6 +16,7 @@ package kafka import ( "context" "errors" + "fmt" "maps" "github.com/IBM/sarama" @@ -23,7 +24,7 @@ import ( "github.com/dapr/components-contrib/pubsub" ) -func getSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int) (sarama.SyncProducer, error) { +func GetSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int) (sarama.SyncProducer, error) { // Add SyncProducer specific properties to copy of base config config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 5 @@ -43,9 +44,14 @@ func getSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int // Publish message to Kafka cluster. func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata map[string]string) error { - if k.producer == nil { + clients, err := k.latestClients() + if err != nil || clients == nil { + return fmt.Errorf("failed to get latest Kafka clients: %w", err) + } + if clients.producer == nil { return errors.New("component is closed") } + // k.logger.Debugf("Publishing topic %v with data: %v", topic, string(data)) k.logger.Debugf("Publishing on topic %v", topic) @@ -73,7 +79,7 @@ func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata m }) } - partition, offset, err := k.producer.SendMessage(msg) + partition, offset, err := clients.producer.SendMessage(msg) k.logger.Debugf("Partition: %v, offset: %v", partition, offset) @@ -85,7 +91,12 @@ func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata m } func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.BulkMessageEntry, metadata map[string]string) (pubsub.BulkPublishResponse, error) { - if k.producer == nil { + clients, err := k.latestClients() + if err != nil || clients == nil { + err := fmt.Errorf("failed to get latest Kafka clients: %w", err) + return pubsub.NewBulkPublishResponse(entries, err), err + } + if clients.producer == nil { err := errors.New("component is closed") return pubsub.NewBulkPublishResponse(entries, err), err } @@ -134,7 +145,7 @@ func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.Bu msgs = append(msgs, msg) } - if err := k.producer.SendMessages(msgs); err != nil { + if err := clients.producer.SendMessages(msgs); err != nil { // map the returned error to different entries return k.mapKafkaProducerErrors(err, entries), err } diff --git a/common/component/kafka/producer_test.go 
b/common/component/kafka/producer_test.go index 3dd1b75a9e..a0769767a0 100644 --- a/common/component/kafka/producer_test.go +++ b/common/component/kafka/producer_test.go @@ -13,17 +13,15 @@ import ( ) func arrangeKafkaWithAssertions(t *testing.T, msgCheckers ...saramamocks.MessageChecker) *Kafka { - cfg := saramamocks.NewTestConfig() - mockProducer := saramamocks.NewSyncProducer(t, cfg) + mockP := saramamocks.NewSyncProducer(t, saramamocks.NewTestConfig()) for _, msgChecker := range msgCheckers { - mockProducer.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(msgChecker) + mockP.ExpectSendMessageWithMessageCheckerFunctionAndSucceed(msgChecker) } return &Kafka{ - producer: mockProducer, - config: cfg, - logger: logger.NewLogger("kafka_test"), + mockProducer: mockP, + logger: logger.NewLogger("kafka_test"), } } diff --git a/common/component/kafka/subscriber.go b/common/component/kafka/subscriber.go index 95bdd5a232..4e1dc7ae8f 100644 --- a/common/component/kafka/subscriber.go +++ b/common/component/kafka/subscriber.go @@ -84,7 +84,14 @@ func (k *Kafka) reloadConsumerGroup() { func (k *Kafka) consume(ctx context.Context, topics []string, consumer *consumer) { for { - err := k.cg.Consume(ctx, topics, consumer) + clients, err := k.latestClients() + if err != nil || clients == nil { + k.logger.Errorf("failed to get latest Kafka clients: %w", err) + } + if clients.consumerGroup == nil { + k.logger.Errorf("component is closed") + } + err = clients.consumerGroup.Consume(ctx, topics, consumer) if errors.Is(err, context.Canceled) { return } diff --git a/common/component/kafka/subscriber_test.go b/common/component/kafka/subscriber_test.go index 57b87cf4f2..dbfc696341 100644 --- a/common/component/kafka/subscriber_test.go +++ b/common/component/kafka/subscriber_test.go @@ -41,11 +41,11 @@ func Test_reloadConsumerGroup(t *testing.T) { }) k := &Kafka{ - logger: logger.NewLogger("test"), - cg: cg, - subscribeTopics: nil, - closeCh: make(chan struct{}), - consumerCancel: cancel, + logger: logger.NewLogger("test"), + mockConsumerGroup: cg, + subscribeTopics: nil, + closeCh: make(chan struct{}), + consumerCancel: cancel, } k.reloadConsumerGroup() @@ -64,11 +64,11 @@ func Test_reloadConsumerGroup(t *testing.T) { return nil }) k := &Kafka{ - logger: logger.NewLogger("test"), - cg: cg, - consumerCancel: cancel, - closeCh: make(chan struct{}), - subscribeTopics: TopicHandlerConfig{"foo": SubscriptionHandlerConfig{}}, + logger: logger.NewLogger("test"), + mockConsumerGroup: cg, + consumerCancel: cancel, + closeCh: make(chan struct{}), + subscribeTopics: TopicHandlerConfig{"foo": SubscriptionHandlerConfig{}}, } k.closed.Store(true) @@ -89,11 +89,11 @@ func Test_reloadConsumerGroup(t *testing.T) { return nil }) k := &Kafka{ - logger: logger.NewLogger("test"), - cg: cg, - consumerCancel: nil, - closeCh: make(chan struct{}), - subscribeTopics: TopicHandlerConfig{"foo": SubscriptionHandlerConfig{}}, + logger: logger.NewLogger("test"), + mockConsumerGroup: cg, + consumerCancel: nil, + closeCh: make(chan struct{}), + subscribeTopics: TopicHandlerConfig{"foo": SubscriptionHandlerConfig{}}, } k.reloadConsumerGroup() @@ -114,7 +114,7 @@ func Test_reloadConsumerGroup(t *testing.T) { }) k := &Kafka{ logger: logger.NewLogger("test"), - cg: cg, + mockConsumerGroup: cg, consumerCancel: nil, closeCh: make(chan struct{}), subscribeTopics: TopicHandlerConfig{"foo": SubscriptionHandlerConfig{}}, @@ -146,7 +146,7 @@ func Test_reloadConsumerGroup(t *testing.T) { }) k := &Kafka{ logger: logger.NewLogger("test"), - cg: cg, + 
mockConsumerGroup: cg, consumerCancel: nil, closeCh: make(chan struct{}), subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}}, @@ -174,7 +174,7 @@ func Test_reloadConsumerGroup(t *testing.T) { }) k := &Kafka{ logger: logger.NewLogger("test"), - cg: cg, + mockConsumerGroup: cg, consumerCancel: nil, closeCh: make(chan struct{}), subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}}, @@ -210,7 +210,7 @@ func Test_reloadConsumerGroup(t *testing.T) { }) k := &Kafka{ logger: logger.NewLogger("test"), - cg: cg, + mockConsumerGroup: cg, consumerCancel: nil, closeCh: make(chan struct{}), subscribeTopics: map[string]SubscriptionHandlerConfig{"foo": {}}, @@ -248,7 +248,7 @@ func Test_Subscribe(t *testing.T) { }) k := &Kafka{ logger: logger.NewLogger("test"), - cg: cg, + mockConsumerGroup: cg, consumerCancel: nil, closeCh: make(chan struct{}), consumeRetryInterval: time.Millisecond, @@ -273,7 +273,7 @@ func Test_Subscribe(t *testing.T) { }) k := &Kafka{ logger: logger.NewLogger("test"), - cg: cg, + mockConsumerGroup: cg, consumerCancel: nil, closeCh: make(chan struct{}), consumeRetryInterval: time.Millisecond, @@ -302,7 +302,7 @@ func Test_Subscribe(t *testing.T) { }) k := &Kafka{ logger: logger.NewLogger("test"), - cg: cg, + mockConsumerGroup: cg, consumerCancel: nil, closeCh: make(chan struct{}), consumeRetryInterval: time.Millisecond, @@ -340,7 +340,7 @@ func Test_Subscribe(t *testing.T) { }) k := &Kafka{ logger: logger.NewLogger("test"), - cg: cg, + mockConsumerGroup: cg, consumerCancel: nil, closeCh: make(chan struct{}), consumeRetryInterval: time.Millisecond, @@ -391,7 +391,7 @@ func Test_Subscribe(t *testing.T) { }) k := &Kafka{ logger: logger.NewLogger("test"), - cg: cg, + mockConsumerGroup: cg, consumerCancel: nil, closeCh: make(chan struct{}), subscribeTopics: make(TopicHandlerConfig), @@ -421,7 +421,7 @@ func Test_Subscribe(t *testing.T) { }) k := &Kafka{ logger: logger.NewLogger("test"), - cg: cg, + mockConsumerGroup: cg, consumerCancel: nil, closeCh: make(chan struct{}), subscribeTopics: make(TopicHandlerConfig), @@ -495,7 +495,7 @@ func Test_Subscribe(t *testing.T) { }) k := &Kafka{ logger: logger.NewLogger("test"), - cg: cg, + mockConsumerGroup: cg, consumerCancel: nil, closeCh: make(chan struct{}), subscribeTopics: make(TopicHandlerConfig), From c86148ae171bf4195e44aceb5d96424edd22cca8 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 19 Nov 2024 16:47:47 -0600 Subject: [PATCH 06/14] fix: add file i forgot Signed-off-by: Samantha Coyle --- common/component/kafka/clients.go | 63 +++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 common/component/kafka/clients.go diff --git a/common/component/kafka/clients.go b/common/component/kafka/clients.go new file mode 100644 index 0000000000..7331bd4b00 --- /dev/null +++ b/common/component/kafka/clients.go @@ -0,0 +1,63 @@ +package kafka + +import ( + "fmt" + + "github.com/IBM/sarama" + awsAuth "github.com/dapr/components-contrib/common/authentication/aws" +) + +type clients struct { + consumerGroup sarama.ConsumerGroup + producer sarama.SyncProducer +} + +func (k *Kafka) latestClients() (*clients, error) { + switch { + // case 0: use mock clients for testing + case k.mockProducer != nil || k.mockConsumerGroup != nil: + return &clients{ + consumerGroup: k.mockConsumerGroup, + producer: k.mockProducer, + }, nil + + // case 1: use aws clients with refreshable tokens in the cfg + case k.awsAuthProvider != nil: + awsKafkaOpts := awsAuth.KafkaOptions{ + Config: k.config, + ConsumerGroup: 
k.consumerGroup, + Brokers: k.brokers, + MaxMessageBytes: k.maxMessageBytes, + } + awsKafkaClients, err := k.awsAuthProvider.Kafka(awsKafkaOpts) + if err != nil { + return nil, fmt.Errorf("failed to get AWS IAM Kafka clients: %w", err) + } + return &clients{ + consumerGroup: awsKafkaClients.ConsumerGroup, + producer: awsKafkaClients.Producer, + }, nil + + // case 2: normal static auth profile clients + default: + if k.clients != nil { + return k.clients, nil + } + cg, err := sarama.NewConsumerGroup(k.brokers, k.consumerGroup, k.config) + if err != nil { + return nil, err + } + + p, err := GetSyncProducer(*k.config, k.brokers, k.maxMessageBytes) + if err != nil { + return nil, err + } + + newStaticClients := clients{ + consumerGroup: cg, + producer: p, + } + k.clients = &newStaticClients + return k.clients, nil + } +} From add4a99b6346b4df57683c100674da9194c00caf Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 19 Nov 2024 20:19:40 -0600 Subject: [PATCH 07/14] fix(metadata): final metadata tweaks for kafka Signed-off-by: Samantha Coyle --- .../builtin-authentication-profiles.go | 92 ++++++++++--------- 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/.build-tools/pkg/metadataschema/builtin-authentication-profiles.go b/.build-tools/pkg/metadataschema/builtin-authentication-profiles.go index 49d18803ab..eb925cd917 100644 --- a/.build-tools/pkg/metadataschema/builtin-authentication-profiles.go +++ b/.build-tools/pkg/metadataschema/builtin-authentication-profiles.go @@ -32,30 +32,46 @@ func ParseBuiltinAuthenticationProfile(bi BuiltinAuthenticationProfile, componen for i, profile := range profiles { res[i] = profile - res[i].Metadata = mergedMetadata(bi.Metadata, res[i].Metadata...) - - switch profile.Title { - case "AWS: Access Key ID and Secret Access Key": - // If component is PostgreSQL, handle deprecation of aws profile fields that we will remove in Dapr 1.17 - // Postgres has awsAccessKey and accessKey and awsSecretKey and secretKey. - // Therefore, we mark the non aws prefixed ones as not required as we deprecate the aws prefixed ones. - if strings.ToLower(componentTitle) == "postgresql" && bi.Name == "aws" { - res[i].Metadata = removeRequiredOnSomeAWSFields(res[i].Metadata) - } - // If component is Kafka, handle deprecation of aws profile fields with aws prefix that we will remove in Dapr 1.17 - if strings.ToLower(componentTitle) == "Apache Kafka" { - res[i].Metadata = removeRequiredOnSomeAWSFields(res[i].Metadata) + // convert slice to a slice of pointers to update in place for required -> non-required fields + metadataPtr := make([]*Metadata, len(profile.Metadata)) + for j := range profile.Metadata { + metadataPtr[j] = &profile.Metadata[j] + } + + if componentTitle == "Apache Kafka" { + removeRequiredOnSomeAWSFields(&metadataPtr) + } + + // convert back to value slices for merging + updatedMetadata := make([]Metadata, 0, len(metadataPtr)) + for _, ptr := range metadataPtr { + if ptr != nil { + updatedMetadata = append(updatedMetadata, *ptr) } + } + + merged := mergedMetadata(bi.Metadata, updatedMetadata...) 
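+		// mergedMetadata keeps the shared builtin AWS profile fields first,
+		// followed by the profile-specific fields adjusted above.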
- case "AWS: Credentials from Environment Variables", "AWS: IAM Roles Anywhere": - // These two auth profiles do not use the fields that we are deprecated, so we can manually remove the unrelated fields - res[i].Metadata = removeAllDeprecatedFieldsOnUnrelatedAuthProfiles(res[i].Metadata) - case "AWS: Assume specific IAM Role": - // This is needed bc to assume a specific IAM role, we must allow for the field of awsStsSessionName to deprecate to sessionName - res[i].Metadata = removeSomeDeprecatedFieldsOnUnrelatedAuthProfiles(res[i].Metadata) + // Note: We must apply the removal of deprecated fields after the merge!! + + // Here, we remove some deprecated fields as we support the transition to a new auth profile + if profile.Title == "AWS: Assume specific IAM Role" && componentTitle == "Apache Kafka" { + merged = removeSomeDeprecatedFieldsOnUnrelatedAuthProfiles(merged) + } + + // Here, there are no metadata fields that need deprecating + if profile.Title == "AWS: Credentials from Environment Variables" && componentTitle == "Apache Kafka" { + merged = removeAllDeprecatedFieldsOnUnrelatedAuthProfiles(merged) } + // Here, this is a new auth profile, so rm all deprecating fields as unrelated. + if profile.Title == "AWS: IAM Roles Anywhere" && componentTitle == "Apache Kafka" { + merged = removeAllDeprecatedFieldsOnUnrelatedAuthProfiles(merged) + } + + res[i].Metadata = merged } + return res, nil } @@ -72,42 +88,34 @@ func mergedMetadata(base []Metadata, add ...Metadata) []Metadata { // removeRequiredOnSomeAWSFields needs to be removed in Dapr 1.17 as duplicated AWS IAM fields get removed, // and we standardize on these fields. -// Currently, there are: awsAccessKey, accessKey and awsSecretKey, secretKey fields. -// We normally have accessKey and secretKey fields marked required as it is part of the builtin AWS auth profile fields. +// Currently, there are: awsAccessKey, accessKey and awsSecretKey, secretKey, and awsRegion and region fields. +// We normally have accessKey, secretKey, and region fields marked required as it is part of the builtin AWS auth profile fields. // However, as we rm the aws prefixed ones, we need to then mark the normally required ones as not required only for postgres and kafka. // This way we do not break existing users, and transition them to the standardized fields. 
-func removeRequiredOnSomeAWSFields(metadata []Metadata) []Metadata { - duplicateFields := map[string]int{ - "accessKey": 0, - "secretKey": 0, +func removeRequiredOnSomeAWSFields(metadata *[]*Metadata) { + if metadata == nil { + return } - filteredMetadata := []Metadata{} + for _, field := range *metadata { + if field == nil { + continue + } - for _, field := range metadata { - if field.Name == "accessKey" && duplicateFields["accessKey"] == 0 { - field.Required = false - filteredMetadata = append(filteredMetadata, field) - } else if field.Name == "secretKey" && duplicateFields["secretKey"] == 0 { + if field.Name == "accessKey" || field.Name == "secretKey" || field.Name == "region" { field.Required = false - filteredMetadata = append(filteredMetadata, field) } } - - return filteredMetadata } func removeAllDeprecatedFieldsOnUnrelatedAuthProfiles(metadata []Metadata) []Metadata { filteredMetadata := []Metadata{} - for _, field := range metadata { - switch field.Name { - case "awsAccessKey", "awsSecretKey", "awsSessionToken", "awsIamRoleArn", "awsStsSessionName": + if strings.HasPrefix(field.Name, "aws") { continue - default: + } else { filteredMetadata = append(filteredMetadata, field) } - } return filteredMetadata @@ -117,13 +125,11 @@ func removeSomeDeprecatedFieldsOnUnrelatedAuthProfiles(metadata []Metadata) []Me filteredMetadata := []Metadata{} for _, field := range metadata { - switch field.Name { - case "awsAccessKey", "awsSecretKey", "awsSessionToken": + if field.Name == "awsAccessKey" || field.Name == "awsSecretKey" || field.Name == "awsSessionToken" { continue - default: + } else { filteredMetadata = append(filteredMetadata, field) } - } return filteredMetadata From 16834edb1504a7e8f3d7bc9daf143e1f1ea7deb7 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 19 Nov 2024 20:23:50 -0600 Subject: [PATCH 08/14] style: make linter happy Signed-off-by: Samantha Coyle --- common/authentication/aws/x509.go | 2 +- common/component/kafka/clients.go | 1 + common/component/kafka/kafka.go | 1 - common/component/kafka/kafka_test.go | 2 +- common/component/kafka/producer.go | 2 +- 5 files changed, 4 insertions(+), 4 deletions(-) diff --git a/common/authentication/aws/x509.go b/common/authentication/aws/x509.go index 25c615bd42..0b560f5c5a 100644 --- a/common/authentication/aws/x509.go +++ b/common/authentication/aws/x509.go @@ -463,7 +463,7 @@ func (a *x509) refreshClient() { for { newSession, err := a.createOrRefreshSession(context.Background()) if err == nil { - err := a.clients.refresh(newSession) + err = a.clients.refresh(newSession) if err != nil { a.logger.Errorf("Failed to refresh client, retrying in 5 seconds: %w", err) } diff --git a/common/component/kafka/clients.go b/common/component/kafka/clients.go index 7331bd4b00..8e8111b7b2 100644 --- a/common/component/kafka/clients.go +++ b/common/component/kafka/clients.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/IBM/sarama" + awsAuth "github.com/dapr/components-contrib/common/authentication/aws" ) diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index 88aef85ac7..b147aaae4b 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -124,7 +124,6 @@ func NewKafka(logger logger.Logger) *Kafka { } } - // Init does metadata parsing and connection establishment. 
func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { upgradedMetadata, err := k.upgradeMetadata(metadata) diff --git a/common/component/kafka/kafka_test.go b/common/component/kafka/kafka_test.go index ea6834ba66..ab4362738c 100644 --- a/common/component/kafka/kafka_test.go +++ b/common/component/kafka/kafka_test.go @@ -421,7 +421,7 @@ func TestValidateAWS(t *testing.T) { } else { require.NoError(t, err) } - assert.Equal(t, result, tt.expected) + assert.Equal(t, tt.expected, result) }) } } diff --git a/common/component/kafka/producer.go b/common/component/kafka/producer.go index 33e5dd4fa3..65a8a0771b 100644 --- a/common/component/kafka/producer.go +++ b/common/component/kafka/producer.go @@ -93,7 +93,7 @@ func (k *Kafka) Publish(_ context.Context, topic string, data []byte, metadata m func (k *Kafka) BulkPublish(_ context.Context, topic string, entries []pubsub.BulkMessageEntry, metadata map[string]string) (pubsub.BulkPublishResponse, error) { clients, err := k.latestClients() if err != nil || clients == nil { - err := fmt.Errorf("failed to get latest Kafka clients: %w", err) + err = fmt.Errorf("failed to get latest Kafka clients: %w", err) return pubsub.NewBulkPublishResponse(entries, err), err } if clients.producer == nil { From 64da4cb4b621fc0f87dc6b689b192db2340a7203 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 19 Nov 2024 21:11:31 -0600 Subject: [PATCH 09/14] fix(tests): rm test that I accounted for elsewhere Signed-off-by: Samantha Coyle --- common/component/kafka/metadata_test.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/common/component/kafka/metadata_test.go b/common/component/kafka/metadata_test.go index e53b1721d8..acb6eb1208 100644 --- a/common/component/kafka/metadata_test.go +++ b/common/component/kafka/metadata_test.go @@ -376,20 +376,6 @@ func TestTls(t *testing.T) { }) } -func TestAwsIam(t *testing.T) { - k := getKafka() - - t.Run("missing aws region", func(t *testing.T) { - m := getBaseMetadata() - m[authType] = awsIAMAuthType - meta, err := k.getKafkaMetadata(m) - require.Error(t, err) - require.Nil(t, meta) - - require.Equal(t, "missing AWS region property 'awsRegion' for authType 'awsiam'", err.Error()) - }) -} - func TestMetadataConsumerFetchValues(t *testing.T) { k := getKafka() m := getCompleteMetadata() From e801cd99e1a61791b5f25f172a0847f5258bb2f7 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 26 Nov 2024 14:31:42 -0600 Subject: [PATCH 10/14] fix: update to acct for feedback Signed-off-by: Samantha Coyle --- common/component/kafka/kafka.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index b147aaae4b..a7157592ee 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -238,6 +238,18 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { k.latestSchemaCacheTTL = meta.SchemaLatestVersionCacheTTL } } + + clients, err := k.latestClients() + if err != nil || clients == nil { + return fmt.Errorf("failed to get latest Kafka clients for initialization: %w", err) + } + if clients.producer == nil { + return errors.New("component is closed") + } + if clients.consumerGroup == nil { + return errors.New("component is closed") + } + k.logger.Debug("Kafka message bus initialization complete") return nil @@ -284,8 +296,15 @@ func (k *Kafka) Close() error { k.subscribeLock.Unlock() if k.clients != nil { - errs[0] = k.clients.producer.Close() - 
errs[1] = k.clients.consumerGroup.Close() + if k.clients.producer != nil { + errs[0] = k.clients.producer.Close() + k.clients.producer = nil + } + if k.clients.consumerGroup != nil { + errs[1] = k.clients.consumerGroup.Close() + k.clients.consumerGroup = nil + } + } if k.awsAuthProvider != nil { errs[2] = k.awsAuthProvider.Close() From b2984651b2d2c23db5cc72ac453efb5f79467d67 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 26 Nov 2024 16:14:40 -0600 Subject: [PATCH 11/14] style: update defaultg Signed-off-by: Samantha Coyle --- .build-tools/builtin-authentication-profiles.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.build-tools/builtin-authentication-profiles.yaml b/.build-tools/builtin-authentication-profiles.yaml index f46d324192..43e35a9998 100644 --- a/.build-tools/builtin-authentication-profiles.yaml +++ b/.build-tools/builtin-authentication-profiles.yaml @@ -47,7 +47,7 @@ aws: description: | The session name for assuming a role. example: '"MyAppSession"' - default: '"MSKSASLDefaultSession"' + default: '"DaprDefaultSession"' - title: "AWS: Credentials from Environment Variables" description: Use AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment - title: "AWS: IAM Roles Anywhere" From 02241fe7b04edbb8ee3dbf9ded6116836e2e5cb1 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 26 Nov 2024 16:19:56 -0600 Subject: [PATCH 12/14] style: make linter happy Signed-off-by: Samantha Coyle --- common/component/kafka/kafka.go | 1 - 1 file changed, 1 deletion(-) diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index a7157592ee..d362fbe91a 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -304,7 +304,6 @@ func (k *Kafka) Close() error { errs[1] = k.clients.consumerGroup.Close() k.clients.consumerGroup = nil } - } if k.awsAuthProvider != nil { errs[2] = k.awsAuthProvider.Close() From 7c23636eefc80a688e0c30920a346355e000de64 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Tue, 26 Nov 2024 17:38:56 -0600 Subject: [PATCH 13/14] fix: update other closes Signed-off-by: Samantha Coyle --- common/authentication/aws/static.go | 10 ++++++++-- common/authentication/aws/x509.go | 10 ++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/common/authentication/aws/static.go b/common/authentication/aws/static.go index 9137ff9d8b..1972089fdf 100644 --- a/common/authentication/aws/static.go +++ b/common/authentication/aws/static.go @@ -287,8 +287,14 @@ func (a *StaticAuth) Close() error { errs := make([]error, 2) if a.clients.kafka != nil { - errs[0] = a.clients.kafka.Producer.Close() - errs[1] = a.clients.kafka.ConsumerGroup.Close() + if a.clients.kafka.Producer != nil { + errs[0] = a.clients.kafka.Producer.Close() + a.clients.kafka.Producer = nil + } + if a.clients.kafka.ConsumerGroup != nil { + errs[1] = a.clients.kafka.ConsumerGroup.Close() + a.clients.kafka.ConsumerGroup = nil + } } return errors.Join(errs...) 
} diff --git a/common/authentication/aws/x509.go b/common/authentication/aws/x509.go index 4ce31b78ad..1c6d6dcf0d 100644 --- a/common/authentication/aws/x509.go +++ b/common/authentication/aws/x509.go @@ -126,8 +126,14 @@ func (a *x509) Close() error { errs := make([]error, 2) if a.clients.kafka != nil { - errs[0] = a.clients.kafka.Producer.Close() - errs[1] = a.clients.kafka.ConsumerGroup.Close() + if a.clients.kafka.Producer != nil { + errs[0] = a.clients.kafka.Producer.Close() + a.clients.kafka.Producer = nil + } + if a.clients.kafka.ConsumerGroup != nil { + errs[1] = a.clients.kafka.ConsumerGroup.Close() + a.clients.kafka.ConsumerGroup = nil + } } return errors.Join(errs...) } From 0ce8e18d484cbb9a9520dd374b4a425cd0fb5bb1 Mon Sep 17 00:00:00 2001 From: Samantha Coyle Date: Wed, 27 Nov 2024 12:21:09 -0600 Subject: [PATCH 14/14] style: last few tweaks Signed-off-by: Samantha Coyle --- .build-tools/builtin-authentication-profiles.yaml | 4 ++-- common/authentication/aws/client.go | 12 +++++++++++- common/component/kafka/kafka.go | 8 ++++++-- common/component/kafka/metadata.go | 2 ++ common/component/kafka/producer.go | 7 ++++++- 5 files changed, 27 insertions(+), 6 deletions(-) diff --git a/.build-tools/builtin-authentication-profiles.yaml b/.build-tools/builtin-authentication-profiles.yaml index 43e35a9998..56f0621017 100644 --- a/.build-tools/builtin-authentication-profiles.yaml +++ b/.build-tools/builtin-authentication-profiles.yaml @@ -25,9 +25,9 @@ aws: description: | AWS session token to use. A session token is only required if you are using temporary security credentials. - - title: "AWS: Assume specific IAM Role" + - title: "AWS: Assume IAM Role" description: | - Assume a specific IAM role. Note: This is only supported on Kafka and PostgreSQL components. + Assume a specific IAM role. Note: This is only supported for Kafka and PostgreSQL. metadata: - name: region type: string diff --git a/common/authentication/aws/client.go b/common/authentication/aws/client.go index b96ef89298..11b26e4988 100644 --- a/common/authentication/aws/client.go +++ b/common/authentication/aws/client.go @@ -337,5 +337,15 @@ func (c *KafkaClients) getSyncProducer() (sarama.SyncProducer, error) { c.config.Producer.MaxMessageBytes = *c.maxMessageBytes } - return sarama.NewSyncProducer(*c.brokers, c.config) + saramaClient, err := sarama.NewClient(*c.brokers, c.config) + if err != nil { + return nil, err + } + + producer, err := sarama.NewSyncProducerFromClient(saramaClient) + if err != nil { + return nil, err + } + + return producer, nil } diff --git a/common/component/kafka/kafka.go b/common/component/kafka/kafka.go index 01b30ad3f6..3e930ec4b2 100644 --- a/common/component/kafka/kafka.go +++ b/common/component/kafka/kafka.go @@ -259,12 +259,13 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error { } func (k *Kafka) ValidateAWS(metadata map[string]string) (*awsAuth.DeprecatedKafkaIAM, error) { + const defaultSessionName = "DaprDefaultSession" // This is needed as we remove the aws prefixed fields to use the builtin AWS profile fields instead. 
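 	// Coalesce picks the first non-empty value, so the standardized field
 	// names take precedence over their deprecated aws-prefixed aliases here;
 	// for example, metadata that sets both region and awsRegion resolves to
 	// the region value.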
region := awsAuth.Coalesce(metadata["region"], metadata["awsRegion"]) accessKey := awsAuth.Coalesce(metadata["accessKey"], metadata["awsAccessKey"]) secretKey := awsAuth.Coalesce(metadata["secretKey"], metadata["awsSecretKey"]) role := awsAuth.Coalesce(metadata["assumeRoleArn"], metadata["awsIamRoleArn"]) - session := awsAuth.Coalesce(metadata["sessionName"], metadata["awsStsSessionName"]) + session := awsAuth.Coalesce(metadata["sessionName"], metadata["awsStsSessionName"], defaultSessionName) // set default if no value is provided token := awsAuth.Coalesce(metadata["sessionToken"], metadata["awsSessionToken"]) if region == "" { @@ -287,7 +288,9 @@ func (k *Kafka) Close() error { errs := make([]error, 3) if k.closed.CompareAndSwap(false, true) { - close(k.closeCh) + if k.closeCh != nil { + close(k.closeCh) + } if k.internalContext != nil { k.internalContextCancel() } @@ -310,6 +313,7 @@ func (k *Kafka) Close() error { } if k.awsAuthProvider != nil { errs[2] = k.awsAuthProvider.Close() + k.awsAuthProvider = nil } } diff --git a/common/component/kafka/metadata.go b/common/component/kafka/metadata.go index cd4d6fd045..7122feb813 100644 --- a/common/component/kafka/metadata.go +++ b/common/component/kafka/metadata.go @@ -259,6 +259,8 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error) return nil, errors.New("missing CA certificate property 'caCert' for authType 'certificate'") } k.logger.Debug("Configuring root certificate authentication.") + case awsIAMAuthType: + k.logger.Debug("Configuring AWS IAM authentication.") default: return nil, errors.New("kafka error: invalid value for 'authType' attribute") } diff --git a/common/component/kafka/producer.go b/common/component/kafka/producer.go index 65a8a0771b..97e5a6bbed 100644 --- a/common/component/kafka/producer.go +++ b/common/component/kafka/producer.go @@ -34,7 +34,12 @@ func GetSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int config.Producer.MaxMessageBytes = maxMessageBytes } - producer, err := sarama.NewSyncProducer(brokers, &config) + saramaClient, err := sarama.NewClient(brokers, &config) + if err != nil { + return nil, err + } + + producer, err := sarama.NewSyncProducerFromClient(saramaClient) if err != nil { return nil, err }