Skip to content

Commit

Permalink
style: last few tweaks
Browse files Browse the repository at this point in the history
Signed-off-by: Samantha Coyle <[email protected]>
  • Loading branch information
sicoyle committed Nov 27, 2024
1 parent 7c23636 commit 0ce8e18
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 6 deletions.
4 changes: 2 additions & 2 deletions .build-tools/builtin-authentication-profiles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ aws:
description: |
AWS session token to use. A session token is only required if you are using
temporary security credentials.
- title: "AWS: Assume specific IAM Role"
- title: "AWS: Assume IAM Role"
description: |
Assume a specific IAM role. Note: This is only supported on Kafka and PostgreSQL components.
Assume a specific IAM role. Note: This is only supported for Kafka and PostgreSQL.
metadata:
- name: region
type: string
Expand Down
12 changes: 11 additions & 1 deletion common/authentication/aws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,5 +337,15 @@ func (c *KafkaClients) getSyncProducer() (sarama.SyncProducer, error) {
c.config.Producer.MaxMessageBytes = *c.maxMessageBytes
}

return sarama.NewSyncProducer(*c.brokers, c.config)
saramaClient, err := sarama.NewClient(*c.brokers, c.config)
if err != nil {
return nil, err
}

producer, err := sarama.NewSyncProducerFromClient(saramaClient)
if err != nil {
return nil, err
}

return producer, nil
}
8 changes: 6 additions & 2 deletions common/component/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,12 +259,13 @@ func (k *Kafka) Init(ctx context.Context, metadata map[string]string) error {
}

func (k *Kafka) ValidateAWS(metadata map[string]string) (*awsAuth.DeprecatedKafkaIAM, error) {
const defaultSessionName = "DaprDefaultSession"
// This is needed as we remove the aws prefixed fields to use the builtin AWS profile fields instead.
region := awsAuth.Coalesce(metadata["region"], metadata["awsRegion"])
accessKey := awsAuth.Coalesce(metadata["accessKey"], metadata["awsAccessKey"])
secretKey := awsAuth.Coalesce(metadata["secretKey"], metadata["awsSecretKey"])
role := awsAuth.Coalesce(metadata["assumeRoleArn"], metadata["awsIamRoleArn"])
session := awsAuth.Coalesce(metadata["sessionName"], metadata["awsStsSessionName"])
session := awsAuth.Coalesce(metadata["sessionName"], metadata["awsStsSessionName"], defaultSessionName) // set default if no value is provided
token := awsAuth.Coalesce(metadata["sessionToken"], metadata["awsSessionToken"])

if region == "" {
Expand All @@ -287,7 +288,9 @@ func (k *Kafka) Close() error {

errs := make([]error, 3)
if k.closed.CompareAndSwap(false, true) {
close(k.closeCh)
if k.closeCh != nil {
close(k.closeCh)
}
if k.internalContext != nil {
k.internalContextCancel()
}
Expand All @@ -310,6 +313,7 @@ func (k *Kafka) Close() error {
}
if k.awsAuthProvider != nil {
errs[2] = k.awsAuthProvider.Close()
k.awsAuthProvider = nil
}
}

Expand Down
2 changes: 2 additions & 0 deletions common/component/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
return nil, errors.New("missing CA certificate property 'caCert' for authType 'certificate'")
}
k.logger.Debug("Configuring root certificate authentication.")
case awsIAMAuthType:
k.logger.Debug("Configuring AWS IAM authentication.")
default:
return nil, errors.New("kafka error: invalid value for 'authType' attribute")
}
Expand Down
7 changes: 6 additions & 1 deletion common/component/kafka/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ func GetSyncProducer(config sarama.Config, brokers []string, maxMessageBytes int
config.Producer.MaxMessageBytes = maxMessageBytes
}

producer, err := sarama.NewSyncProducer(brokers, &config)
saramaClient, err := sarama.NewClient(brokers, &config)
if err != nil {
return nil, err
}

producer, err := sarama.NewSyncProducerFromClient(saramaClient)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 0ce8e18

Please sign in to comment.