From dbc1f649abab90693ff1086cd00480d7010f3c7d Mon Sep 17 00:00:00 2001 From: Hristo Stoychev Date: Tue, 23 Nov 2021 17:11:55 +0200 Subject: [PATCH] Improve reading kinesis stream state --- internal/service/kinesis/stream.go | 99 +++++++++++++------ .../service/kinesis/stream_data_source.go | 6 ++ .../kinesis/stream_data_source_test.go | 2 + website/docs/d/kinesis_stream.html.markdown | 1 + 4 files changed, 80 insertions(+), 28 deletions(-) diff --git a/internal/service/kinesis/stream.go b/internal/service/kinesis/stream.go index 728fa58d24a..6eb7b27f1fa 100644 --- a/internal/service/kinesis/stream.go +++ b/internal/service/kinesis/stream.go @@ -146,10 +146,10 @@ func resourceStreamCreate(d *schema.ResourceData, meta interface{}) error { sn, err) } - s := streamRaw.(*kinesisStreamState) + s := streamRaw.(*kinesisStreamSummary) d.SetId(s.arn) d.Set("arn", s.arn) - d.Set("shard_count", len(s.openShards)) + d.Set("shard_count", s.openShardsCount) return resourceStreamUpdate(d, meta) } @@ -190,7 +190,7 @@ func resourceStreamRead(d *schema.ResourceData, meta interface{}) error { sn := d.Get("name").(string) - state, err := readKinesisStreamState(conn, sn) + state, err := readKinesisStreamSummary(conn, sn) if err != nil { if awsErr, ok := err.(awserr.Error); ok { if awsErr.Code() == kinesis.ErrCodeResourceNotFoundException { @@ -204,7 +204,7 @@ func resourceStreamRead(d *schema.ResourceData, meta interface{}) error { } d.SetId(state.arn) d.Set("arn", state.arn) - d.Set("shard_count", len(state.openShards)) + d.Set("shard_count", state.openShardsCount) d.Set("retention_period", state.retentionPeriod) d.Set("encryption_type", state.encryptionType) @@ -446,47 +446,90 @@ func updateKinesisShardLevelMetrics(conn *kinesis.Kinesis, d *schema.ResourceDat return nil } -type kinesisStreamState struct { +type kinesisStreamSummary struct { arn string + name string creationTimestamp int64 status string retentionPeriod int64 - openShards []string - closedShards []string shardLevelMetrics []string encryptionType string keyId string + openShardsCount int64 + consumersCount int64 } -func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (*kinesisStreamState, error) { - describeOpts := &kinesis.DescribeStreamInput{ +func readKinesisStreamSummary(conn *kinesis.Kinesis, sn string) (*kinesisStreamSummary, error) { + describeOpts := &kinesis.DescribeStreamSummaryInput{ StreamName: aws.String(sn), } - state := &kinesisStreamState{} - err := conn.DescribeStreamPages(describeOpts, func(page *kinesis.DescribeStreamOutput, lastPage bool) (shouldContinue bool) { - state.arn = aws.StringValue(page.StreamDescription.StreamARN) - state.creationTimestamp = aws.TimeValue(page.StreamDescription.StreamCreationTimestamp).Unix() - state.status = aws.StringValue(page.StreamDescription.StreamStatus) - state.retentionPeriod = aws.Int64Value(page.StreamDescription.RetentionPeriodHours) - state.openShards = append(state.openShards, FlattenShards(FilterShards(page.StreamDescription.Shards, true))...) - state.closedShards = append(state.closedShards, FlattenShards(FilterShards(page.StreamDescription.Shards, false))...) - state.shardLevelMetrics = FlattenShardLevelMetrics(page.StreamDescription.EnhancedMonitoring) - // EncryptionType can be nil in certain APIs, e.g. AWS China - if page.StreamDescription.EncryptionType != nil { - state.encryptionType = aws.StringValue(page.StreamDescription.EncryptionType) - } else { - state.encryptionType = kinesis.EncryptionTypeNone + streamSummaryResponse, err := conn.DescribeStreamSummary(describeOpts) + if err != nil { + return nil, err + } + + streamSummaryDescription := streamSummaryResponse.StreamDescriptionSummary + + return &kinesisStreamSummary{ + arn: aws.StringValue(streamSummaryDescription.StreamARN), + name: aws.StringValue(streamSummaryDescription.StreamName), + creationTimestamp: aws.TimeValue(streamSummaryDescription.StreamCreationTimestamp).Unix(), + status: aws.StringValue(streamSummaryDescription.StreamStatus), + retentionPeriod: aws.Int64Value(streamSummaryDescription.RetentionPeriodHours), + shardLevelMetrics: FlattenShardLevelMetrics(streamSummaryDescription.EnhancedMonitoring), + encryptionType: aws.StringValue(streamSummaryDescription.EncryptionType), + keyId: aws.StringValue(streamSummaryDescription.KeyId), + openShardsCount: aws.Int64Value(streamSummaryDescription.OpenShardCount), + consumersCount: aws.Int64Value(streamSummaryDescription.ConsumerCount), + }, nil +} + +type kinesisStream struct { + kinesisStreamSummary + openShards []string + closedShards []string +} + +func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (*kinesisStream, error) { + streamSummary, err := readKinesisStreamSummary(conn, sn) + if err != nil { + return nil, err + } + + fullStream := &kinesisStream{ + kinesisStreamSummary: *streamSummary, + } + + nextToken := aws.String("") + listShardsOpts := &kinesis.ListShardsInput{ + StreamName: aws.String(sn), + } + + for nextToken != nil { + if *nextToken != "" { + listShardsOpts = &kinesis.ListShardsInput{ + NextToken: nextToken, + } } - state.keyId = aws.StringValue(page.StreamDescription.KeyId) - return !lastPage - }) - return state, err + + listShardsResponse, err := conn.ListShards(listShardsOpts) + if err != nil { + return nil, err + } + + fullStream.openShards = append(fullStream.openShards, FlattenShards(FilterShards(listShardsResponse.Shards, true))...) + fullStream.closedShards = append(fullStream.closedShards, FlattenShards(FilterShards(listShardsResponse.Shards, false))...) + + nextToken = listShardsResponse.NextToken + } + + return fullStream, nil } func streamStateRefreshFunc(conn *kinesis.Kinesis, sn string) resource.StateRefreshFunc { return func() (interface{}, string, error) { - state, err := readKinesisStreamState(conn, sn) + state, err := readKinesisStreamSummary(conn, sn) if err != nil { if awsErr, ok := err.(awserr.Error); ok { if awsErr.Code() == kinesis.ErrCodeResourceNotFoundException { diff --git a/internal/service/kinesis/stream_data_source.go b/internal/service/kinesis/stream_data_source.go index 4b25045ce06..a0f84834324 100644 --- a/internal/service/kinesis/stream_data_source.go +++ b/internal/service/kinesis/stream_data_source.go @@ -59,6 +59,11 @@ func DataSourceStream() *schema.Resource { Set: schema.HashString, }, + "consumers_count": { + Type: schema.TypeInt, + Computed: true, + }, + "tags": tftags.TagsSchemaComputed(), }, } @@ -83,6 +88,7 @@ func dataSourceStreamRead(d *schema.ResourceData, meta interface{}) error { d.Set("creation_timestamp", state.creationTimestamp) d.Set("retention_period", state.retentionPeriod) d.Set("shard_level_metrics", state.shardLevelMetrics) + d.Set("consumers_count", state.consumersCount) tags, err := ListTags(conn, sn) diff --git a/internal/service/kinesis/stream_data_source_test.go b/internal/service/kinesis/stream_data_source_test.go index d492e282a18..701d7e3f501 100644 --- a/internal/service/kinesis/stream_data_source_test.go +++ b/internal/service/kinesis/stream_data_source_test.go @@ -54,6 +54,7 @@ func TestAccKinesisStreamDataSource_basic(t *testing.T) { resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "retention_period", "72"), resource.TestCheckResourceAttrSet("data.aws_kinesis_stream.test_stream", "creation_timestamp"), resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "tags.Name", "tf-test"), + resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "consumers_count", "1"), ), }, { @@ -70,6 +71,7 @@ func TestAccKinesisStreamDataSource_basic(t *testing.T) { resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "retention_period", "72"), resource.TestCheckResourceAttrSet("data.aws_kinesis_stream.test_stream", "creation_timestamp"), resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "tags.Name", "tf-test"), + resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "consumers_count", "1"), ), }, }, diff --git a/website/docs/d/kinesis_stream.html.markdown b/website/docs/d/kinesis_stream.html.markdown index b6679da1d36..35bef0d122e 100644 --- a/website/docs/d/kinesis_stream.html.markdown +++ b/website/docs/d/kinesis_stream.html.markdown @@ -38,6 +38,7 @@ are exported: * `open_shards` - The list of shard ids in the OPEN state. See [Shard State][2] for more. * `closed_shards` - The list of shard ids in the CLOSED state. See [Shard State][2] for more. * `shard_level_metrics` - A list of shard-level CloudWatch metrics which are enabled for the stream. See [Monitoring with CloudWatch][3] for more. +* `consumers_count` - The number of enhanced fan-out consumers registered with the stream. * `tags` - A map of tags to assigned to the stream. [1]: https://aws.amazon.com/documentation/kinesis/