Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(iam auth): allow iam roles anywhere auth profile #3591

Merged
merged 43 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
5bd981c
feat(iam auth): allow iam roles anywhere auth profile
sicoyle Nov 4, 2024
45db3a1
fix(build): update more aws components
sicoyle Nov 4, 2024
58bbeaa
fix(metadata): add endpoint field to options
sicoyle Nov 4, 2024
e9493ae
style: update descriptions on new fields
sicoyle Nov 4, 2024
bff9b16
fix: acct for nil check
sicoyle Nov 4, 2024
fd30089
style: make linter happy
sicoyle Nov 4, 2024
f712b69
style: more linter fixes
sicoyle Nov 4, 2024
78cf670
style: final linter tweaks
sicoyle Nov 4, 2024
8aec6a5
fix(session): apply auto refresh to s3
sicoyle Nov 6, 2024
3a5c8bf
style: mv x509 auth around
sicoyle Nov 6, 2024
43ba1e3
refactor: overhaul + interfaces for cleanliness + tests + update comps
sicoyle Nov 11, 2024
423e993
fix: address initial feedback and fix tests
sicoyle Nov 12, 2024
b228e39
test: add tests and make things more testable
sicoyle Nov 12, 2024
b69e83c
Merge branch 'main' into feat-iam-rolesanywhere-auth
sicoyle Nov 12, 2024
4b1ad20
style: make linter happy
sicoyle Nov 12, 2024
ce94dfe
style: clean up logs
sicoyle Nov 12, 2024
30de3db
style: more linter things and adjust for mocking client
sicoyle Nov 12, 2024
f8e3567
fix: make 1 hr default timeout
sicoyle Nov 12, 2024
3e6a471
fix: update more tests
sicoyle Nov 12, 2024
bb22450
fix: address final feedback
sicoyle Nov 12, 2024
5a17558
fix(conformance): try to inject mocked creds for session
sicoyle Nov 12, 2024
72fe803
style: make linter happy
sicoyle Nov 13, 2024
d4f23ba
fix: go back on conformance test changes
sicoyle Nov 13, 2024
67ed5ec
fix: try this for conformance
sicoyle Nov 13, 2024
f80c594
fix: try another tweak for secretmanager
sicoyle Nov 13, 2024
29ae9f0
fix(test): fix dynamo unit test
sicoyle Nov 13, 2024
a9ce95e
fix(snssqs): see if this fixes snssqs conformance
sicoyle Nov 13, 2024
4f8c154
fix: this is what i need for conformance :)
sicoyle Nov 13, 2024
7749826
fix: update cfgs for aws
sicoyle Nov 13, 2024
690b3ec
fix(test): update for unit test
sicoyle Nov 13, 2024
a42e742
fix(cfg): leverage opts in cfgs for aws
sicoyle Nov 13, 2024
b5e1d97
fix: minor tweaks
sicoyle Nov 13, 2024
9e9d086
Merge branch 'main' into feat-iam-rolesanywhere-auth
sicoyle Nov 13, 2024
923ee94
style: final tweaks
sicoyle Nov 13, 2024
d8af3c0
Merge branch 'feat-iam-rolesanywhere-auth' of github.com:sicoyle/comp…
sicoyle Nov 13, 2024
37021c5
fix: update default to be one hour with timeout
sicoyle Nov 13, 2024
b081bfe
style: session duration can default to 1h so not required
sicoyle Nov 13, 2024
6263e19
Update builtin-authentication-profiles.yaml
sicoyle Nov 13, 2024
e6f7699
fix: address final feedback
sicoyle Nov 14, 2024
3d4da39
fix: address final feedback
sicoyle Nov 14, 2024
a300a8c
style: make linter happy
sicoyle Nov 14, 2024
0b80a39
fix: allow for mocked clients without exported field
sicoyle Nov 14, 2024
cde5a10
fix: add one last closer
sicoyle Nov 14, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .build-tools/builtin-authentication-profiles.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,32 @@ aws:
type: string
- title: "AWS: Credentials from Environment Variables"
description: Use AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY from the environment
- title: "AWS: IAM Roles Anywhere"
description: Use X.509 certificates to establish trust between AWS and your AWS account and the Dapr cluster using AWS IAM Roles Anywhere.
metadata:
- name: trustAnchorArn
description: |
ARN of the AWS Trust Anchor in the AWS account granting trust to the Dapr Certificate Authority.
example: arn:aws:rolesanywhere:us-west-1:012345678910:trust-anchor/01234568-0123-0123-0123-012345678901
required: true
- name: trustProfileArn
description: |
ARN of the AWS IAM Profile in the trusting AWS account.
example: arn:aws:rolesanywhere:us-west-1:012345678910:profile/01234568-0123-0123-0123-012345678901
required: true
- name: assumeRoleArn
description: |
ARN of the AWS IAM role to assume in the trusting AWS account.
example: arn:aws:iam:012345678910:role/exampleIAMRoleName
required: true
- name: sessionDuration
type: duration
description: |
Duration of the session using AWS IAM Roles Anywhere.
If set to 0m, temporary credentials will be created and automatically rotated.
default: '1h'
example: '0m'
required: false

