Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve reading kinesis stream state #15489

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 0 additions & 25 deletions internal/service/kinesis/find.go

This file was deleted.

29 changes: 0 additions & 29 deletions internal/service/kinesis/status.go

This file was deleted.

75 changes: 25 additions & 50 deletions internal/service/kinesis/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func resourceStreamRead(d *schema.ResourceData, meta interface{}) error {
ignoreTagsConfig := meta.(*conns.AWSClient).IgnoreTagsConfig
name := d.Get("name").(string)

output, err := FindStreamByName(conn, name)
stream, err := FindStreamByName(conn, name)

if !d.IsNewResource() && tfresource.NotFound(err) {
log.Printf("[WARN] Kinesis Stream (%s) not found, removing from state", d.Id())
Expand All @@ -264,29 +264,29 @@ func resourceStreamRead(d *schema.ResourceData, meta interface{}) error {
return fmt.Errorf("error reading Kinesis Stream (%s): %w", name, err)
}

d.Set("arn", output.StreamARN)
d.Set("encryption_type", output.EncryptionType)
d.Set("kms_key_id", output.KeyId)
d.Set("name", output.StreamName)
d.Set("retention_period", output.RetentionPeriodHours)
d.Set("arn", stream.StreamARN)
d.Set("encryption_type", stream.EncryptionType)
d.Set("kms_key_id", stream.KeyId)
d.Set("name", stream.StreamName)
d.Set("retention_period", stream.RetentionPeriodHours)

streamMode := kinesis.StreamModeProvisioned
if details := output.StreamModeDetails; details != nil {
if details := stream.StreamModeDetails; details != nil {
streamMode = aws.StringValue(details.StreamMode)
}
if streamMode == kinesis.StreamModeProvisioned {
d.Set("shard_count", len(filterShards(output.Shards, true)))
d.Set("shard_count", stream.OpenShardCount)
} else {
d.Set("shard_count", nil)
}

var shardLevelMetrics []*string
for _, v := range output.EnhancedMonitoring {
for _, v := range stream.EnhancedMonitoring {
shardLevelMetrics = append(shardLevelMetrics, v.ShardLevelMetrics...)
}
d.Set("shard_level_metrics", aws.StringValueSlice(shardLevelMetrics))

if details := output.StreamModeDetails; details != nil {
if details := stream.StreamModeDetails; details != nil {
if err := d.Set("stream_mode_details", []interface{}{flattenStreamModeDetails(details)}); err != nil {
return fmt.Errorf("error setting stream_mode_details: %w", err)
}
Expand Down Expand Up @@ -557,26 +557,12 @@ func resourceStreamImport(d *schema.ResourceData, meta interface{}) ([]*schema.R
return []*schema.ResourceData{d}, nil
}

func FindStreamByName(conn *kinesis.Kinesis, name string) (*kinesis.StreamDescription, error) {
var output *kinesis.StreamDescription
input := &kinesis.DescribeStreamInput{
func FindStreamByName(conn *kinesis.Kinesis, name string) (*kinesis.StreamDescriptionSummary, error) {
input := &kinesis.DescribeStreamSummaryInput{
StreamName: aws.String(name),
}

err := conn.DescribeStreamPages(input, func(page *kinesis.DescribeStreamOutput, lastPage bool) bool {
if page == nil || page.StreamDescription == nil {
return !lastPage
}

if output == nil {
output = &kinesis.StreamDescription{}
*output = *page.StreamDescription
} else {
output.Shards = append(output.Shards, page.StreamDescription.Shards...)
}

return !lastPage
})
output, err := conn.DescribeStreamSummary(input)

if tfawserr.ErrCodeEquals(err, kinesis.ErrCodeResourceNotFoundException) {
return nil, &resource.NotFoundError{
Expand All @@ -585,11 +571,15 @@ func FindStreamByName(conn *kinesis.Kinesis, name string) (*kinesis.StreamDescri
}
}

if output == nil {
if err != nil {
return nil, err
}

if output == nil || output.StreamDescriptionSummary == nil {
return nil, tfresource.NewEmptyResultError(input)
}

return output, nil
return output.StreamDescriptionSummary, nil
}

func streamStatus(conn *kinesis.Kinesis, name string) resource.StateRefreshFunc {
Expand All @@ -608,7 +598,7 @@ func streamStatus(conn *kinesis.Kinesis, name string) resource.StateRefreshFunc
}
}

func waitStreamCreated(conn *kinesis.Kinesis, name string, timeout time.Duration) (*kinesis.StreamDescription, error) {
func waitStreamCreated(conn *kinesis.Kinesis, name string, timeout time.Duration) (*kinesis.StreamDescriptionSummary, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesis.StreamStatusCreating},
Target: []string{kinesis.StreamStatusActive},
Expand All @@ -620,14 +610,14 @@ func waitStreamCreated(conn *kinesis.Kinesis, name string, timeout time.Duration

outputRaw, err := stateConf.WaitForState()

if output, ok := outputRaw.(*kinesis.StreamDescription); ok {
if output, ok := outputRaw.(*kinesis.StreamDescriptionSummary); ok {
return output, err
}

return nil, err
}

func waitStreamDeleted(conn *kinesis.Kinesis, name string, timeout time.Duration) (*kinesis.StreamDescription, error) {
func waitStreamDeleted(conn *kinesis.Kinesis, name string, timeout time.Duration) (*kinesis.StreamDescriptionSummary, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesis.StreamStatusDeleting},
Target: []string{},
Expand All @@ -639,14 +629,14 @@ func waitStreamDeleted(conn *kinesis.Kinesis, name string, timeout time.Duration

outputRaw, err := stateConf.WaitForState()

if output, ok := outputRaw.(*kinesis.StreamDescription); ok {
if output, ok := outputRaw.(*kinesis.StreamDescriptionSummary); ok {
return output, err
}

return nil, err
}

func waitStreamUpdated(conn *kinesis.Kinesis, name string, timeout time.Duration) (*kinesis.StreamDescription, error) { //nolint:unparam
func waitStreamUpdated(conn *kinesis.Kinesis, name string, timeout time.Duration) (*kinesis.StreamDescriptionSummary, error) { //nolint:unparam
stateConf := &resource.StateChangeConf{
Pending: []string{kinesis.StreamStatusUpdating},
Target: []string{kinesis.StreamStatusActive},
Expand All @@ -658,28 +648,13 @@ func waitStreamUpdated(conn *kinesis.Kinesis, name string, timeout time.Duration

outputRaw, err := stateConf.WaitForState()

if output, ok := outputRaw.(*kinesis.StreamDescription); ok {
if output, ok := outputRaw.(*kinesis.StreamDescriptionSummary); ok {
return output, err
}

return nil, err
}

// See http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-using-sdk-java-resharding-merge.html
func filterShards(shards []*kinesis.Shard, open bool) []*kinesis.Shard {
var output []*kinesis.Shard

for _, shard := range shards {
if open && shard.SequenceNumberRange.EndingSequenceNumber == nil {
output = append(output, shard)
} else if !open && shard.SequenceNumberRange.EndingSequenceNumber != nil {
output = append(output, shard)
}
}

return output
}

func getStreamMode(d *schema.ResourceData) string {
streamMode, ok := d.GetOk("stream_mode_details.0.stream_mode")
if !ok {
Expand Down
Loading