Skip to content

Commit

Permalink
Improve reading kinesis stream state
Browse files Browse the repository at this point in the history
  • Loading branch information
Hristo Stoychev authored and tacho committed Nov 23, 2021
1 parent a098d25 commit dbc1f64
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 28 deletions.
99 changes: 71 additions & 28 deletions internal/service/kinesis/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ func resourceStreamCreate(d *schema.ResourceData, meta interface{}) error {
sn, err)
}

s := streamRaw.(*kinesisStreamState)
s := streamRaw.(*kinesisStreamSummary)
d.SetId(s.arn)
d.Set("arn", s.arn)
d.Set("shard_count", len(s.openShards))
d.Set("shard_count", s.openShardsCount)

return resourceStreamUpdate(d, meta)
}
Expand Down Expand Up @@ -190,7 +190,7 @@ func resourceStreamRead(d *schema.ResourceData, meta interface{}) error {

sn := d.Get("name").(string)

state, err := readKinesisStreamState(conn, sn)
state, err := readKinesisStreamSummary(conn, sn)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == kinesis.ErrCodeResourceNotFoundException {
Expand All @@ -204,7 +204,7 @@ func resourceStreamRead(d *schema.ResourceData, meta interface{}) error {
}
d.SetId(state.arn)
d.Set("arn", state.arn)
d.Set("shard_count", len(state.openShards))
d.Set("shard_count", state.openShardsCount)
d.Set("retention_period", state.retentionPeriod)

d.Set("encryption_type", state.encryptionType)
Expand Down Expand Up @@ -446,47 +446,90 @@ func updateKinesisShardLevelMetrics(conn *kinesis.Kinesis, d *schema.ResourceDat
return nil
}

type kinesisStreamState struct {
type kinesisStreamSummary struct {
arn string
name string
creationTimestamp int64
status string
retentionPeriod int64
openShards []string
closedShards []string
shardLevelMetrics []string
encryptionType string
keyId string
openShardsCount int64
consumersCount int64
}

func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (*kinesisStreamState, error) {
describeOpts := &kinesis.DescribeStreamInput{
func readKinesisStreamSummary(conn *kinesis.Kinesis, sn string) (*kinesisStreamSummary, error) {
describeOpts := &kinesis.DescribeStreamSummaryInput{
StreamName: aws.String(sn),
}

state := &kinesisStreamState{}
err := conn.DescribeStreamPages(describeOpts, func(page *kinesis.DescribeStreamOutput, lastPage bool) (shouldContinue bool) {
state.arn = aws.StringValue(page.StreamDescription.StreamARN)
state.creationTimestamp = aws.TimeValue(page.StreamDescription.StreamCreationTimestamp).Unix()
state.status = aws.StringValue(page.StreamDescription.StreamStatus)
state.retentionPeriod = aws.Int64Value(page.StreamDescription.RetentionPeriodHours)
state.openShards = append(state.openShards, FlattenShards(FilterShards(page.StreamDescription.Shards, true))...)
state.closedShards = append(state.closedShards, FlattenShards(FilterShards(page.StreamDescription.Shards, false))...)
state.shardLevelMetrics = FlattenShardLevelMetrics(page.StreamDescription.EnhancedMonitoring)
// EncryptionType can be nil in certain APIs, e.g. AWS China
if page.StreamDescription.EncryptionType != nil {
state.encryptionType = aws.StringValue(page.StreamDescription.EncryptionType)
} else {
state.encryptionType = kinesis.EncryptionTypeNone
streamSummaryResponse, err := conn.DescribeStreamSummary(describeOpts)
if err != nil {
return nil, err
}

streamSummaryDescription := streamSummaryResponse.StreamDescriptionSummary

return &kinesisStreamSummary{
arn: aws.StringValue(streamSummaryDescription.StreamARN),
name: aws.StringValue(streamSummaryDescription.StreamName),
creationTimestamp: aws.TimeValue(streamSummaryDescription.StreamCreationTimestamp).Unix(),
status: aws.StringValue(streamSummaryDescription.StreamStatus),
retentionPeriod: aws.Int64Value(streamSummaryDescription.RetentionPeriodHours),
shardLevelMetrics: FlattenShardLevelMetrics(streamSummaryDescription.EnhancedMonitoring),
encryptionType: aws.StringValue(streamSummaryDescription.EncryptionType),
keyId: aws.StringValue(streamSummaryDescription.KeyId),
openShardsCount: aws.Int64Value(streamSummaryDescription.OpenShardCount),
consumersCount: aws.Int64Value(streamSummaryDescription.ConsumerCount),
}, nil
}

type kinesisStream struct {
kinesisStreamSummary
openShards []string
closedShards []string
}

func readKinesisStreamState(conn *kinesis.Kinesis, sn string) (*kinesisStream, error) {
streamSummary, err := readKinesisStreamSummary(conn, sn)
if err != nil {
return nil, err
}

fullStream := &kinesisStream{
kinesisStreamSummary: *streamSummary,
}

nextToken := aws.String("")
listShardsOpts := &kinesis.ListShardsInput{
StreamName: aws.String(sn),
}

for nextToken != nil {
if *nextToken != "" {
listShardsOpts = &kinesis.ListShardsInput{
NextToken: nextToken,
}
}
state.keyId = aws.StringValue(page.StreamDescription.KeyId)
return !lastPage
})
return state, err

listShardsResponse, err := conn.ListShards(listShardsOpts)
if err != nil {
return nil, err
}

fullStream.openShards = append(fullStream.openShards, FlattenShards(FilterShards(listShardsResponse.Shards, true))...)
fullStream.closedShards = append(fullStream.closedShards, FlattenShards(FilterShards(listShardsResponse.Shards, false))...)

nextToken = listShardsResponse.NextToken
}

return fullStream, nil
}

func streamStateRefreshFunc(conn *kinesis.Kinesis, sn string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
state, err := readKinesisStreamState(conn, sn)
state, err := readKinesisStreamSummary(conn, sn)
if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
if awsErr.Code() == kinesis.ErrCodeResourceNotFoundException {
Expand Down
6 changes: 6 additions & 0 deletions internal/service/kinesis/stream_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func DataSourceStream() *schema.Resource {
Set: schema.HashString,
},

"consumers_count": {
Type: schema.TypeInt,
Computed: true,
},

"tags": tftags.TagsSchemaComputed(),
},
}
Expand All @@ -83,6 +88,7 @@ func dataSourceStreamRead(d *schema.ResourceData, meta interface{}) error {
d.Set("creation_timestamp", state.creationTimestamp)
d.Set("retention_period", state.retentionPeriod)
d.Set("shard_level_metrics", state.shardLevelMetrics)
d.Set("consumers_count", state.consumersCount)

tags, err := ListTags(conn, sn)

Expand Down
2 changes: 2 additions & 0 deletions internal/service/kinesis/stream_data_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestAccKinesisStreamDataSource_basic(t *testing.T) {
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "retention_period", "72"),
resource.TestCheckResourceAttrSet("data.aws_kinesis_stream.test_stream", "creation_timestamp"),
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "tags.Name", "tf-test"),
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "consumers_count", "1"),
),
},
{
Expand All @@ -70,6 +71,7 @@ func TestAccKinesisStreamDataSource_basic(t *testing.T) {
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "retention_period", "72"),
resource.TestCheckResourceAttrSet("data.aws_kinesis_stream.test_stream", "creation_timestamp"),
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "tags.Name", "tf-test"),
resource.TestCheckResourceAttr("data.aws_kinesis_stream.test_stream", "consumers_count", "1"),
),
},
},
Expand Down
1 change: 1 addition & 0 deletions website/docs/d/kinesis_stream.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ are exported:
* `open_shards` - The list of shard ids in the OPEN state. See [Shard State][2] for more.
* `closed_shards` - The list of shard ids in the CLOSED state. See [Shard State][2] for more.
* `shard_level_metrics` - A list of shard-level CloudWatch metrics which are enabled for the stream. See [Monitoring with CloudWatch][3] for more.
* `consumers_count` - The number of enhanced fan-out consumers registered with the stream.
* `tags` - A map of tags to assigned to the stream.

[1]: https://aws.amazon.com/documentation/kinesis/
Expand Down

0 comments on commit dbc1f64

Please sign in to comment.