azuread:
- title: "Azure AD: Managed identity"
Expand Down
37 changes: 18 additions & 19 deletions bindings/aws/dynamodb/dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import (

// DynamoDB allows performing stateful operations on AWS DynamoDB.
type DynamoDB struct {
client *dynamodb.DynamoDB
table string
logger logger.Logger
authProvider awsAuth.Provider
table string
logger logger.Logger
}

type dynamoDBMetadata struct {
Expand All @@ -51,18 +51,27 @@ func NewDynamoDB(logger logger.Logger) bindings.OutputBinding {
}

// Init performs connection parsing for DynamoDB.
func (d *DynamoDB) Init(_ context.Context, metadata bindings.Metadata) error {
func (d *DynamoDB) Init(ctx context.Context, metadata bindings.Metadata) error {
meta, err := d.getDynamoDBMetadata(metadata)
if err != nil {
return err
}

client, err := d.getClient(meta)
opts := awsAuth.Options{
Logger: d.logger,
Properties: metadata.Properties,
Region: meta.Region,
Endpoint: meta.Endpoint,
AccessKey: meta.AccessKey,
SecretKey: meta.SecretKey,
SessionToken: meta.SessionToken,
}

provider, err := awsAuth.NewProvider(ctx, opts, awsAuth.GetConfig(opts))
if err != nil {
return err
}

d.client = client
d.authProvider = provider
d.table = meta.Table

return nil
Expand All @@ -84,7 +93,7 @@ func (d *DynamoDB) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bi
return nil, err
}

_, err = d.client.PutItemWithContext(ctx, &dynamodb.PutItemInput{
_, err = d.authProvider.DynamoDB().DynamoDB.PutItemWithContext(ctx, &dynamodb.PutItemInput{
Item: item,
TableName: aws.String(d.table),
})
Expand All @@ -105,16 +114,6 @@ func (d *DynamoDB) getDynamoDBMetadata(spec bindings.Metadata) (*dynamoDBMetadat
return &meta, nil
}

func (d *DynamoDB) getClient(metadata *dynamoDBMetadata) (*dynamodb.DynamoDB, error) {
sess, err := awsAuth.GetClient(metadata.AccessKey, metadata.SecretKey, metadata.SessionToken, metadata.Region, metadata.Endpoint)
if err != nil {
return nil, err
}
c := dynamodb.New(sess)

return c, nil
}

// GetComponentMetadata returns the metadata of the component.
func (d *DynamoDB) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
metadataStruct := dynamoDBMetadata{}
Expand All @@ -123,5 +122,5 @@ func (d *DynamoDB) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
}

func (d *DynamoDB) Close() error {
return nil
return d.authProvider.Close()
}
89 changes: 38 additions & 51 deletions bindings/aws/kinesis/kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
"github.com/vmware/vmware-go-kcl/clientlibrary/config"
"github.com/vmware/vmware-go-kcl/clientlibrary/interfaces"
"github.com/vmware/vmware-go-kcl/clientlibrary/worker"

Expand All @@ -40,15 +39,16 @@ import (

// AWSKinesis allows receiving and sending data to/from AWS Kinesis stream.
type AWSKinesis struct {
client *kinesis.Kinesis
metadata *kinesisMetadata
authProvider awsAuth.Provider
metadata *kinesisMetadata

worker *worker.Worker
workerConfig *config.KinesisClientLibConfiguration
worker *worker.Worker

streamARN *string
consumerARN *string
logger logger.Logger
streamName string
consumerName string
consumerARN *string
logger logger.Logger
consumerMode string

closed atomic.Bool
closeCh chan struct{}
Expand Down Expand Up @@ -112,30 +112,25 @@ func (a *AWSKinesis) Init(ctx context.Context, metadata bindings.Metadata) error
return fmt.Errorf("%s invalid \"mode\" field %s", "aws.kinesis", m.KinesisConsumerMode)
}

client, err := a.getClient(m)
if err != nil {
return err
}
a.consumerMode = m.KinesisConsumerMode
a.streamName = m.StreamName
a.consumerName = m.ConsumerName
a.metadata = m

streamName := aws.String(m.StreamName)
stream, err := client.DescribeStreamWithContext(ctx, &kinesis.DescribeStreamInput{
StreamName: streamName,
})
opts := awsAuth.Options{
Logger: a.logger,
Properties: metadata.Properties,
Region: m.Region,
AccessKey: m.AccessKey,
SecretKey: m.SecretKey,
SessionToken: "",
}
// extra configs needed per component type
provider, err := awsAuth.NewProvider(ctx, opts, awsAuth.GetConfig(opts))
if err != nil {
return err
}

if m.KinesisConsumerMode == SharedThroughput {
kclConfig := config.NewKinesisClientLibConfigWithCredential(m.ConsumerName,
m.StreamName, m.Region, m.ConsumerName,
client.Config.Credentials)
a.workerConfig = kclConfig
}

a.streamARN = stream.StreamDescription.StreamARN
a.metadata = m
a.client = client

a.authProvider = provider
return nil
}

Expand All @@ -148,7 +143,7 @@ func (a *AWSKinesis) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*
if partitionKey == "" {
partitionKey = uuid.New().String()
}
_, err := a.client.PutRecordWithContext(ctx, &kinesis.PutRecordInput{
_, err := a.authProvider.Kinesis().Kinesis.PutRecordWithContext(ctx, &kinesis.PutRecordInput{
StreamName: &a.metadata.StreamName,
Data: req.Data,
PartitionKey: &partitionKey,
Expand All @@ -161,16 +156,15 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
if a.closed.Load() {
return errors.New("binding is closed")
}

if a.metadata.KinesisConsumerMode == SharedThroughput {
a.worker = worker.NewWorker(a.recordProcessorFactory(ctx, handler), a.workerConfig)
a.worker = worker.NewWorker(a.recordProcessorFactory(ctx, handler), a.authProvider.Kinesis().WorkerCfg(ctx, a.streamName, a.consumerName, a.consumerMode))
err = a.worker.Start()
if err != nil {
return err
}
} else if a.metadata.KinesisConsumerMode == ExtendedFanout {
var stream *kinesis.DescribeStreamOutput
stream, err = a.client.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName})
stream, err = a.authProvider.Kinesis().Kinesis.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &a.metadata.StreamName})
if err != nil {
return err
}
Expand All @@ -180,6 +174,10 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
}
}

stream, err := a.authProvider.Kinesis().Stream(ctx, a.streamName)
if err != nil {
return fmt.Errorf("failed to get kinesis stream arn: %v", err)
}
// Wait for context cancelation then stop
a.wg.Add(1)
go func() {
Expand All @@ -191,7 +189,7 @@ func (a *AWSKinesis) Read(ctx context.Context, handler bindings.Handler) (err er
if a.metadata.KinesisConsumerMode == SharedThroughput {
a.worker.Shutdown()
} else if a.metadata.KinesisConsumerMode == ExtendedFanout {
a.deregisterConsumer(a.streamARN, a.consumerARN)
a.deregisterConsumer(ctx, stream, a.consumerARN)
}
}()

Expand Down Expand Up @@ -226,8 +224,7 @@ func (a *AWSKinesis) Subscribe(ctx context.Context, streamDesc kinesis.StreamDes
return
default:
}

sub, err := a.client.SubscribeToShardWithContext(ctx, &kinesis.SubscribeToShardInput{
sub, err := a.authProvider.Kinesis().Kinesis.SubscribeToShardWithContext(ctx, &kinesis.SubscribeToShardInput{
ConsumerARN: consumerARN,
ShardId: s.ShardId,
StartingPosition: &kinesis.StartingPosition{Type: aws.String(kinesis.ShardIteratorTypeLatest)},
Expand Down Expand Up @@ -269,14 +266,14 @@ func (a *AWSKinesis) Close() error {
close(a.closeCh)
}
a.wg.Wait()
return nil
return a.authProvider.Close()
}

func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*string, error) {
// Only set timeout on consumer call.
conCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
consumer, err := a.client.DescribeStreamConsumerWithContext(conCtx, &kinesis.DescribeStreamConsumerInput{
consumer, err := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumerWithContext(conCtx, &kinesis.DescribeStreamConsumerInput{
ConsumerName: &a.metadata.ConsumerName,
StreamARN: streamARN,
})
Expand All @@ -288,7 +285,7 @@ func (a *AWSKinesis) ensureConsumer(ctx context.Context, streamARN *string) (*st
}

func (a *AWSKinesis) registerConsumer(ctx context.Context, streamARN *string) (*string, error) {
consumer, err := a.client.RegisterStreamConsumerWithContext(ctx, &kinesis.RegisterStreamConsumerInput{
consumer, err := a.authProvider.Kinesis().Kinesis.RegisterStreamConsumerWithContext(ctx, &kinesis.RegisterStreamConsumerInput{
ConsumerName: &a.metadata.ConsumerName,
StreamARN: streamARN,
})
Expand All @@ -307,11 +304,11 @@ func (a *AWSKinesis) registerConsumer(ctx context.Context, streamARN *string) (*
return consumer.Consumer.ConsumerARN, nil
}

func (a *AWSKinesis) deregisterConsumer(streamARN *string, consumerARN *string) error {
func (a *AWSKinesis) deregisterConsumer(ctx context.Context, streamARN *string, consumerARN *string) error {
if a.consumerARN != nil {
// Use a background context because the running context may have been canceled already
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
_, err := a.client.DeregisterStreamConsumerWithContext(ctx, &kinesis.DeregisterStreamConsumerInput{
_, err := a.authProvider.Kinesis().Kinesis.DeregisterStreamConsumerWithContext(ctx, &kinesis.DeregisterStreamConsumerInput{
ConsumerARN: consumerARN,
StreamARN: streamARN,
ConsumerName: &a.metadata.ConsumerName,
Expand Down Expand Up @@ -342,7 +339,7 @@ func (a *AWSKinesis) waitUntilConsumerExists(ctx aws.Context, input *kinesis.Des
tmp := *input
inCpy = &tmp
}
req, _ := a.client.DescribeStreamConsumerRequest(inCpy)
req, _ := a.authProvider.Kinesis().Kinesis.DescribeStreamConsumerRequest(inCpy)
req.SetContext(ctx)
req.ApplyOptions(opts...)

Expand All @@ -354,16 +351,6 @@ func (a *AWSKinesis) waitUntilConsumerExists(ctx aws.Context, input *kinesis.Des
return w.WaitWithContext(ctx)
}

func (a *AWSKinesis) getClient(metadata *kinesisMetadata) (*kinesis.Kinesis, error) {
sess, err := awsAuth.GetClient(metadata.AccessKey, metadata.SecretKey, metadata.SessionToken, metadata.Region, metadata.Endpoint)
if err != nil {
return nil, err
}
k := kinesis.New(sess)

return k, nil
}

func (a *AWSKinesis) parseMetadata(meta bindings.Metadata) (*kinesisMetadata, error) {
var m kinesisMetadata
err := kitmd.DecodeMetadata(meta.Properties, &m)
Expand Down
Loading
Loading