Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka): iam roles anywhere + assume role auth profiles #3606

Merged
merged 20 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions .build-tools/builtin-authentication-profiles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,31 @@ aws:
description: |
AWS session token to use. A session token is only required if you are using
temporary security credentials.
- title: "AWS: Assume specific IAM Role"
description: |
Assume a specific IAM role. Note: This is only supported on Kafka and PostgreSQL components.
metadata:
- name: sessionToken
sicoyle marked this conversation as resolved.
Show resolved Hide resolved
required: false
sensitive: true
description: |
AWS session token to use. A session token is only required if you are using
temporary security credentials.
example: '"TOKEN"'
type: string
- name: assumeRoleArn
type: string
required: false
description: |
IAM role that has access to AWS resource.
This is another option to authenticate with MSK and RDS Aurora aside from the AWS Credentials.
example: '"arn:aws:iam::123456789:role/mskRole"'
- name: sessionName
type: string
description: |
The session name for assuming a role.
example: '"MyAppSession"'
default: '"MSKSASLDefaultSession"'
- title: "AWS: Credentials from Environment Variables"
description: Use AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment
- title: "AWS: IAM Roles Anywhere"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,25 @@ func ParseBuiltinAuthenticationProfile(bi BuiltinAuthenticationProfile, componen

res[i].Metadata = mergedMetadata(bi.Metadata, res[i].Metadata...)

// If component is PostgreSQL, filter out duplicated aws profile fields
if strings.ToLower(componentTitle) == "postgresql" && bi.Name == "aws" {
res[i].Metadata = filterOutDuplicateFields(res[i].Metadata)
switch profile.Title {
case "AWS: Access Key ID and Secret Access Key":
// If component is PostgreSQL, handle deprecation of aws profile fields that we will remove in Dapr 1.17
// Postgres has awsAccessKey and accessKey and awsSecretKey and secretKey.
// Therefore, we mark the non aws prefixed ones as not required as we deprecate the aws prefixed ones.
if strings.ToLower(componentTitle) == "postgresql" && bi.Name == "aws" {
res[i].Metadata = removeRequiredOnSomeAWSFields(res[i].Metadata)
}
// If component is Kafka, handle deprecation of aws profile fields with aws prefix that we will remove in Dapr 1.17
if strings.ToLower(componentTitle) == "Apache Kafka" {
res[i].Metadata = removeRequiredOnSomeAWSFields(res[i].Metadata)
}

case "AWS: Credentials from Environment Variables", "AWS: IAM Roles Anywhere":
// These two auth profiles do not use the fields that we are deprecated, so we can manually remove the unrelated fields
res[i].Metadata = removeAllDeprecatedFieldsOnUnrelatedAuthProfiles(res[i].Metadata)
case "AWS: Assume specific IAM Role":
// This is needed bc to assume a specific IAM role, we must allow for the field of awsStsSessionName to deprecate to sessionName
res[i].Metadata = removeSomeDeprecatedFieldsOnUnrelatedAuthProfiles(res[i].Metadata)
}

}
Expand All @@ -54,28 +70,61 @@ func mergedMetadata(base []Metadata, add ...Metadata) []Metadata {
return res
}

// filterOutDuplicateFields removes specific duplicated fields from the metadata
func filterOutDuplicateFields(metadata []Metadata) []Metadata {
// removeRequiredOnSomeAWSFields needs to be removed in Dapr 1.17 as duplicated AWS IAM fields get removed,
// and we standardize on these fields.
// Currently, there are: awsAccessKey, accessKey and awsSecretKey, secretKey fields.
// We normally have accessKey and secretKey fields marked required as it is part of the builtin AWS auth profile fields.
// However, as we rm the aws prefixed ones, we need to then mark the normally required ones as not required only for postgres and kafka.
// This way we do not break existing users, and transition them to the standardized fields.
func removeRequiredOnSomeAWSFields(metadata []Metadata) []Metadata {
duplicateFields := map[string]int{
"awsRegion": 0,
"accessKey": 0,
"secretKey": 0,
}

filteredMetadata := []Metadata{}

for _, field := range metadata {
if _, exists := duplicateFields[field.Name]; !exists {
if field.Name == "accessKey" && duplicateFields["accessKey"] == 0 {
field.Required = false
filteredMetadata = append(filteredMetadata, field)
} else if field.Name == "secretKey" && duplicateFields["secretKey"] == 0 {
field.Required = false
filteredMetadata = append(filteredMetadata, field)
} else {
if field.Name == "awsRegion" && duplicateFields["awsRegion"] == 0 {
filteredMetadata = append(filteredMetadata, field)
duplicateFields["awsRegion"]++
} else if field.Name != "awsRegion" {
continue
}
}
}

return filteredMetadata
}

func removeAllDeprecatedFieldsOnUnrelatedAuthProfiles(metadata []Metadata) []Metadata {
filteredMetadata := []Metadata{}

for _, field := range metadata {
switch field.Name {
case "awsAccessKey", "awsSecretKey", "awsSessionToken", "awsIamRoleArn", "awsStsSessionName":
continue
default:
filteredMetadata = append(filteredMetadata, field)
}

}

return filteredMetadata
}

func removeSomeDeprecatedFieldsOnUnrelatedAuthProfiles(metadata []Metadata) []Metadata {
filteredMetadata := []Metadata{}

for _, field := range metadata {
switch field.Name {
case "awsAccessKey", "awsSecretKey", "awsSessionToken":
continue
default:
filteredMetadata = append(filteredMetadata, field)
}

}

return filteredMetadata
}
102 changes: 53 additions & 49 deletions bindings/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,59 @@ binding:
operations:
- name: create
description: "Publish a new message in the topic."
builtinAuthenticationProfiles:
- name: "aws"
sicoyle marked this conversation as resolved.
Show resolved Hide resolved
metadata:
- name: authType
type: string
required: true
description: |
Authentication type.
This must be set to "awsiam" for this authentication profile.
example: '"awsiam"'
allowedValues:
- "awsiam"
- name: awsAccessKey
type: string
required: false
description: |
Deprecated as of Dapr 1.17. Use 'accessKey' instead.
If both fields are set, then 'accessKey' value will be used.
AWS access key associated with an IAM account.
example: '"AKIAIOSFODNN7EXAMPLE"'
- name: awsSecretKey
type: string
required: false
sensitive: true
description: |
Deprecated as of Dapr 1.17. Use 'secretKey' instead.
If both fields are set, then 'secretKey' value will be used.
The secret key associated with the access key.
example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"'
- name: awsSessionToken
type: string
sensitive: true
description: |
Deprecated as of Dapr 1.17. Use 'sessionToken' instead.
If both fields are set, then 'sessionToken' value will be used.
AWS session token to use. A session token is only required if you are using temporary security credentials.
example: '"TOKEN"'
- name: awsIamRoleArn
type: string
required: false
description: |
Deprecated as of Dapr 1.17. Use 'assumeRoleArn' instead.
If both fields are set, then 'assumeRoleArn' value will be used.
IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials.
example: '"arn:aws:iam::123456789:role/mskRole"'
- name: awsStsSessionName
type: string
description: |
Deprecated as of Dapr 1.17. Use 'sessionName' instead.
If both fields are set, then 'sessionName' value will be used.
Represents the session name for assuming a role.
example: '"MyAppSession"'
default: '"MSKSASLDefaultSession"'
authenticationProfiles:
- title: "OIDC Authentication"
description: |
Expand Down Expand Up @@ -139,55 +192,6 @@ authenticationProfiles:
example: '"none"'
allowedValues:
- "none"
- title: "AWS IAM"
description: "Authenticate using AWS IAM credentials or role for AWS MSK"
metadata:
- name: authType
type: string
required: true
description: |
Authentication type.
This must be set to "awsiam" for this authentication profile.
example: '"awsiam"'
allowedValues:
- "awsiam"
- name: awsRegion
type: string
required: true
description: |
The AWS Region where the MSK Kafka broker is deployed to.
example: '"us-east-1"'
- name: awsAccessKey
type: string
required: true
description: |
AWS access key associated with an IAM account.
example: '"AKIAIOSFODNN7EXAMPLE"'
- name: awsSecretKey
type: string
required: true
sensitive: true
description: |
The secret key associated with the access key.
example: '"wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"'
- name: awsSessionToken
type: string
sensitive: true
description: |
AWS session token to use. A session token is only required if you are using temporary security credentials.
example: '"TOKEN"'
- name: awsIamRoleArn
type: string
required: true
description: |
IAM role that has access to MSK. This is another option to authenticate with MSK aside from the AWS Credentials.
example: '"arn:aws:iam::123456789:role/mskRole"'
- name: awsStsSessionName
type: string
description: |
Represents the session name for assuming a role.
example: '"MyAppSession"'
default: '"MSKSASLDefaultSession"'
metadata:
- name: topics
type: string
Expand Down
12 changes: 9 additions & 3 deletions common/authentication/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"strconv"
"time"

"github.com/IBM/sarama"
"github.com/aws/aws-sdk-go-v2/config"
v2creds "github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/feature/rds/auth"
Expand Down Expand Up @@ -59,9 +60,11 @@ type Options struct {
PoolConfig *pgxpool.Config `json:"poolConfig" mapstructure:"poolConfig"`
ConnectionString string `json:"connectionString" mapstructure:"connectionString"`

Region string `json:"region" mapstructure:"region"`
AccessKey string `json:"accessKey" mapstructure:"accessKey"`
SecretKey string `json:"secretKey" mapstructure:"secretKey"`
Region string `json:"region" mapstructure:"region"`
AccessKey string `json:"accessKey" mapstructure:"accessKey"`
SecretKey string `json:"secretKey" mapstructure:"secretKey"`
SessionName string `mapstructure:"sessionName"`
AssumeRoleARN string `mapstructure:"assumeRoleArn"`

Endpoint string
SessionToken string
Expand All @@ -80,6 +83,7 @@ func GetConfig(opts Options) *aws.Config {
return cfg
}

//nolint:interfacebloat
type Provider interface {
S3() *S3Clients
DynamoDB() *DynamoDBClients
Expand All @@ -91,6 +95,8 @@ type Provider interface {
Kinesis() *KinesisClients
Ses() *SesClients

UpdateKafka(*sarama.Config) error

Close() error
}

Expand Down
41 changes: 41 additions & 0 deletions common/authentication/aws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ import (
"context"
"errors"
"sync"
"time"

"github.com/IBM/sarama"
"github.com/aws/aws-msk-iam-sasl-signer-go/signer"
aws2 "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
Expand Down Expand Up @@ -207,3 +211,40 @@ func (c *KinesisClients) WorkerCfg(ctx context.Context, stream, consumer, mode s
func (c *SesClients) New(session *session.Session) {
c.Ses = ses.New(session, session.Config)
}

// Kafka specific
type mskTokenProvider struct {
generateTokenTimeout time.Duration
accessKey string
secretKey string
sessionToken string
awsIamRoleArn string
awsStsSessionName string
region string
}

func (m *mskTokenProvider) Token() (*sarama.AccessToken, error) {
// this function can't use the context passed on Init because that context would be cancelled right after Init
ctx, cancel := context.WithTimeout(context.Background(), m.generateTokenTimeout)
defer cancel()

switch {
// we must first check if we are using the assume role auth profile
case m.awsIamRoleArn != "" && m.awsStsSessionName != "":
token, _, err := signer.GenerateAuthTokenFromRole(ctx, m.region, m.awsIamRoleArn, m.awsStsSessionName)
return &sarama.AccessToken{Token: token}, err
case m.accessKey != "" && m.secretKey != "":
token, _, err := signer.GenerateAuthTokenFromCredentialsProvider(ctx, m.region, aws2.CredentialsProviderFunc(func(ctx context.Context) (aws2.Credentials, error) {
return aws2.Credentials{
AccessKeyID: m.accessKey,
SecretAccessKey: m.secretKey,
SessionToken: m.sessionToken,
}, nil
}))
return &sarama.AccessToken{Token: token}, err

default: // load default aws creds
token, _, err := signer.GenerateAuthToken(ctx, m.region)
return &sarama.AccessToken{Token: token}, err
}
}
Loading
Loading