From 485a9645200fd27ecf06371ffd7e0aee36e67c51 Mon Sep 17 00:00:00 2001 From: Anthony Roach Date: Sat, 16 Jan 2021 13:27:33 -0500 Subject: [PATCH 1/4] add aws_kinesis_stream_consumer resource and data source Adapted from original commits from @rockycore at yomagroup:feature/kinesis-stream-consumer. Changes on top of the original commits: * rebased onto master, and squashed * added warning resource removal on not found during read * set the resource id immediately after creation * fixed import id syntax to match the documentation * added an import test * removed Optional from arn output in schema --- ...data_source_aws_kinesis_stream_consumer.go | 57 ++++ ...source_aws_kinesis_stream_consumer_test.go | 35 +++ aws/provider.go | 2 + aws/resource_aws_kinesis_stream_consumer.go | 229 ++++++++++++++++ ...source_aws_kinesis_stream_consumer_test.go | 258 ++++++++++++++++++ .../d/kinesis_stream_consumer.html.markdown | 41 +++ .../r/kinesis_stream_consumer.html.markdown | 68 +++++ 7 files changed, 690 insertions(+) create mode 100644 aws/data_source_aws_kinesis_stream_consumer.go create mode 100644 aws/data_source_aws_kinesis_stream_consumer_test.go create mode 100644 aws/resource_aws_kinesis_stream_consumer.go create mode 100644 aws/resource_aws_kinesis_stream_consumer_test.go create mode 100644 website/docs/d/kinesis_stream_consumer.html.markdown create mode 100644 website/docs/r/kinesis_stream_consumer.html.markdown diff --git a/aws/data_source_aws_kinesis_stream_consumer.go b/aws/data_source_aws_kinesis_stream_consumer.go new file mode 100644 index 00000000000..3bc611881a2 --- /dev/null +++ b/aws/data_source_aws_kinesis_stream_consumer.go @@ -0,0 +1,57 @@ +package aws + +import ( + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" +) + +func dataSourceAwsKinesisStreamConsumer() *schema.Resource { + return &schema.Resource{ + Read: dataSourceAwsKinesisStreamConsumerRead, + + Schema: map[string]*schema.Schema{ + "name": { + Type: schema.TypeString, + Required: true, + }, + + "stream_arn": { + Type: schema.TypeString, + Required: true, + }, + + "arn": { + Type: schema.TypeString, + Computed: true, + }, + + "creation_timestamp": { + Type: schema.TypeInt, + Computed: true, + }, + + "status": { + Type: schema.TypeString, + Computed: true, + }, + }, + } +} + +func dataSourceAwsKinesisStreamConsumerRead(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kinesisconn + cn := d.Get("name").(string) + sa := d.Get("stream_arn").(string) + + state, err := readKinesisStreamConsumerState(conn, cn, sa) + if err != nil { + return err + } + d.SetId(state.arn) + d.Set("arn", state.arn) + d.Set("name", cn) + d.Set("stream_arn", sa) + d.Set("status", state.status) + d.Set("creation_timestamp", state.creationTimestamp) + + return nil +} diff --git a/aws/data_source_aws_kinesis_stream_consumer_test.go b/aws/data_source_aws_kinesis_stream_consumer_test.go new file mode 100644 index 00000000000..d0a0e952b88 --- /dev/null +++ b/aws/data_source_aws_kinesis_stream_consumer_test.go @@ -0,0 +1,35 @@ +package aws + +import ( + "fmt" + "testing" + + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" +) + +func TestAccAWSKinesisStreamConsumerDataSource_basic(t *testing.T) { + var stream kinesis.StreamDescription + var consumer kinesis.ConsumerDescription + config := createAccKinesisStreamConsumerConfig() + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckKinesisStreamConsumerDestroy, + Steps: []resource.TestStep{ + { + Config: config.data(), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisStreamExists(config.stream.getName(), &stream), + testAccCheckKinesisStreamConsumerExists(config, -1, &consumer), + resource.TestCheckResourceAttrSet(fmt.Sprintf("data.%s", config.getName()), "arn"), + resource.TestCheckResourceAttrSet(fmt.Sprintf("data.%s", config.getName()), "stream_arn"), + resource.TestCheckResourceAttr(fmt.Sprintf("data.%s", config.getName()), "name", config.getConsumerName()), + resource.TestCheckResourceAttr(fmt.Sprintf("data.%s", config.getName()), "status", "ACTIVE"), + resource.TestCheckResourceAttrSet(fmt.Sprintf("data.%s", config.getName()), "creation_timestamp"), + ), + }, + }, + }) +} diff --git a/aws/provider.go b/aws/provider.go index c44cbf810ca..1b6517c583b 100644 --- a/aws/provider.go +++ b/aws/provider.go @@ -290,6 +290,7 @@ func Provider() *schema.Provider { "aws_iot_endpoint": dataSourceAwsIotEndpoint(), "aws_ip_ranges": dataSourceAwsIPRanges(), "aws_kinesis_stream": dataSourceAwsKinesisStream(), + "aws_kinesis_stream_consumer": dataSourceAwsKinesisStreamConsumer(), "aws_kms_alias": dataSourceAwsKmsAlias(), "aws_kms_ciphertext": dataSourceAwsKmsCiphertext(), "aws_kms_key": dataSourceAwsKmsKey(), @@ -770,6 +771,7 @@ func Provider() *schema.Provider { "aws_kinesisanalyticsv2_application": resourceAwsKinesisAnalyticsV2Application(), "aws_kinesis_firehose_delivery_stream": resourceAwsKinesisFirehoseDeliveryStream(), "aws_kinesis_stream": resourceAwsKinesisStream(), + "aws_kinesis_stream_consumer": resourceAwsKinesisStreamConsumer(), "aws_kinesis_video_stream": resourceAwsKinesisVideoStream(), "aws_kms_alias": resourceAwsKmsAlias(), "aws_kms_external_key": resourceAwsKmsExternalKey(), diff --git a/aws/resource_aws_kinesis_stream_consumer.go b/aws/resource_aws_kinesis_stream_consumer.go new file mode 100644 index 00000000000..9dc34e75147 --- /dev/null +++ b/aws/resource_aws_kinesis_stream_consumer.go @@ -0,0 +1,229 @@ +package aws + +import ( + "fmt" + "log" + "regexp" + "strings" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" +) + +func resourceAwsKinesisStreamConsumer() *schema.Resource { + return &schema.Resource{ + Create: resourceAwsKinesisStreamConsumerCreate, + Read: resourceAwsKinesisStreamConsumerRead, + Delete: resourceAwsKinesisStreamConsumerDelete, + + Timeouts: &schema.ResourceTimeout{ + Create: schema.DefaultTimeout(5 * time.Minute), + Delete: schema.DefaultTimeout(120 * time.Minute), + }, + + Importer: &schema.ResourceImporter{ + State: func(d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) { + importParts, err := validateResourceAwsKinesisStreamConsumerImportString(d.Id()) + if err != nil { + return nil, err + } + d.Set("name", importParts[0]) + d.Set("stream_arn", importParts[1]) + return []*schema.ResourceData{d}, nil + }, + }, + + Schema: map[string]*schema.Schema{ + "name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + + "stream_arn": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + + "arn": { + Type: schema.TypeString, + Computed: true, + }, + }, + } +} + +func resourceAwsKinesisStreamConsumerCreate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kinesisconn + cn := d.Get("name").(string) + sa := d.Get("stream_arn").(string) + + createOpts := &kinesis.RegisterStreamConsumerInput{ + ConsumerName: aws.String(cn), + StreamARN: aws.String(sa), + } + + createOutput, err := conn.RegisterStreamConsumer(createOpts) + if err != nil { + return fmt.Errorf("Unable to create stream consumer: %s", err) + } + + arn := aws.StringValue(createOutput.Consumer.ConsumerARN) + + d.SetId(arn) + d.Set("arn", arn) + + // No error, wait for ACTIVE state + stateConf := &resource.StateChangeConf{ + Pending: []string{"CREATING"}, + Target: []string{"ACTIVE"}, + Refresh: streamConsumerStateRefreshFunc(conn, cn, sa), + Timeout: d.Timeout(schema.TimeoutCreate), + Delay: 10 * time.Second, + MinTimeout: 3 * time.Second, + } + + _, err = stateConf.WaitForState() + if err != nil { + return fmt.Errorf( + "Error waiting for Kinesis Stream Consumer (%s) to become active: %s", + cn, err) + } + + return resourceAwsKinesisStreamConsumerRead(d, meta) +} + +func resourceAwsKinesisStreamConsumerRead(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kinesisconn + cn := d.Get("name").(string) + sa := d.Get("stream_arn").(string) + + state, err := readKinesisStreamConsumerState(conn, cn, sa) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == "ResourceNotFoundException" { + log.Printf("[WARN] Kinesis Stream Consumer (%s) not found, removing from state", d.Id()) + d.SetId("") + return nil + } + return fmt.Errorf("Error reading Kinesis Stream Consumer: \"%s\", code: \"%s\"", awsErr.Message(), awsErr.Code()) + } + return err + + } + d.SetId(state.arn) + d.Set("arn", state.arn) + return nil +} + +func resourceAwsKinesisStreamConsumerDelete(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kinesisconn + cn := d.Get("name").(string) + sa := d.Get("stream_arn").(string) + + log.Printf("[DEBUG] Deregister Stream Consumer: %s", cn) + _, err := conn.DeregisterStreamConsumer(&kinesis.DeregisterStreamConsumerInput{ + ConsumerName: aws.String(cn), + StreamARN: aws.String(sa), + }) + if err != nil { + // Missing Stream Consumer or Stream (API error) + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == "ResourceNotFoundException" { + log.Printf("[WARN] No Stream Consumer found: %v", cn) + return nil + } + } + return err + } + + stateConf := &resource.StateChangeConf{ + Pending: []string{"DELETING"}, + Target: []string{"DESTROYED"}, + Refresh: streamConsumerStateRefreshFunc(conn, cn, sa), + Timeout: d.Timeout(schema.TimeoutDelete), + Delay: 10 * time.Second, + MinTimeout: 3 * time.Second, + } + + _, err = stateConf.WaitForState() + if err != nil { + return fmt.Errorf( + "Error waiting for Stream Consumer (%s) to be destroyed: %s", + cn, err) + } + + return nil +} + +type kinesisStreamConsumerState struct { + arn string + streamArn string + creationTimestamp int64 + status string +} + +func validateResourceAwsKinesisStreamConsumerImportString(importStr string) ([]string, error) { + // example: my_consumer@arn:aws:kinesis:us-west-2:123456789012:stream/my-stream + importParts := strings.Split(strings.ToLower(importStr), "@") + errStr := "unexpected format of import string (%q), expected @: %s" + if len(importParts) != 2 { + return nil, fmt.Errorf(errStr, importStr, "invalid no. of parts") + } + + consumerName := importParts[0] + streamArn := importParts[1] + + consumerNameRe := regexp.MustCompile(`(^[a-zA-Z0-9_.-]+$)`) + streamArnRe := regexp.MustCompile(`arn:aws.*:kinesis:.*:\d{12}:stream/.+`) + + if !consumerNameRe.MatchString(consumerName) { + return nil, fmt.Errorf(errStr, importStr, "invalid consumer name") + } + + if !streamArnRe.MatchString(streamArn) { + return nil, fmt.Errorf(errStr, importStr, "invalid stream arn") + } + + return importParts, nil +} + +func readKinesisStreamConsumerState(conn *kinesis.Kinesis, cn string, sa string) (*kinesisStreamConsumerState, error) { + input := &kinesis.DescribeStreamConsumerInput{ + ConsumerName: aws.String(cn), + StreamARN: aws.String(sa), + } + + state := &kinesisStreamConsumerState{} + response, err := conn.DescribeStreamConsumer(input) + if err == nil { + state.arn = aws.StringValue(response.ConsumerDescription.ConsumerARN) + state.streamArn = aws.StringValue(response.ConsumerDescription.StreamARN) + state.creationTimestamp = aws.TimeValue(response.ConsumerDescription.ConsumerCreationTimestamp).Unix() + state.status = aws.StringValue(response.ConsumerDescription.ConsumerStatus) + } + + return state, err +} + +func streamConsumerStateRefreshFunc(conn *kinesis.Kinesis, cn string, sa string) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + state, err := readKinesisStreamConsumerState(conn, cn, sa) + if err != nil { + if awsErr, ok := err.(awserr.Error); ok { + if awsErr.Code() == "ResourceNotFoundException" { + return 42, "DESTROYED", nil + } + return nil, awsErr.Code(), err + } + return nil, "failed", err + } + + return state, state.status, nil + } +} diff --git a/aws/resource_aws_kinesis_stream_consumer_test.go b/aws/resource_aws_kinesis_stream_consumer_test.go new file mode 100644 index 00000000000..88ade1522ad --- /dev/null +++ b/aws/resource_aws_kinesis_stream_consumer_test.go @@ -0,0 +1,258 @@ +package aws + +import ( + "fmt" + "strings" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" +) + +func TestAccAWSKinesisStreamConsumer_basic(t *testing.T) { + var stream kinesis.StreamDescription + var consumer kinesis.ConsumerDescription + config := createAccKinesisStreamConsumerConfig() + resourceName := config.getName() + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckKinesisStreamConsumerDestroy, + Steps: []resource.TestStep{ + { + Config: config.single(), + Check: resource.ComposeTestCheckFunc( + testAccCheckKinesisStreamExists(config.stream.getName(), &stream), + testAccCheckKinesisStreamConsumerExists(config, -1, &consumer), + testAccCheckAWSKinesisStreamConsumerAttributes(config, &consumer), + ), + }, + { + ResourceName: resourceName, + ImportStateIdFunc: testAccKinesisStreamConsumerImportStateIdFunc(config), + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func testAccKinesisStreamConsumerImportStateIdFunc(config *accKinesisStreamConsumerConfig) resource.ImportStateIdFunc { + return func(s *terraform.State) (string, error) { + return fmt.Sprintf("%s@arn:aws:kinesis:%s:%s:stream/%s", config.getConsumerName(), testAccGetRegion(), testAccGetAccountID(), config.stream.getStreamName()), nil + } +} + +func TestAccAWSKinesisStreamConsumer_createMultipleConcurrentStreamConsumers(t *testing.T) { + var stream kinesis.StreamDescription + var consumer kinesis.ConsumerDescription + config := createAccKinesisStreamConsumerConfig() + + var checkFunctions []resource.TestCheckFunc + + checkFunctions = append( + checkFunctions, + testAccCheckKinesisStreamExists(config.stream.getName(), &stream)) + + for i := 0; i < config.count; i++ { + checkFunctions = append( + checkFunctions, + testAccCheckKinesisStreamConsumerExists(config, i, &consumer)) + } + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckKinesisStreamConsumerDestroy, + Steps: []resource.TestStep{ + { + Config: config.multiple(), + Check: resource.ComposeTestCheckFunc(checkFunctions...), + }, + }, + }) +} + +func testAccCheckKinesisStreamConsumerExists(c *accKinesisStreamConsumerConfig, + index int, consumer *kinesis.ConsumerDescription) resource.TestCheckFunc { + var resourceName string + if index > -1 { + resourceName = c.getIndexedName(index) + } else { + resourceName = c.getName() + } + + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[resourceName] + if !ok { + return fmt.Errorf("Not found: %s", resourceName) + } + + if rs.Primary.ID == "" { + return fmt.Errorf("No Kinesis Stream Consumer ID is set") + } + + conn := testAccProvider.Meta().(*AWSClient).kinesisconn + describeOpts := &kinesis.DescribeStreamConsumerInput{ + ConsumerName: aws.String(rs.Primary.Attributes["name"]), + StreamARN: aws.String(rs.Primary.Attributes["stream_arn"]), + } + resp, err := conn.DescribeStreamConsumer(describeOpts) + if err != nil { + return err + } + + *consumer = *resp.ConsumerDescription + + return nil + } +} + +func testAccCheckAWSKinesisStreamConsumerAttributes( + c *accKinesisStreamConsumerConfig, consumer *kinesis.ConsumerDescription) resource.TestCheckFunc { + return func(s *terraform.State) error { + if !strings.HasPrefix(*consumer.ConsumerName, c.getConsumerName()) { + return fmt.Errorf("Bad Stream Consumer name: %s", *consumer.ConsumerName) + } + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_kinesis_stream_consumer" { + continue + } + + if *consumer.ConsumerARN != rs.Primary.Attributes["arn"] { + return fmt.Errorf("Bad Stream Consumer(%s) ARN\n\t expected: %s\n\tgot: %s\n", rs.Type, rs.Primary.Attributes["arn"], *consumer.ConsumerARN) + } + } + return nil + } +} + +func testAccCheckKinesisStreamConsumerDestroy(s *terraform.State) error { + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_kinesis_stream_consumer" { + continue + } + conn := testAccProvider.Meta().(*AWSClient).kinesisconn + describeOpts := &kinesis.DescribeStreamConsumerInput{ + ConsumerName: aws.String(rs.Primary.Attributes["name"]), + StreamARN: aws.String(rs.Primary.Attributes["stream_arn"]), + } + resp, err := conn.DescribeStreamConsumer(describeOpts) + if err == nil { + if resp.ConsumerDescription != nil && *resp.ConsumerDescription.ConsumerStatus != "DELETING" { + return fmt.Errorf("Error: Stream Consumer still exists") + } + } + + return nil + + } + + return nil +} + +type accKinesisStreamConfig struct { + resourceType string + resourceLocalName string + streamBasename string + randInt int +} + +func (r *accKinesisStreamConfig) getName() string { + return fmt.Sprintf("%s.%s", r.resourceType, r.resourceLocalName) +} + +func (r *accKinesisStreamConfig) getStreamName() string { + return fmt.Sprintf("%s-%d", r.streamBasename, r.randInt) +} + +func (r *accKinesisStreamConfig) single() string { + + return fmt.Sprintf(` +resource "%s" "%s" { + name = "%s" + shard_count = 2 + enforce_consumer_deletion = true + + tags = { + Name = "tf-test" + } +}`, r.resourceType, r.resourceLocalName, r.getStreamName()) +} + +type accKinesisStreamConsumerConfig struct { + stream *accKinesisStreamConfig + resourceType string + resourceLocalName string + consumerBasename string + count int + randInt int +} + +func (r *accKinesisStreamConsumerConfig) getName() string { + return fmt.Sprintf("%s.%s", r.resourceType, r.resourceLocalName) +} + +func (r *accKinesisStreamConsumerConfig) getIndexedName(index int) string { + return fmt.Sprintf("%s.%s.%d", r.resourceType, r.resourceLocalName, index) +} + +func (r *accKinesisStreamConsumerConfig) getConsumerName() string { + return fmt.Sprintf("%s-%d", r.consumerBasename, r.randInt) +} + +func (r *accKinesisStreamConsumerConfig) data() string { + return fmt.Sprintf(` +%s + +data "%s" "%s" { + name = "${%s.name}" + stream_arn = "${%s.arn}" +}`, r.single(), r.resourceType, r.resourceLocalName, r.getName(), r.stream.getName()) +} + +func (r *accKinesisStreamConsumerConfig) single() string { + + return fmt.Sprintf(` +%s + +resource "%s" "%s" { + name = "%s" + stream_arn = "${%s.arn}" +} + +`, r.stream.single(), r.resourceType, r.resourceLocalName, r.getConsumerName(), r.stream.getName()) +} + +func (r *accKinesisStreamConsumerConfig) multiple() string { + + return fmt.Sprintf(` +%s + +resource "%s" "%s" { + count = %d + name = "%s-${count.index}" + stream_arn = "${%s.arn}" +}`, r.stream.single(), r.resourceType, r.resourceLocalName, r.count, r.getConsumerName(), r.stream.getName()) +} + +func createAccKinesisStreamConsumerConfig() *accKinesisStreamConsumerConfig { + iRnd := acctest.RandInt() + return &accKinesisStreamConsumerConfig{ + stream: &accKinesisStreamConfig{ + resourceType: "aws_kinesis_stream", + resourceLocalName: "test_stream", + streamBasename: "terraform-kinesis-stream-test", + randInt: iRnd, + }, + resourceType: "aws_kinesis_stream_consumer", + resourceLocalName: "test_stream_consumer", + consumerBasename: "terraform-kinesis-stream-consumer-test", + count: 2, + randInt: iRnd, + } +} diff --git a/website/docs/d/kinesis_stream_consumer.html.markdown b/website/docs/d/kinesis_stream_consumer.html.markdown new file mode 100644 index 00000000000..2ecd1b277de --- /dev/null +++ b/website/docs/d/kinesis_stream_consumer.html.markdown @@ -0,0 +1,41 @@ +--- +subcategory: "Kinesis" +layout: "aws" +page_title: "AWS: aws_kinesis_stream_consumer" +description: |- + Provides a Kinesis Stream Consumer data source. +--- + +# Data Source: aws_kinesis_stream_consumer + +Use this data source to get information about a Kinesis Stream Consumer for use in other +resources. + +For more details, see the [Amazon Kinesis Stream Consumer Documentation][1]. + +## Example Usage + +```hcl +data "aws_kinesis_stream_consumer" "stream_consumer" { + name = "stream-consumer-name" + stream_arn = "${aws_kinesis_stream.stream.arn}" +} +``` + +## Argument Reference + +* `name` - (Required) The name of the Kinesis Stream Consumer. +* `stream_arn` - (Required) The Amazon Resource Name (ARN) of the Kinesis Stream. + +## Attributes Reference + +`id` is set to the Amazon Resource Name (ARN) of the Kinesis Stream Consumer. In addition, the following attributes +are exported: + +* `arn` - The Amazon Resource Name (ARN) of the Kinesis Stream Consumer (same as id). +* `name` - The name of the Kinesis Stream Consumer. +* `creation_timestamp` - The approximate UNIX timestamp that the stream consumer was created. +* `status` - The current status of the stream consumer. The stream status is one of CREATING, DELETING, ACTIVE, or UPDATING. +* `stream_arn` - The Amazon Resource Name (ARN) of the Kinesis Stream. + +[1]: https://docs.aws.amazon.com/streams/latest/dev/introduction-to-enhanced-consumers.html diff --git a/website/docs/r/kinesis_stream_consumer.html.markdown b/website/docs/r/kinesis_stream_consumer.html.markdown new file mode 100644 index 00000000000..f2e380c4867 --- /dev/null +++ b/website/docs/r/kinesis_stream_consumer.html.markdown @@ -0,0 +1,68 @@ +--- +subcategory: "Kinesis" +layout: "aws" +page_title: "AWS: aws_kinesis_stream_consumer" +description: |- + Provides a AWS Kinesis Stream Consumer +--- + +# aws_kinesis_stream_consumer + +Provides a Kinesis Stream Consumer resource. A consumer is an application that processes all data from a Kinesis data stream. +When a consumer uses enhanced fan-out, it gets its own 2 MiB/sec allotment of read throughput, allowing multiple consumers to +read data from the same stream in parallel, without contending for read throughput with other consumers. [Reading Data from Kinesis][1] + +You can register up to 20 consumers per stream. However, you can request a limit increase using the [Kinesis Data Streams limits form][2]. A given consumer can only be registered with one stream at a time. + +For more details, see the [Amazon Kinesis Stream Consumer Documentation][3]. + +## Example Usage + +```hcl +resource "aws_kinesis_stream" "test_stream" { + name = "terraform-kinesis-test" + shard_count = 1 +} + +resource "aws_lambda_event_source_mapping" "example" { + event_source_arn = "${aws_kinesis_stream_consumer.test_stream_consumer.arn}" + function_name = "${aws_lambda_function.example.arn}" + starting_position = "LATEST" +} + +resource "aws_kinesis_stream_consumer" "test_stream_consumer" { + name = "terraform-kinesis-stream-consumer-test" + stream_arn = "${aws_kinesis_stream.test_stream.arn}" +} +``` + +## Argument Reference + +The following arguments are supported: + +* `name` - (Required) A name to identify the stream. This is unique to the +AWS account and region the Stream Consumer is created in. +* `stream_arn` – (Required) The Amazon Resource Name (ARN) of the Kinesis Stream, the Consumer is connected to. + +## Attributes Reference + +* `id` - The unique Stream Consumer id +* `name` - The unique Stream Consumer name +* `arn` - The Amazon Resource Name (ARN) specifying the Stream Consumer (same as `id`) + +## Timeouts + +`aws_kinesis_stream_consumer` provides the following [Timeouts](/docs/configuration/resources.html#timeouts) configuration options: + +- `create` - (Default `5 minutes`) Used for Creating a Kinesis Stream Consumer +- `delete` - (Default `120 minutes`) Used for Destroying a Kinesis Stream Consumer + +Kinesis Streams can be imported using the `name@stream_arn`, e.g. + +``` +$ terraform import aws_kinesis_stream_consumer.test_stream_consumer terraform-kinesis-stream-consumer-test@arn:aws:kinesis:us-west-2:123456789012:stream/my-stream +``` + +[1]: https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html +[2]: https://console.aws.amazon.com/support/v1?#/ +[3]: https://docs.aws.amazon.com/streams/latest/dev/introduction-to-enhanced-consumers.html From 06941533596050e9720ea74202ec40b657c7e96f Mon Sep 17 00:00:00 2001 From: Anthony Roach Date: Sat, 16 Jan 2021 18:21:37 -0500 Subject: [PATCH 2/4] lint fixes --- aws/resource_aws_kinesis_stream_consumer_test.go | 2 +- .../docs/d/kinesis_stream_consumer.html.markdown | 4 ++-- .../docs/r/kinesis_stream_consumer.html.markdown | 16 ++++++++-------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/aws/resource_aws_kinesis_stream_consumer_test.go b/aws/resource_aws_kinesis_stream_consumer_test.go index 88ade1522ad..518b9534586 100644 --- a/aws/resource_aws_kinesis_stream_consumer_test.go +++ b/aws/resource_aws_kinesis_stream_consumer_test.go @@ -43,7 +43,7 @@ func TestAccAWSKinesisStreamConsumer_basic(t *testing.T) { func testAccKinesisStreamConsumerImportStateIdFunc(config *accKinesisStreamConsumerConfig) resource.ImportStateIdFunc { return func(s *terraform.State) (string, error) { - return fmt.Sprintf("%s@arn:aws:kinesis:%s:%s:stream/%s", config.getConsumerName(), testAccGetRegion(), testAccGetAccountID(), config.stream.getStreamName()), nil + return fmt.Sprintf("%s@arn:aws:kinesis:%s:%s:stream/%s", config.getConsumerName(), testAccGetRegion(), testAccGetAccountID(), config.stream.getStreamName()), nil //lintignore:AWSAT005 } } diff --git a/website/docs/d/kinesis_stream_consumer.html.markdown b/website/docs/d/kinesis_stream_consumer.html.markdown index 2ecd1b277de..c7442ed3fcf 100644 --- a/website/docs/d/kinesis_stream_consumer.html.markdown +++ b/website/docs/d/kinesis_stream_consumer.html.markdown @@ -17,8 +17,8 @@ For more details, see the [Amazon Kinesis Stream Consumer Documentation][1]. ```hcl data "aws_kinesis_stream_consumer" "stream_consumer" { - name = "stream-consumer-name" - stream_arn = "${aws_kinesis_stream.stream.arn}" + name = "stream-consumer-name" + stream_arn = aws_kinesis_stream.stream.arn } ``` diff --git a/website/docs/r/kinesis_stream_consumer.html.markdown b/website/docs/r/kinesis_stream_consumer.html.markdown index f2e380c4867..e379a290772 100644 --- a/website/docs/r/kinesis_stream_consumer.html.markdown +++ b/website/docs/r/kinesis_stream_consumer.html.markdown @@ -8,8 +8,8 @@ description: |- # aws_kinesis_stream_consumer -Provides a Kinesis Stream Consumer resource. A consumer is an application that processes all data from a Kinesis data stream. -When a consumer uses enhanced fan-out, it gets its own 2 MiB/sec allotment of read throughput, allowing multiple consumers to +Provides a Kinesis Stream Consumer resource. A consumer is an application that processes all data from a Kinesis data stream. +When a consumer uses enhanced fan-out, it gets its own 2 MiB/sec allotment of read throughput, allowing multiple consumers to read data from the same stream in parallel, without contending for read throughput with other consumers. [Reading Data from Kinesis][1] You can register up to 20 consumers per stream. However, you can request a limit increase using the [Kinesis Data Streams limits form][2]. A given consumer can only be registered with one stream at a time. @@ -20,19 +20,19 @@ For more details, see the [Amazon Kinesis Stream Consumer Documentation][3]. ```hcl resource "aws_kinesis_stream" "test_stream" { - name = "terraform-kinesis-test" - shard_count = 1 + name = "terraform-kinesis-test" + shard_count = 1 } resource "aws_lambda_event_source_mapping" "example" { - event_source_arn = "${aws_kinesis_stream_consumer.test_stream_consumer.arn}" - function_name = "${aws_lambda_function.example.arn}" + event_source_arn = aws_kinesis_stream_consumer.test_stream_consumer.arn + function_name = aws_lambda_function.example.arn starting_position = "LATEST" } resource "aws_kinesis_stream_consumer" "test_stream_consumer" { - name = "terraform-kinesis-stream-consumer-test" - stream_arn = "${aws_kinesis_stream.test_stream.arn}" + name = "terraform-kinesis-stream-consumer-test" + stream_arn = aws_kinesis_stream.test_stream.arn } ``` From b39b23012f97d02366bfe15eda59e391c99d4b0d Mon Sep 17 00:00:00 2001 From: stp Date: Sun, 13 Oct 2019 05:33:43 +0200 Subject: [PATCH 3/4] add aws_kinesis_stream_consumer documentation --- website/aws.erb | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 website/aws.erb diff --git a/website/aws.erb b/website/aws.erb new file mode 100644 index 00000000000..e69de29bb2d From 48bf2609f8790d7d476680d41f4d16ae12b40640 Mon Sep 17 00:00:00 2001 From: Angie Pinilla Date: Thu, 18 Mar 2021 03:08:01 -0400 Subject: [PATCH 4/4] CR updates; refactor to service package and update test coverage --- .changelog/17149.txt | 7 + ...data_source_aws_kinesis_stream_consumer.go | 92 ++++-- ...source_aws_kinesis_stream_consumer_test.go | 131 +++++++- aws/internal/service/kinesis/finder/finder.go | 25 ++ aws/internal/service/kinesis/waiter/status.go | 30 ++ aws/internal/service/kinesis/waiter/waiter.go | 49 +++ aws/resource_aws_kinesis_stream_consumer.go | 219 ++++--------- ...source_aws_kinesis_stream_consumer_test.go | 303 +++++++----------- aws/resource_aws_kinesis_stream_test.go | 19 +- website/aws.erb | 0 .../d/kinesis_stream_consumer.html.markdown | 29 +- .../r/kinesis_stream_consumer.html.markdown | 54 ++-- 12 files changed, 529 insertions(+), 429 deletions(-) create mode 100644 .changelog/17149.txt create mode 100644 aws/internal/service/kinesis/finder/finder.go create mode 100644 aws/internal/service/kinesis/waiter/status.go create mode 100644 aws/internal/service/kinesis/waiter/waiter.go delete mode 100644 website/aws.erb diff --git a/.changelog/17149.txt b/.changelog/17149.txt new file mode 100644 index 00000000000..8d6f8cf48f1 --- /dev/null +++ b/.changelog/17149.txt @@ -0,0 +1,7 @@ +```release-note:new-data-source +aws_kinesis_stream_consumer +``` + +```release-note:new-resource +aws_kinesis_stream_consumer +``` diff --git a/aws/data_source_aws_kinesis_stream_consumer.go b/aws/data_source_aws_kinesis_stream_consumer.go index 3bc611881a2..fe8bdac2d3f 100644 --- a/aws/data_source_aws_kinesis_stream_consumer.go +++ b/aws/data_source_aws_kinesis_stream_consumer.go @@ -1,6 +1,11 @@ package aws import ( + "fmt" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kinesis" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" ) @@ -9,23 +14,21 @@ func dataSourceAwsKinesisStreamConsumer() *schema.Resource { Read: dataSourceAwsKinesisStreamConsumerRead, Schema: map[string]*schema.Schema{ - "name": { - Type: schema.TypeString, - Required: true, - }, - - "stream_arn": { - Type: schema.TypeString, - Required: true, + "arn": { + Type: schema.TypeString, + Optional: true, + Computed: true, + ValidateFunc: validateArn, }, - "arn": { + "creation_timestamp": { Type: schema.TypeString, Computed: true, }, - "creation_timestamp": { - Type: schema.TypeInt, + "name": { + Type: schema.TypeString, + Optional: true, Computed: true, }, @@ -33,25 +36,72 @@ func dataSourceAwsKinesisStreamConsumer() *schema.Resource { Type: schema.TypeString, Computed: true, }, + + "stream_arn": { + Type: schema.TypeString, + Required: true, + ValidateFunc: validateArn, + }, }, } } func dataSourceAwsKinesisStreamConsumerRead(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).kinesisconn - cn := d.Get("name").(string) - sa := d.Get("stream_arn").(string) - state, err := readKinesisStreamConsumerState(conn, cn, sa) + streamArn := d.Get("stream_arn").(string) + + input := &kinesis.ListStreamConsumersInput{ + StreamARN: aws.String(streamArn), + } + + var results []*kinesis.Consumer + + err := conn.ListStreamConsumersPages(input, func(page *kinesis.ListStreamConsumersOutput, lastPage bool) bool { + if page == nil { + return !lastPage + } + + for _, consumer := range page.Consumers { + if consumer == nil { + continue + } + + if v, ok := d.GetOk("name"); ok && v.(string) != aws.StringValue(consumer.ConsumerName) { + continue + } + + if v, ok := d.GetOk("arn"); ok && v.(string) != aws.StringValue(consumer.ConsumerARN) { + continue + } + + results = append(results, consumer) + + } + + return !lastPage + }) + if err != nil { - return err + return fmt.Errorf("error listing Kinesis Stream Consumers: %w", err) + } + + if len(results) == 0 { + return fmt.Errorf("no Kinesis Stream Consumer found matching criteria; try different search") } - d.SetId(state.arn) - d.Set("arn", state.arn) - d.Set("name", cn) - d.Set("stream_arn", sa) - d.Set("status", state.status) - d.Set("creation_timestamp", state.creationTimestamp) + + if len(results) > 1 { + return fmt.Errorf("multiple Kinesis Stream Consumers found matching criteria; try different search") + } + + consumer := results[0] + + d.SetId(aws.StringValue(consumer.ConsumerARN)) + d.Set("arn", consumer.ConsumerARN) + d.Set("name", consumer.ConsumerName) + d.Set("status", consumer.ConsumerStatus) + d.Set("stream_arn", streamArn) + d.Set("creation_timestamp", aws.TimeValue(consumer.ConsumerCreationTimestamp).Format(time.RFC3339)) return nil } diff --git a/aws/data_source_aws_kinesis_stream_consumer_test.go b/aws/data_source_aws_kinesis_stream_consumer_test.go index d0a0e952b88..20c07c136de 100644 --- a/aws/data_source_aws_kinesis_stream_consumer_test.go +++ b/aws/data_source_aws_kinesis_stream_consumer_test.go @@ -4,32 +4,137 @@ import ( "fmt" "testing" - "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" ) func TestAccAWSKinesisStreamConsumerDataSource_basic(t *testing.T) { - var stream kinesis.StreamDescription - var consumer kinesis.ConsumerDescription - config := createAccKinesisStreamConsumerConfig() + rName := acctest.RandomWithPrefix("tf-acc-test") + dataSourceName := "data.aws_kinesis_stream_consumer.test" + resourceName := "aws_kinesis_stream_consumer.test" + streamName := "aws_kinesis_stream.test" resource.ParallelTest(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, Providers: testAccProviders, - CheckDestroy: testAccCheckKinesisStreamConsumerDestroy, + CheckDestroy: nil, Steps: []resource.TestStep{ { - Config: config.data(), + Config: testAccAWSKinesisStreamConsumerDataSourceConfig(rName), Check: resource.ComposeTestCheckFunc( - testAccCheckKinesisStreamExists(config.stream.getName(), &stream), - testAccCheckKinesisStreamConsumerExists(config, -1, &consumer), - resource.TestCheckResourceAttrSet(fmt.Sprintf("data.%s", config.getName()), "arn"), - resource.TestCheckResourceAttrSet(fmt.Sprintf("data.%s", config.getName()), "stream_arn"), - resource.TestCheckResourceAttr(fmt.Sprintf("data.%s", config.getName()), "name", config.getConsumerName()), - resource.TestCheckResourceAttr(fmt.Sprintf("data.%s", config.getName()), "status", "ACTIVE"), - resource.TestCheckResourceAttrSet(fmt.Sprintf("data.%s", config.getName()), "creation_timestamp"), + resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"), + resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"), + resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"), + resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"), + resource.TestCheckResourceAttrSet(dataSourceName, "status"), ), }, }, }) } + +func TestAccAWSKinesisStreamConsumerDataSource_Name(t *testing.T) { + rName := acctest.RandomWithPrefix("tf-acc-test") + dataSourceName := "data.aws_kinesis_stream_consumer.test" + resourceName := "aws_kinesis_stream_consumer.test" + streamName := "aws_kinesis_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: nil, + Steps: []resource.TestStep{ + { + Config: testAccAWSKinesisStreamConsumerDataSourceConfigName(rName), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"), + resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"), + resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"), + resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"), + resource.TestCheckResourceAttrSet(dataSourceName, "status"), + ), + }, + }, + }) +} + +func TestAccAWSKinesisStreamConsumerDataSource_Arn(t *testing.T) { + rName := acctest.RandomWithPrefix("tf-acc-test") + dataSourceName := "data.aws_kinesis_stream_consumer.test" + resourceName := "aws_kinesis_stream_consumer.test" + streamName := "aws_kinesis_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: nil, + Steps: []resource.TestStep{ + { + Config: testAccAWSKinesisStreamConsumerDataSourceConfigArn(rName), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"), + resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"), + resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"), + resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"), + resource.TestCheckResourceAttrSet(dataSourceName, "status"), + ), + }, + }, + }) +} + +func testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName string) string { + return fmt.Sprintf(` +resource "aws_kinesis_stream" "test" { + name = %q + shard_count = 2 +} +`, rName) +} + +func testAccAWSKinesisStreamConsumerDataSourceConfig(rName string) string { + return composeConfig( + testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName), + fmt.Sprintf(` +data "aws_kinesis_stream_consumer" "test" { + stream_arn = aws_kinesis_stream_consumer.test.stream_arn +} + +resource "aws_kinesis_stream_consumer" "test" { + name = %q + stream_arn = aws_kinesis_stream.test.arn +} +`, rName)) +} + +func testAccAWSKinesisStreamConsumerDataSourceConfigName(rName string) string { + return composeConfig( + testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName), + fmt.Sprintf(` +data "aws_kinesis_stream_consumer" "test" { + name = aws_kinesis_stream_consumer.test.name + stream_arn = aws_kinesis_stream_consumer.test.stream_arn +} + +resource "aws_kinesis_stream_consumer" "test" { + name = %q + stream_arn = aws_kinesis_stream.test.arn +} +`, rName)) +} + +func testAccAWSKinesisStreamConsumerDataSourceConfigArn(rName string) string { + return composeConfig( + testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName), + fmt.Sprintf(` +data "aws_kinesis_stream_consumer" "test" { + arn = aws_kinesis_stream_consumer.test.arn + stream_arn = aws_kinesis_stream_consumer.test.stream_arn +} + +resource "aws_kinesis_stream_consumer" "test" { + name = %q + stream_arn = aws_kinesis_stream.test.arn +} +`, rName)) +} diff --git a/aws/internal/service/kinesis/finder/finder.go b/aws/internal/service/kinesis/finder/finder.go new file mode 100644 index 00000000000..ac13b1b0d05 --- /dev/null +++ b/aws/internal/service/kinesis/finder/finder.go @@ -0,0 +1,25 @@ +package finder + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kinesis" +) + +// StreamConsumerByARN returns the stream consumer corresponding to the specified ARN. +// Returns nil if no stream consumer is found. +func StreamConsumerByARN(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) { + input := &kinesis.DescribeStreamConsumerInput{ + ConsumerARN: aws.String(arn), + } + + output, err := conn.DescribeStreamConsumer(input) + if err != nil { + return nil, err + } + + if output == nil { + return nil, nil + } + + return output.ConsumerDescription, nil +} diff --git a/aws/internal/service/kinesis/waiter/status.go b/aws/internal/service/kinesis/waiter/status.go new file mode 100644 index 00000000000..b8403ac6e27 --- /dev/null +++ b/aws/internal/service/kinesis/waiter/status.go @@ -0,0 +1,30 @@ +package waiter + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesis/finder" +) + +const ( + StreamConsumerStatusNotFound = "NotFound" + StreamConsumerStatusUnknown = "Unknown" +) + +// StreamConsumerStatus fetches the StreamConsumer and its Status +func StreamConsumerStatus(conn *kinesis.Kinesis, arn string) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + consumer, err := finder.StreamConsumerByARN(conn, arn) + + if err != nil { + return nil, StreamConsumerStatusUnknown, err + } + + if consumer == nil { + return nil, StreamConsumerStatusNotFound, nil + } + + return consumer, aws.StringValue(consumer.ConsumerStatus), nil + } +} diff --git a/aws/internal/service/kinesis/waiter/waiter.go b/aws/internal/service/kinesis/waiter/waiter.go new file mode 100644 index 00000000000..1e3814bcd65 --- /dev/null +++ b/aws/internal/service/kinesis/waiter/waiter.go @@ -0,0 +1,49 @@ +package waiter + +import ( + "time" + + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" +) + +const ( + StreamConsumerCreatedTimeout = 5 * time.Minute + StreamConsumerDeletedTimeout = 5 * time.Minute +) + +// StreamConsumerCreated waits for an Stream Consumer to return Active +func StreamConsumerCreated(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{kinesis.ConsumerStatusCreating}, + Target: []string{kinesis.ConsumerStatusActive}, + Refresh: StreamConsumerStatus(conn, arn), + Timeout: StreamConsumerCreatedTimeout, + } + + outputRaw, err := stateConf.WaitForState() + + if v, ok := outputRaw.(*kinesis.ConsumerDescription); ok { + return v, err + } + + return nil, err +} + +// StreamConsumerDeleted waits for a Stream Consumer to be deleted +func StreamConsumerDeleted(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{kinesis.ConsumerStatusDeleting}, + Target: []string{}, + Refresh: StreamConsumerStatus(conn, arn), + Timeout: StreamConsumerDeletedTimeout, + } + + outputRaw, err := stateConf.WaitForState() + + if v, ok := outputRaw.(*kinesis.ConsumerDescription); ok { + return v, err + } + + return nil, err +} diff --git a/aws/resource_aws_kinesis_stream_consumer.go b/aws/resource_aws_kinesis_stream_consumer.go index 9dc34e75147..cefb1202ac6 100644 --- a/aws/resource_aws_kinesis_stream_consumer.go +++ b/aws/resource_aws_kinesis_stream_consumer.go @@ -3,15 +3,14 @@ package aws import ( "fmt" "log" - "regexp" - "strings" "time" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/hashicorp/aws-sdk-go-base/tfawserr" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesis/finder" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesis/waiter" ) func resourceAwsKinesisStreamConsumer() *schema.Resource { @@ -20,39 +19,32 @@ func resourceAwsKinesisStreamConsumer() *schema.Resource { Read: resourceAwsKinesisStreamConsumerRead, Delete: resourceAwsKinesisStreamConsumerDelete, - Timeouts: &schema.ResourceTimeout{ - Create: schema.DefaultTimeout(5 * time.Minute), - Delete: schema.DefaultTimeout(120 * time.Minute), - }, - Importer: &schema.ResourceImporter{ - State: func(d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) { - importParts, err := validateResourceAwsKinesisStreamConsumerImportString(d.Id()) - if err != nil { - return nil, err - } - d.Set("name", importParts[0]) - d.Set("stream_arn", importParts[1]) - return []*schema.ResourceData{d}, nil - }, + State: schema.ImportStatePassthrough, }, Schema: map[string]*schema.Schema{ - "name": { + "arn": { Type: schema.TypeString, - Required: true, - ForceNew: true, + Computed: true, }, - "stream_arn": { + "creation_timestamp": { + Type: schema.TypeString, + Computed: true, + }, + + "name": { Type: schema.TypeString, Required: true, ForceNew: true, }, - "arn": { - Type: schema.TypeString, - Computed: true, + "stream_arn": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: validateArn, }, }, } @@ -60,39 +52,28 @@ func resourceAwsKinesisStreamConsumer() *schema.Resource { func resourceAwsKinesisStreamConsumerCreate(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).kinesisconn - cn := d.Get("name").(string) - sa := d.Get("stream_arn").(string) - createOpts := &kinesis.RegisterStreamConsumerInput{ - ConsumerName: aws.String(cn), - StreamARN: aws.String(sa), + name := d.Get("name").(string) + streamArn := d.Get("stream_arn").(string) + + input := &kinesis.RegisterStreamConsumerInput{ + ConsumerName: aws.String(name), + StreamARN: aws.String(streamArn), } - createOutput, err := conn.RegisterStreamConsumer(createOpts) + output, err := conn.RegisterStreamConsumer(input) if err != nil { - return fmt.Errorf("Unable to create stream consumer: %s", err) + return fmt.Errorf("error creating Kinesis Stream Consumer (%s): %w", name, err) } - arn := aws.StringValue(createOutput.Consumer.ConsumerARN) - - d.SetId(arn) - d.Set("arn", arn) - - // No error, wait for ACTIVE state - stateConf := &resource.StateChangeConf{ - Pending: []string{"CREATING"}, - Target: []string{"ACTIVE"}, - Refresh: streamConsumerStateRefreshFunc(conn, cn, sa), - Timeout: d.Timeout(schema.TimeoutCreate), - Delay: 10 * time.Second, - MinTimeout: 3 * time.Second, + if output == nil || output.Consumer == nil { + return fmt.Errorf("error creating Kinesis Stream Consumer (%s): empty output", name) } - _, err = stateConf.WaitForState() - if err != nil { - return fmt.Errorf( - "Error waiting for Kinesis Stream Consumer (%s) to become active: %s", - cn, err) + d.SetId(aws.StringValue(output.Consumer.ConsumerARN)) + + if _, err := waiter.StreamConsumerCreated(conn, d.Id()); err != nil { + return fmt.Errorf("error waiting for Kinesis Stream Consumer (%s) creation: %w", d.Id(), err) } return resourceAwsKinesisStreamConsumerRead(d, meta) @@ -100,130 +81,58 @@ func resourceAwsKinesisStreamConsumerCreate(d *schema.ResourceData, meta interfa func resourceAwsKinesisStreamConsumerRead(d *schema.ResourceData, meta interface{}) error { conn := meta.(*AWSClient).kinesisconn - cn := d.Get("name").(string) - sa := d.Get("stream_arn").(string) - state, err := readKinesisStreamConsumerState(conn, cn, sa) - if err != nil { - if awsErr, ok := err.(awserr.Error); ok { - if awsErr.Code() == "ResourceNotFoundException" { - log.Printf("[WARN] Kinesis Stream Consumer (%s) not found, removing from state", d.Id()) - d.SetId("") - return nil - } - return fmt.Errorf("Error reading Kinesis Stream Consumer: \"%s\", code: \"%s\"", awsErr.Message(), awsErr.Code()) - } - return err + consumer, err := finder.StreamConsumerByARN(conn, d.Id()) + if !d.IsNewResource() && tfawserr.ErrCodeEquals(err, kinesis.ErrCodeResourceNotFoundException) { + log.Printf("[WARN] Kinesis Stream Consumer (%s) not found, removing from state", d.Id()) + d.SetId("") + return nil } - d.SetId(state.arn) - d.Set("arn", state.arn) - return nil -} -func resourceAwsKinesisStreamConsumerDelete(d *schema.ResourceData, meta interface{}) error { - conn := meta.(*AWSClient).kinesisconn - cn := d.Get("name").(string) - sa := d.Get("stream_arn").(string) - - log.Printf("[DEBUG] Deregister Stream Consumer: %s", cn) - _, err := conn.DeregisterStreamConsumer(&kinesis.DeregisterStreamConsumerInput{ - ConsumerName: aws.String(cn), - StreamARN: aws.String(sa), - }) if err != nil { - // Missing Stream Consumer or Stream (API error) - if awsErr, ok := err.(awserr.Error); ok { - if awsErr.Code() == "ResourceNotFoundException" { - log.Printf("[WARN] No Stream Consumer found: %v", cn) - return nil - } - } - return err + return fmt.Errorf("error reading Kinesis Stream Consumer (%s): %w", d.Id(), err) } - stateConf := &resource.StateChangeConf{ - Pending: []string{"DELETING"}, - Target: []string{"DESTROYED"}, - Refresh: streamConsumerStateRefreshFunc(conn, cn, sa), - Timeout: d.Timeout(schema.TimeoutDelete), - Delay: 10 * time.Second, - MinTimeout: 3 * time.Second, + if consumer == nil { + if d.IsNewResource() { + return fmt.Errorf("error reading Kinesis Stream Consumer (%s): empty output after creation", d.Id()) + } + log.Printf("[WARN] Kinesis Stream Consumer (%s) not found, removing from state", d.Id()) + d.SetId("") + return nil } - _, err = stateConf.WaitForState() - if err != nil { - return fmt.Errorf( - "Error waiting for Stream Consumer (%s) to be destroyed: %s", - cn, err) - } + d.Set("arn", consumer.ConsumerARN) + d.Set("name", consumer.ConsumerName) + d.Set("creation_timestamp", aws.TimeValue(consumer.ConsumerCreationTimestamp).Format(time.RFC3339)) + d.Set("stream_arn", consumer.StreamARN) return nil } -type kinesisStreamConsumerState struct { - arn string - streamArn string - creationTimestamp int64 - status string -} - -func validateResourceAwsKinesisStreamConsumerImportString(importStr string) ([]string, error) { - // example: my_consumer@arn:aws:kinesis:us-west-2:123456789012:stream/my-stream - importParts := strings.Split(strings.ToLower(importStr), "@") - errStr := "unexpected format of import string (%q), expected @: %s" - if len(importParts) != 2 { - return nil, fmt.Errorf(errStr, importStr, "invalid no. of parts") - } - - consumerName := importParts[0] - streamArn := importParts[1] - - consumerNameRe := regexp.MustCompile(`(^[a-zA-Z0-9_.-]+$)`) - streamArnRe := regexp.MustCompile(`arn:aws.*:kinesis:.*:\d{12}:stream/.+`) - - if !consumerNameRe.MatchString(consumerName) { - return nil, fmt.Errorf(errStr, importStr, "invalid consumer name") - } +func resourceAwsKinesisStreamConsumerDelete(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kinesisconn - if !streamArnRe.MatchString(streamArn) { - return nil, fmt.Errorf(errStr, importStr, "invalid stream arn") + input := &kinesis.DeregisterStreamConsumerInput{ + ConsumerARN: aws.String(d.Id()), } - return importParts, nil -} + _, err := conn.DeregisterStreamConsumer(input) -func readKinesisStreamConsumerState(conn *kinesis.Kinesis, cn string, sa string) (*kinesisStreamConsumerState, error) { - input := &kinesis.DescribeStreamConsumerInput{ - ConsumerName: aws.String(cn), - StreamARN: aws.String(sa), - } - - state := &kinesisStreamConsumerState{} - response, err := conn.DescribeStreamConsumer(input) - if err == nil { - state.arn = aws.StringValue(response.ConsumerDescription.ConsumerARN) - state.streamArn = aws.StringValue(response.ConsumerDescription.StreamARN) - state.creationTimestamp = aws.TimeValue(response.ConsumerDescription.ConsumerCreationTimestamp).Unix() - state.status = aws.StringValue(response.ConsumerDescription.ConsumerStatus) + if err != nil { + if tfawserr.ErrCodeEquals(err, kinesis.ErrCodeResourceNotFoundException) { + return nil + } + return fmt.Errorf("error deleting Kinesis Stream Consumer (%s): %w", d.Id(), err) } - return state, err -} - -func streamConsumerStateRefreshFunc(conn *kinesis.Kinesis, cn string, sa string) resource.StateRefreshFunc { - return func() (interface{}, string, error) { - state, err := readKinesisStreamConsumerState(conn, cn, sa) - if err != nil { - if awsErr, ok := err.(awserr.Error); ok { - if awsErr.Code() == "ResourceNotFoundException" { - return 42, "DESTROYED", nil - } - return nil, awsErr.Code(), err - } - return nil, "failed", err + if _, err := waiter.StreamConsumerDeleted(conn, d.Id()); err != nil { + if tfawserr.ErrCodeEquals(err, kinesis.ErrCodeResourceNotFoundException) { + return nil } - - return state, state.status, nil + return fmt.Errorf("error waiting for Kinesis Stream Consumer (%s) deletion: %w", d.Id(), err) } + + return nil } diff --git a/aws/resource_aws_kinesis_stream_consumer_test.go b/aws/resource_aws_kinesis_stream_consumer_test.go index 518b9534586..16818dba9ed 100644 --- a/aws/resource_aws_kinesis_stream_consumer_test.go +++ b/aws/resource_aws_kinesis_stream_consumer_test.go @@ -2,38 +2,39 @@ package aws import ( "fmt" - "strings" + "regexp" "testing" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/hashicorp/aws-sdk-go-base/tfawserr" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesis/finder" ) func TestAccAWSKinesisStreamConsumer_basic(t *testing.T) { - var stream kinesis.StreamDescription - var consumer kinesis.ConsumerDescription - config := createAccKinesisStreamConsumerConfig() - resourceName := config.getName() + resourceName := "aws_kinesis_stream_consumer.test" + streamName := "aws_kinesis_stream.test" + rName := acctest.RandomWithPrefix("tf-acc-test") resource.ParallelTest(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, Providers: testAccProviders, - CheckDestroy: testAccCheckKinesisStreamConsumerDestroy, + CheckDestroy: testAccCheckAWSKinesisStreamConsumerDestroy, Steps: []resource.TestStep{ { - Config: config.single(), + Config: testAccAWSKinesisStreamConsumerConfig_basic(rName), Check: resource.ComposeTestCheckFunc( - testAccCheckKinesisStreamExists(config.stream.getName(), &stream), - testAccCheckKinesisStreamConsumerExists(config, -1, &consumer), - testAccCheckAWSKinesisStreamConsumerAttributes(config, &consumer), + testAccAWSKinesisStreamConsumerExists(resourceName), + testAccMatchResourceAttrRegionalARN(resourceName, "arn", "kinesis", regexp.MustCompile(fmt.Sprintf("stream/%[1]s/consumer/%[1]s", rName))), + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttrPair(resourceName, "stream_arn", streamName, "arn"), + resource.TestCheckResourceAttrSet(resourceName, "creation_timestamp"), ), }, { ResourceName: resourceName, - ImportStateIdFunc: testAccKinesisStreamConsumerImportStateIdFunc(config), ImportState: true, ImportStateVerify: true, }, @@ -41,218 +42,162 @@ func TestAccAWSKinesisStreamConsumer_basic(t *testing.T) { }) } -func testAccKinesisStreamConsumerImportStateIdFunc(config *accKinesisStreamConsumerConfig) resource.ImportStateIdFunc { - return func(s *terraform.State) (string, error) { - return fmt.Sprintf("%s@arn:aws:kinesis:%s:%s:stream/%s", config.getConsumerName(), testAccGetRegion(), testAccGetAccountID(), config.stream.getStreamName()), nil //lintignore:AWSAT005 - } -} +func TestAccAWSKinesisStreamConsumer_disappears(t *testing.T) { + resourceName := "aws_kinesis_stream_consumer.test" + rName := acctest.RandomWithPrefix("tf-acc-test") -func TestAccAWSKinesisStreamConsumer_createMultipleConcurrentStreamConsumers(t *testing.T) { - var stream kinesis.StreamDescription - var consumer kinesis.ConsumerDescription - config := createAccKinesisStreamConsumerConfig() + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: nil, + Steps: []resource.TestStep{ + { + Config: testAccAWSKinesisStreamConsumerConfig_basic(rName), + Check: resource.ComposeTestCheckFunc( + testAccAWSKinesisStreamConsumerExists(resourceName), + testAccCheckResourceDisappears(testAccProvider, resourceAwsKinesisStreamConsumer(), resourceName), + ), + ExpectNonEmptyPlan: true, + }, + }, + }) +} - var checkFunctions []resource.TestCheckFunc +func TestAccAWSKinesisStreamConsumer_MaxConcurrentConsumers(t *testing.T) { + resourceName := "aws_kinesis_stream_consumer.test" + rName := acctest.RandomWithPrefix("tf-acc-test") - checkFunctions = append( - checkFunctions, - testAccCheckKinesisStreamExists(config.stream.getName(), &stream)) + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckAWSKinesisStreamConsumerDestroy, + Steps: []resource.TestStep{ + { + // Test creation of max number (5 according to AWS API docs) of concurrent consumers for a single stream + Config: testAccAWSKinesisStreamConsumerConfig_multiple(rName, 5), + Check: resource.ComposeTestCheckFunc( + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.0", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.1", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.2", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.3", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.4", resourceName)), + ), + }, + }, + }) +} - for i := 0; i < config.count; i++ { - checkFunctions = append( - checkFunctions, - testAccCheckKinesisStreamConsumerExists(config, i, &consumer)) - } +func TestAccAWSKinesisStreamConsumer_ExceedMaxConcurrentConsumers(t *testing.T) { + resourceName := "aws_kinesis_stream_consumer.test" + rName := acctest.RandomWithPrefix("tf-acc-test") resource.ParallelTest(t, resource.TestCase{ PreCheck: func() { testAccPreCheck(t) }, Providers: testAccProviders, - CheckDestroy: testAccCheckKinesisStreamConsumerDestroy, + CheckDestroy: testAccCheckAWSKinesisStreamConsumerDestroy, Steps: []resource.TestStep{ { - Config: config.multiple(), - Check: resource.ComposeTestCheckFunc(checkFunctions...), + // Test creation of more than the max number (5 according to AWS API docs) of concurrent consumers for a single stream + Config: testAccAWSKinesisStreamConsumerConfig_multiple(rName, 10), + Check: resource.ComposeTestCheckFunc( + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.0", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.1", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.2", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.3", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.4", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.5", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.6", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.7", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.8", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.9", resourceName)), + ), }, }, }) } -func testAccCheckKinesisStreamConsumerExists(c *accKinesisStreamConsumerConfig, - index int, consumer *kinesis.ConsumerDescription) resource.TestCheckFunc { - var resourceName string - if index > -1 { - resourceName = c.getIndexedName(index) - } else { - resourceName = c.getName() - } +func testAccCheckAWSKinesisStreamConsumerDestroy(s *terraform.State) error { + conn := testAccProvider.Meta().(*AWSClient).kinesisconn - return func(s *terraform.State) error { - rs, ok := s.RootModule().Resources[resourceName] - if !ok { - return fmt.Errorf("Not found: %s", resourceName) + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_kinesis_stream_consumer" { + continue } - if rs.Primary.ID == "" { - return fmt.Errorf("No Kinesis Stream Consumer ID is set") - } + consumer, err := finder.StreamConsumerByARN(conn, rs.Primary.ID) - conn := testAccProvider.Meta().(*AWSClient).kinesisconn - describeOpts := &kinesis.DescribeStreamConsumerInput{ - ConsumerName: aws.String(rs.Primary.Attributes["name"]), - StreamARN: aws.String(rs.Primary.Attributes["stream_arn"]), + if tfawserr.ErrCodeEquals(err, kinesis.ErrCodeResourceNotFoundException) { + continue } - resp, err := conn.DescribeStreamConsumer(describeOpts) + if err != nil { - return err + return fmt.Errorf("error reading Kinesis Stream Consumer (%s): %w", rs.Primary.ID, err) } - *consumer = *resp.ConsumerDescription - - return nil + if consumer != nil { + return fmt.Errorf("Kinesis Stream Consumer (%s) still exists", rs.Primary.ID) + } } + + return nil } -func testAccCheckAWSKinesisStreamConsumerAttributes( - c *accKinesisStreamConsumerConfig, consumer *kinesis.ConsumerDescription) resource.TestCheckFunc { +func testAccAWSKinesisStreamConsumerExists(resourceName string) resource.TestCheckFunc { return func(s *terraform.State) error { - if !strings.HasPrefix(*consumer.ConsumerName, c.getConsumerName()) { - return fmt.Errorf("Bad Stream Consumer name: %s", *consumer.ConsumerName) - } - for _, rs := range s.RootModule().Resources { - if rs.Type != "aws_kinesis_stream_consumer" { - continue - } + rs, ok := s.RootModule().Resources[resourceName] - if *consumer.ConsumerARN != rs.Primary.Attributes["arn"] { - return fmt.Errorf("Bad Stream Consumer(%s) ARN\n\t expected: %s\n\tgot: %s\n", rs.Type, rs.Primary.Attributes["arn"], *consumer.ConsumerARN) - } + if !ok { + return fmt.Errorf("resource %s not found", resourceName) } - return nil - } -} -func testAccCheckKinesisStreamConsumerDestroy(s *terraform.State) error { - for _, rs := range s.RootModule().Resources { - if rs.Type != "aws_kinesis_stream_consumer" { - continue + if rs.Primary.ID == "" { + return fmt.Errorf("resource %s has not set its id", resourceName) } + conn := testAccProvider.Meta().(*AWSClient).kinesisconn - describeOpts := &kinesis.DescribeStreamConsumerInput{ - ConsumerName: aws.String(rs.Primary.Attributes["name"]), - StreamARN: aws.String(rs.Primary.Attributes["stream_arn"]), + + consumer, err := finder.StreamConsumerByARN(conn, rs.Primary.ID) + + if err != nil { + return fmt.Errorf("error reading Kinesis Stream Consumer (%s): %w", rs.Primary.ID, err) } - resp, err := conn.DescribeStreamConsumer(describeOpts) - if err == nil { - if resp.ConsumerDescription != nil && *resp.ConsumerDescription.ConsumerStatus != "DELETING" { - return fmt.Errorf("Error: Stream Consumer still exists") - } + + if consumer == nil { + return fmt.Errorf("Kinesis Stream Consumer (%s) not found", rs.Primary.ID) } return nil - } - - return nil -} - -type accKinesisStreamConfig struct { - resourceType string - resourceLocalName string - streamBasename string - randInt int } -func (r *accKinesisStreamConfig) getName() string { - return fmt.Sprintf("%s.%s", r.resourceType, r.resourceLocalName) -} - -func (r *accKinesisStreamConfig) getStreamName() string { - return fmt.Sprintf("%s-%d", r.streamBasename, r.randInt) -} - -func (r *accKinesisStreamConfig) single() string { - +func testAccAWSKinesisStreamConsumerBaseConfig(rName string) string { return fmt.Sprintf(` -resource "%s" "%s" { - name = "%s" - shard_count = 2 - enforce_consumer_deletion = true - - tags = { - Name = "tf-test" - } -}`, r.resourceType, r.resourceLocalName, r.getStreamName()) -} - -type accKinesisStreamConsumerConfig struct { - stream *accKinesisStreamConfig - resourceType string - resourceLocalName string - consumerBasename string - count int - randInt int +resource "aws_kinesis_stream" "test" { + name = %q + shard_count = 2 } - -func (r *accKinesisStreamConsumerConfig) getName() string { - return fmt.Sprintf("%s.%s", r.resourceType, r.resourceLocalName) -} - -func (r *accKinesisStreamConsumerConfig) getIndexedName(index int) string { - return fmt.Sprintf("%s.%s.%d", r.resourceType, r.resourceLocalName, index) -} - -func (r *accKinesisStreamConsumerConfig) getConsumerName() string { - return fmt.Sprintf("%s-%d", r.consumerBasename, r.randInt) -} - -func (r *accKinesisStreamConsumerConfig) data() string { - return fmt.Sprintf(` -%s - -data "%s" "%s" { - name = "${%s.name}" - stream_arn = "${%s.arn}" -}`, r.single(), r.resourceType, r.resourceLocalName, r.getName(), r.stream.getName()) +`, rName) } -func (r *accKinesisStreamConsumerConfig) single() string { - - return fmt.Sprintf(` -%s - -resource "%s" "%s" { - name = "%s" - stream_arn = "${%s.arn}" +func testAccAWSKinesisStreamConsumerConfig_basic(rName string) string { + return composeConfig( + testAccAWSKinesisStreamConsumerBaseConfig(rName), + fmt.Sprintf(` +resource "aws_kinesis_stream_consumer" "test" { + name = %q + stream_arn = aws_kinesis_stream.test.arn } - -`, r.stream.single(), r.resourceType, r.resourceLocalName, r.getConsumerName(), r.stream.getName()) +`, rName)) } -func (r *accKinesisStreamConsumerConfig) multiple() string { - - return fmt.Sprintf(` -%s - -resource "%s" "%s" { - count = %d - name = "%s-${count.index}" - stream_arn = "${%s.arn}" -}`, r.stream.single(), r.resourceType, r.resourceLocalName, r.count, r.getConsumerName(), r.stream.getName()) +func testAccAWSKinesisStreamConsumerConfig_multiple(rName string, count int) string { + return composeConfig( + testAccAWSKinesisStreamConsumerBaseConfig(rName), + fmt.Sprintf(` +resource "aws_kinesis_stream_consumer" "test" { + count = %d + name = "%s-${count.index}" + stream_arn = aws_kinesis_stream.test.arn } - -func createAccKinesisStreamConsumerConfig() *accKinesisStreamConsumerConfig { - iRnd := acctest.RandInt() - return &accKinesisStreamConsumerConfig{ - stream: &accKinesisStreamConfig{ - resourceType: "aws_kinesis_stream", - resourceLocalName: "test_stream", - streamBasename: "terraform-kinesis-stream-test", - randInt: iRnd, - }, - resourceType: "aws_kinesis_stream_consumer", - resourceLocalName: "test_stream_consumer", - consumerBasename: "terraform-kinesis-stream-consumer-test", - count: 2, - randInt: iRnd, - } +`, count, rName)) } diff --git a/aws/resource_aws_kinesis_stream_test.go b/aws/resource_aws_kinesis_stream_test.go index f080ad624ae..16623fc4c51 100644 --- a/aws/resource_aws_kinesis_stream_test.go +++ b/aws/resource_aws_kinesis_stream_test.go @@ -33,23 +33,20 @@ func testSweepKinesisStreams(region string) error { var sweeperErrs *multierror.Error err = conn.ListStreamsPages(input, func(page *kinesis.ListStreamsOutput, lastPage bool) bool { - for _, streamNamePtr := range page.StreamNames { - if streamNamePtr == nil { + for _, streamName := range page.StreamNames { + if streamName == nil { continue } - streamName := aws.StringValue(streamNamePtr) - input := &kinesis.DeleteStreamInput{ - EnforceConsumerDeletion: aws.Bool(false), - StreamName: streamNamePtr, - } - - log.Printf("[INFO] Deleting Kinesis Stream: %s", streamName) + r := resourceAwsKinesisStream() + d := r.Data(nil) + d.Set("name", streamName) + d.Set("enforce_consumer_deletion", true) - _, err := conn.DeleteStream(input) + err := r.Delete(d, client) if err != nil { - sweeperErr := fmt.Errorf("error deleting Kinesis Stream (%s): %w", streamName, err) + sweeperErr := fmt.Errorf("error deleting Kinesis Stream (%s): %w", aws.StringValue(streamName), err) log.Printf("[ERROR] %s", sweeperErr) sweeperErrs = multierror.Append(sweeperErrs, sweeperErr) } diff --git a/website/aws.erb b/website/aws.erb deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/website/docs/d/kinesis_stream_consumer.html.markdown b/website/docs/d/kinesis_stream_consumer.html.markdown index c7442ed3fcf..eec1bd5acf6 100644 --- a/website/docs/d/kinesis_stream_consumer.html.markdown +++ b/website/docs/d/kinesis_stream_consumer.html.markdown @@ -3,39 +3,36 @@ subcategory: "Kinesis" layout: "aws" page_title: "AWS: aws_kinesis_stream_consumer" description: |- - Provides a Kinesis Stream Consumer data source. + Provides details about a Kinesis Stream Consumer. --- # Data Source: aws_kinesis_stream_consumer -Use this data source to get information about a Kinesis Stream Consumer for use in other -resources. +Provides details about a Kinesis Stream Consumer. For more details, see the [Amazon Kinesis Stream Consumer Documentation][1]. ## Example Usage ```hcl -data "aws_kinesis_stream_consumer" "stream_consumer" { - name = "stream-consumer-name" - stream_arn = aws_kinesis_stream.stream.arn +data "aws_kinesis_stream_consumer" "example" { + name = "example-consumer" + stream_arn = aws_kinesis_stream.example.arn } ``` ## Argument Reference -* `name` - (Required) The name of the Kinesis Stream Consumer. -* `stream_arn` - (Required) The Amazon Resource Name (ARN) of the Kinesis Stream. +* `arn` - (Optional) Amazon Resource Name (ARN) of the stream consumer. +* `name` - (Optional) Name of the stream consumer. +* `stream_arn` - (Required) Amazon Resource Name (ARN) of the data stream the consumer is registered with. ## Attributes Reference -`id` is set to the Amazon Resource Name (ARN) of the Kinesis Stream Consumer. In addition, the following attributes -are exported: +In addition to all arguments above, the following attributes are exported: -* `arn` - The Amazon Resource Name (ARN) of the Kinesis Stream Consumer (same as id). -* `name` - The name of the Kinesis Stream Consumer. -* `creation_timestamp` - The approximate UNIX timestamp that the stream consumer was created. -* `status` - The current status of the stream consumer. The stream status is one of CREATING, DELETING, ACTIVE, or UPDATING. -* `stream_arn` - The Amazon Resource Name (ARN) of the Kinesis Stream. +* `creation_timestamp` - Approximate timestamp in [RFC3339 format](https://tools.ietf.org/html/rfc3339#section-5.8) of when the stream consumer was created. +* `id` - Amazon Resource Name (ARN) of the stream consumer. +* `status` - The current status of the stream consumer. -[1]: https://docs.aws.amazon.com/streams/latest/dev/introduction-to-enhanced-consumers.html +[1]: https://docs.aws.amazon.com/streams/latest/dev/amazon-kinesis-consumers.html diff --git a/website/docs/r/kinesis_stream_consumer.html.markdown b/website/docs/r/kinesis_stream_consumer.html.markdown index e379a290772..dd204b1e6bb 100644 --- a/website/docs/r/kinesis_stream_consumer.html.markdown +++ b/website/docs/r/kinesis_stream_consumer.html.markdown @@ -3,36 +3,28 @@ subcategory: "Kinesis" layout: "aws" page_title: "AWS: aws_kinesis_stream_consumer" description: |- - Provides a AWS Kinesis Stream Consumer + Manages a Kinesis Stream Consumer. --- -# aws_kinesis_stream_consumer +# Resource: aws_kinesis_stream_consumer -Provides a Kinesis Stream Consumer resource. A consumer is an application that processes all data from a Kinesis data stream. -When a consumer uses enhanced fan-out, it gets its own 2 MiB/sec allotment of read throughput, allowing multiple consumers to -read data from the same stream in parallel, without contending for read throughput with other consumers. [Reading Data from Kinesis][1] +Provides a resource to manage a Kinesis Stream Consumer. -You can register up to 20 consumers per stream. However, you can request a limit increase using the [Kinesis Data Streams limits form][2]. A given consumer can only be registered with one stream at a time. +-> **Note:** You can register up to 20 consumers per stream. A given consumer can only be registered with one stream at a time. -For more details, see the [Amazon Kinesis Stream Consumer Documentation][3]. +For more details, see the [Amazon Kinesis Stream Consumer Documentation][1]. ## Example Usage ```hcl -resource "aws_kinesis_stream" "test_stream" { - name = "terraform-kinesis-test" +resource "aws_kinesis_stream" "example" { + name = "example-stream" shard_count = 1 } -resource "aws_lambda_event_source_mapping" "example" { - event_source_arn = aws_kinesis_stream_consumer.test_stream_consumer.arn - function_name = aws_lambda_function.example.arn - starting_position = "LATEST" -} - -resource "aws_kinesis_stream_consumer" "test_stream_consumer" { - name = "terraform-kinesis-stream-consumer-test" - stream_arn = aws_kinesis_stream.test_stream.arn +resource "aws_kinesis_stream_consumer" "example" { + name = "example-consumer" + stream_arn = aws_kinesis_stream.example.arn } ``` @@ -40,29 +32,23 @@ resource "aws_kinesis_stream_consumer" "test_stream_consumer" { The following arguments are supported: -* `name` - (Required) A name to identify the stream. This is unique to the -AWS account and region the Stream Consumer is created in. -* `stream_arn` – (Required) The Amazon Resource Name (ARN) of the Kinesis Stream, the Consumer is connected to. +* `name` - (Required, Forces new resource) Name of the stream consumer. +* `stream_arn` – (Required, Forces new resource) Amazon Resource Name (ARN) of the data stream the consumer is registered with. ## Attributes Reference -* `id` - The unique Stream Consumer id -* `name` - The unique Stream Consumer name -* `arn` - The Amazon Resource Name (ARN) specifying the Stream Consumer (same as `id`) - -## Timeouts +In addition to all arguments above, the following attributes are exported: -`aws_kinesis_stream_consumer` provides the following [Timeouts](/docs/configuration/resources.html#timeouts) configuration options: +* `arn` - Amazon Resource Name (ARN) of the stream consumer. +* `creation_timestamp` - Approximate timestamp in [RFC3339 format](https://tools.ietf.org/html/rfc3339#section-5.8) of when the stream consumer was created. +* `id` - Amazon Resource Name (ARN) of the stream consumer. -- `create` - (Default `5 minutes`) Used for Creating a Kinesis Stream Consumer -- `delete` - (Default `120 minutes`) Used for Destroying a Kinesis Stream Consumer +## Import -Kinesis Streams can be imported using the `name@stream_arn`, e.g. +Kinesis Stream Consumers can be imported using the Amazon Resource Name (ARN) e.g. ``` -$ terraform import aws_kinesis_stream_consumer.test_stream_consumer terraform-kinesis-stream-consumer-test@arn:aws:kinesis:us-west-2:123456789012:stream/my-stream +$ terraform import aws_kinesis_stream_consumer.example arn:aws:kinesis:us-west-2:123456789012:stream/example/consumer/example:1616044553 ``` -[1]: https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html -[2]: https://console.aws.amazon.com/support/v1?#/ -[3]: https://docs.aws.amazon.com/streams/latest/dev/introduction-to-enhanced-consumers.html +[1]: https://docs.aws.amazon.com/streams/latest/dev/amazon-kinesis-consumers.html \ No newline at end of file