Skip to content

Commit

Permalink
Merge pull request #17149 from anthonyroach/f-kinesis_stream_consumer
Browse files Browse the repository at this point in the history
Adds Kinesis Stream Consumer resource and data source (third time)
  • Loading branch information
anGie44 authored Mar 18, 2021
2 parents 78b3991 + 48bf260 commit 436f62c
Show file tree
Hide file tree
Showing 12 changed files with 801 additions and 11 deletions.
7 changes: 7 additions & 0 deletions .changelog/17149.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:new-data-source
aws_kinesis_stream_consumer
```

```release-note:new-resource
aws_kinesis_stream_consumer
```
107 changes: 107 additions & 0 deletions aws/data_source_aws_kinesis_stream_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package aws

import (
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
)

func dataSourceAwsKinesisStreamConsumer() *schema.Resource {
return &schema.Resource{
Read: dataSourceAwsKinesisStreamConsumerRead,

Schema: map[string]*schema.Schema{
"arn": {
Type: schema.TypeString,
Optional: true,
Computed: true,
ValidateFunc: validateArn,
},

"creation_timestamp": {
Type: schema.TypeString,
Computed: true,
},

"name": {
Type: schema.TypeString,
Optional: true,
Computed: true,
},

"status": {
Type: schema.TypeString,
Computed: true,
},

"stream_arn": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validateArn,
},
},
}
}

func dataSourceAwsKinesisStreamConsumerRead(d *schema.ResourceData, meta interface{}) error {
conn := meta.(*AWSClient).kinesisconn

streamArn := d.Get("stream_arn").(string)

input := &kinesis.ListStreamConsumersInput{
StreamARN: aws.String(streamArn),
}

var results []*kinesis.Consumer

err := conn.ListStreamConsumersPages(input, func(page *kinesis.ListStreamConsumersOutput, lastPage bool) bool {
if page == nil {
return !lastPage
}

for _, consumer := range page.Consumers {
if consumer == nil {
continue
}

if v, ok := d.GetOk("name"); ok && v.(string) != aws.StringValue(consumer.ConsumerName) {
continue
}

if v, ok := d.GetOk("arn"); ok && v.(string) != aws.StringValue(consumer.ConsumerARN) {
continue
}

results = append(results, consumer)

}

return !lastPage
})

if err != nil {
return fmt.Errorf("error listing Kinesis Stream Consumers: %w", err)
}

if len(results) == 0 {
return fmt.Errorf("no Kinesis Stream Consumer found matching criteria; try different search")
}

if len(results) > 1 {
return fmt.Errorf("multiple Kinesis Stream Consumers found matching criteria; try different search")
}

consumer := results[0]

d.SetId(aws.StringValue(consumer.ConsumerARN))
d.Set("arn", consumer.ConsumerARN)
d.Set("name", consumer.ConsumerName)
d.Set("status", consumer.ConsumerStatus)
d.Set("stream_arn", streamArn)
d.Set("creation_timestamp", aws.TimeValue(consumer.ConsumerCreationTimestamp).Format(time.RFC3339))

return nil
}
140 changes: 140 additions & 0 deletions aws/data_source_aws_kinesis_stream_consumer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package aws

import (
"fmt"
"testing"

"github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

func TestAccAWSKinesisStreamConsumerDataSource_basic(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")
dataSourceName := "data.aws_kinesis_stream_consumer.test"
resourceName := "aws_kinesis_stream_consumer.test"
streamName := "aws_kinesis_stream.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: nil,
Steps: []resource.TestStep{
{
Config: testAccAWSKinesisStreamConsumerDataSourceConfig(rName),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"),
resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"),
resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"),
resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"),
resource.TestCheckResourceAttrSet(dataSourceName, "status"),
),
},
},
})
}

func TestAccAWSKinesisStreamConsumerDataSource_Name(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")
dataSourceName := "data.aws_kinesis_stream_consumer.test"
resourceName := "aws_kinesis_stream_consumer.test"
streamName := "aws_kinesis_stream.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: nil,
Steps: []resource.TestStep{
{
Config: testAccAWSKinesisStreamConsumerDataSourceConfigName(rName),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"),
resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"),
resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"),
resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"),
resource.TestCheckResourceAttrSet(dataSourceName, "status"),
),
},
},
})
}

func TestAccAWSKinesisStreamConsumerDataSource_Arn(t *testing.T) {
rName := acctest.RandomWithPrefix("tf-acc-test")
dataSourceName := "data.aws_kinesis_stream_consumer.test"
resourceName := "aws_kinesis_stream_consumer.test"
streamName := "aws_kinesis_stream.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: nil,
Steps: []resource.TestStep{
{
Config: testAccAWSKinesisStreamConsumerDataSourceConfigArn(rName),
Check: resource.ComposeTestCheckFunc(
resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"),
resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"),
resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"),
resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"),
resource.TestCheckResourceAttrSet(dataSourceName, "status"),
),
},
},
})
}

func testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName string) string {
return fmt.Sprintf(`
resource "aws_kinesis_stream" "test" {
name = %q
shard_count = 2
}
`, rName)
}

func testAccAWSKinesisStreamConsumerDataSourceConfig(rName string) string {
return composeConfig(
testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName),
fmt.Sprintf(`
data "aws_kinesis_stream_consumer" "test" {
stream_arn = aws_kinesis_stream_consumer.test.stream_arn
}
resource "aws_kinesis_stream_consumer" "test" {
name = %q
stream_arn = aws_kinesis_stream.test.arn
}
`, rName))
}

func testAccAWSKinesisStreamConsumerDataSourceConfigName(rName string) string {
return composeConfig(
testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName),
fmt.Sprintf(`
data "aws_kinesis_stream_consumer" "test" {
name = aws_kinesis_stream_consumer.test.name
stream_arn = aws_kinesis_stream_consumer.test.stream_arn
}
resource "aws_kinesis_stream_consumer" "test" {
name = %q
stream_arn = aws_kinesis_stream.test.arn
}
`, rName))
}

func testAccAWSKinesisStreamConsumerDataSourceConfigArn(rName string) string {
return composeConfig(
testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName),
fmt.Sprintf(`
data "aws_kinesis_stream_consumer" "test" {
arn = aws_kinesis_stream_consumer.test.arn
stream_arn = aws_kinesis_stream_consumer.test.stream_arn
}
resource "aws_kinesis_stream_consumer" "test" {
name = %q
stream_arn = aws_kinesis_stream.test.arn
}
`, rName))
}
25 changes: 25 additions & 0 deletions aws/internal/service/kinesis/finder/finder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package finder

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
)

// StreamConsumerByARN returns the stream consumer corresponding to the specified ARN.
// Returns nil if no stream consumer is found.
func StreamConsumerByARN(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) {
input := &kinesis.DescribeStreamConsumerInput{
ConsumerARN: aws.String(arn),
}

output, err := conn.DescribeStreamConsumer(input)
if err != nil {
return nil, err
}

if output == nil {
return nil, nil
}

return output.ConsumerDescription, nil
}
30 changes: 30 additions & 0 deletions aws/internal/service/kinesis/waiter/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package waiter

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
"github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesis/finder"
)

const (
StreamConsumerStatusNotFound = "NotFound"
StreamConsumerStatusUnknown = "Unknown"
)

// StreamConsumerStatus fetches the StreamConsumer and its Status
func StreamConsumerStatus(conn *kinesis.Kinesis, arn string) resource.StateRefreshFunc {
return func() (interface{}, string, error) {
consumer, err := finder.StreamConsumerByARN(conn, arn)

if err != nil {
return nil, StreamConsumerStatusUnknown, err
}

if consumer == nil {
return nil, StreamConsumerStatusNotFound, nil
}

return consumer, aws.StringValue(consumer.ConsumerStatus), nil
}
}
49 changes: 49 additions & 0 deletions aws/internal/service/kinesis/waiter/waiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package waiter

import (
"time"

"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource"
)

const (
StreamConsumerCreatedTimeout = 5 * time.Minute
StreamConsumerDeletedTimeout = 5 * time.Minute
)

// StreamConsumerCreated waits for an Stream Consumer to return Active
func StreamConsumerCreated(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesis.ConsumerStatusCreating},
Target: []string{kinesis.ConsumerStatusActive},
Refresh: StreamConsumerStatus(conn, arn),
Timeout: StreamConsumerCreatedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesis.ConsumerDescription); ok {
return v, err
}

return nil, err
}

// StreamConsumerDeleted waits for a Stream Consumer to be deleted
func StreamConsumerDeleted(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) {
stateConf := &resource.StateChangeConf{
Pending: []string{kinesis.ConsumerStatusDeleting},
Target: []string{},
Refresh: StreamConsumerStatus(conn, arn),
Timeout: StreamConsumerDeletedTimeout,
}

outputRaw, err := stateConf.WaitForState()

if v, ok := outputRaw.(*kinesis.ConsumerDescription); ok {
return v, err
}

return nil, err
}
2 changes: 2 additions & 0 deletions aws/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ func Provider() *schema.Provider {
"aws_iot_endpoint": dataSourceAwsIotEndpoint(),
"aws_ip_ranges": dataSourceAwsIPRanges(),
"aws_kinesis_stream": dataSourceAwsKinesisStream(),
"aws_kinesis_stream_consumer": dataSourceAwsKinesisStreamConsumer(),
"aws_kms_alias": dataSourceAwsKmsAlias(),
"aws_kms_ciphertext": dataSourceAwsKmsCiphertext(),
"aws_kms_key": dataSourceAwsKmsKey(),
Expand Down Expand Up @@ -770,6 +771,7 @@ func Provider() *schema.Provider {
"aws_kinesisanalyticsv2_application": resourceAwsKinesisAnalyticsV2Application(),
"aws_kinesis_firehose_delivery_stream": resourceAwsKinesisFirehoseDeliveryStream(),
"aws_kinesis_stream": resourceAwsKinesisStream(),
"aws_kinesis_stream_consumer": resourceAwsKinesisStreamConsumer(),
"aws_kinesis_video_stream": resourceAwsKinesisVideoStream(),
"aws_kms_alias": resourceAwsKmsAlias(),
"aws_kms_external_key": resourceAwsKmsExternalKey(),
Expand Down
Loading

0 comments on commit 436f62c

Please sign in to comment.