Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Kinesis Stream Consumer resource and data source (third time) #17149

Merged
merged 4 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changelog/17149.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:new-data-source
aws_kinesis_stream_consumer
```

```release-note:new-resource
aws_kinesis_stream_consumer
```
107 changes: 107 additions & 0 deletions aws/data_source_aws_kinesis_stream_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package aws

import (
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

func dataSourceAwsKinesisStreamConsumer() *schema.Resource {
return &schema.Resource{
Read: dataSourceAwsKinesisStreamConsumerRead,

Schema: map[string]*schema.Schema{
"arn": {
Type: schema.TypeString,
Optional: true,
Computed: true,
ValidateFunc: validateArn,
},

"creation_timestamp": {
Type: schema.TypeString,
Computed: true,
},

"name": {
Type: schema.TypeString,
Optional: true,
Computed: true,
},

"status": {
Type: schema.TypeString,
Computed: true,
},

"stream_arn": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validateArn,
},
},
}
}

func dataSourceAwsKinesisStreamConsumerRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).kinesisconn

streamArn := d.Get("stream_arn").(string)

input := &kinesis.ListStreamConsumersInput{
StreamARN: aws.String(streamArn),
}

var results []*kinesis.Consumer

err := conn.ListStreamConsumersPages(input, func(page *kinesis.ListStreamConsumersOutput, lastPage bool) bool {
if page == nil {
return !lastPage
}

for _, consumer := range page.Consumers {
if consumer == nil {
continue
}

if v, ok := d.GetOk("name"); ok && v.(string) != aws.StringValue(consumer.ConsumerName) {
continue
}

if v, ok := d.GetOk("arn"); ok && v.(string) != aws.StringValue(consumer.ConsumerARN) {
continue
}

results = append(results, consumer)

}

return !lastPage
})

if err != nil {
return fmt.Errorf("error listing Kinesis Stream Consumers: %w", err)
}

if len(results) == 0 {
return fmt.Errorf("no Kinesis Stream Consumer found matching criteria; try different search")
}

if len(results) > 1 {
return fmt.Errorf("multiple Kinesis Stream Consumers found matching criteria; try different search")
}

consumer := results[0]

d.SetId(aws.StringValue(consumer.ConsumerARN))
d.Set("arn", consumer.ConsumerARN)
d.Set("name", consumer.ConsumerName)
d.Set("status", consumer.ConsumerStatus)
d.Set("stream_arn", streamArn)
d.Set("creation_timestamp", aws.TimeValue(consumer.ConsumerCreationTimestamp).Format(time.RFC3339))

return nil
}
140 changes: 140 additions & 0 deletions aws/data_source_aws_kinesis_stream_consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package aws

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

func TestAccAWSKinesisStreamConsumerDataSource_basic(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")
dataSourceName := "data.aws_kinesis_stream_consumer.test"
resourceName := "aws_kinesis_stream_consumer.test"
streamName := "aws_kinesis_stream.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: nil,
Steps: []resource.TestStep{
{
Config: testAccAWSKinesisStreamConsumerDataSourceConfig(rName),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"),
resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"),
resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"),
resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"),
resource.TestCheckResourceAttrSet(dataSourceName, "status"),
),
},
},
})
}

func TestAccAWSKinesisStreamConsumerDataSource_Name(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")
dataSourceName := "data.aws_kinesis_stream_consumer.test"
resourceName := "aws_kinesis_stream_consumer.test"
streamName := "aws_kinesis_stream.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: nil,
Steps: []resource.TestStep{
{
Config: testAccAWSKinesisStreamConsumerDataSourceConfigName(rName),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"),
resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"),
resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"),
resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"),
resource.TestCheckResourceAttrSet(dataSourceName, "status"),
),
},
},
})
}

func TestAccAWSKinesisStreamConsumerDataSource_Arn(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")
dataSourceName := "data.aws_kinesis_stream_consumer.test"
resourceName := "aws_kinesis_stream_consumer.test"
streamName := "aws_kinesis_stream.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: nil,
Steps: []resource.TestStep{
{
Config: testAccAWSKinesisStreamConsumerDataSourceConfigArn(rName),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"),
resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"),
resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"),
resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"),
resource.TestCheckResourceAttrSet(dataSourceName, "status"),
),
},
},
})
}

func testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName string) string {
return fmt.Sprintf(`
resource "aws_kinesis_stream" "test" {
name = %q
shard_count = 2
}
`, rName)
}

func testAccAWSKinesisStreamConsumerDataSourceConfig(rName string) string {
return composeConfig(
testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName),
fmt.Sprintf(`
data "aws_kinesis_stream_consumer" "test" {
stream_arn = aws_kinesis_stream_consumer.test.stream_arn
}
resource "aws_kinesis_stream_consumer" "test" {
name = %q
stream_arn = aws_kinesis_stream.test.arn
}
`, rName))
}

func testAccAWSKinesisStreamConsumerDataSourceConfigName(rName string) string {
return composeConfig(
testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName),
fmt.Sprintf(`
data "aws_kinesis_stream_consumer" "test" {
name = aws_kinesis_stream_consumer.test.name
stream_arn = aws_kinesis_stream_consumer.test.stream_arn
}
resource "aws_kinesis_stream_consumer" "test" {
name = %q
stream_arn = aws_kinesis_stream.test.arn
}
`, rName))
}

func testAccAWSKinesisStreamConsumerDataSourceConfigArn(rName string) string {
return composeConfig(
testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName),
fmt.Sprintf(`
data "aws_kinesis_stream_consumer" "test" {
arn = aws_kinesis_stream_consumer.test.arn
stream_arn = aws_kinesis_stream_consumer.test.stream_arn
}
resource "aws_kinesis_stream_consumer" "test" {
name = %q
stream_arn = aws_kinesis_stream.test.arn
}
`, rName))
}
25 changes: 25 additions & 0 deletions aws/internal/service/kinesis/finder/finder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package finder

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)

// StreamConsumerByARN returns the stream consumer corresponding to the specified ARN.
// Returns nil if no stream consumer is found.
func StreamConsumerByARN(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) {
input := &kinesis.DescribeStreamConsumerInput{
ConsumerARN: aws.String(arn),
}

output, err := conn.DescribeStreamConsumer(input)
if err != nil {
return nil, err
}

if output == nil {
return nil, nil
}

return output.ConsumerDescription, nil
}
30 changes: 30 additions & 0 deletions aws/internal/service/kinesis/waiter/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package waiter

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesis/finder"
)

const (
StreamConsumerStatusNotFound = "NotFound"
StreamConsumerStatusUnknown = "Unknown"
)

// StreamConsumerStatus fetches the StreamConsumer and its Status
func StreamConsumerStatus(conn *kinesis.Kinesis, arn string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
consumer, err := finder.StreamConsumerByARN(conn, arn)

if err != nil {
return nil, StreamConsumerStatusUnknown, err
}

if consumer == nil {
return nil, StreamConsumerStatusNotFound, nil
}

return consumer, aws.StringValue(consumer.ConsumerStatus), nil
}
}
49 changes: 49 additions & 0 deletions aws/internal/service/kinesis/waiter/waiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package waiter

import (
"time"

"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

const (
StreamConsumerCreatedTimeout = 5 * time.Minute
StreamConsumerDeletedTimeout = 5 * time.Minute
)

// StreamConsumerCreated waits for an Stream Consumer to return Active
func StreamConsumerCreated(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesis.ConsumerStatusCreating},
Target: []string{kinesis.ConsumerStatusActive},
Refresh: StreamConsumerStatus(conn, arn),
Timeout: StreamConsumerCreatedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesis.ConsumerDescription); ok {
return v, err
}

return nil, err
}

// StreamConsumerDeleted waits for a Stream Consumer to be deleted
func StreamConsumerDeleted(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesis.ConsumerStatusDeleting},
Target: []string{},
Refresh: StreamConsumerStatus(conn, arn),
Timeout: StreamConsumerDeletedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesis.ConsumerDescription); ok {
return v, err
}

return nil, err
}
2 changes: 2 additions & 0 deletions aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func Provider() *schema.Provider {
"aws_iot_endpoint": dataSourceAwsIotEndpoint(),
"aws_ip_ranges": dataSourceAwsIPRanges(),
"aws_kinesis_stream": dataSourceAwsKinesisStream(),
"aws_kinesis_stream_consumer": dataSourceAwsKinesisStreamConsumer(),
"aws_kms_alias": dataSourceAwsKmsAlias(),
"aws_kms_ciphertext": dataSourceAwsKmsCiphertext(),
"aws_kms_key": dataSourceAwsKmsKey(),
Expand Down Expand Up @@ -770,6 +771,7 @@ func Provider() *schema.Provider {
"aws_kinesisanalyticsv2_application": resourceAwsKinesisAnalyticsV2Application(),
"aws_kinesis_firehose_delivery_stream": resourceAwsKinesisFirehoseDeliveryStream(),
"aws_kinesis_stream": resourceAwsKinesisStream(),
"aws_kinesis_stream_consumer": resourceAwsKinesisStreamConsumer(),
"aws_kinesis_video_stream": resourceAwsKinesisVideoStream(),
"aws_kms_alias": resourceAwsKmsAlias(),
"aws_kms_external_key": resourceAwsKmsExternalKey(),
Expand Down
Loading