diff --git a/.changelog/17149.txt b/.changelog/17149.txt new file mode 100644 index 00000000000..8d6f8cf48f1 --- /dev/null +++ b/.changelog/17149.txt @@ -0,0 +1,7 @@ +```release-note:new-data-source +aws_kinesis_stream_consumer +``` + +```release-note:new-resource +aws_kinesis_stream_consumer +``` diff --git a/aws/data_source_aws_kinesis_stream_consumer.go b/aws/data_source_aws_kinesis_stream_consumer.go new file mode 100644 index 00000000000..fe8bdac2d3f --- /dev/null +++ b/aws/data_source_aws_kinesis_stream_consumer.go @@ -0,0 +1,107 @@ +package aws + +import ( + "fmt" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" +) + +func dataSourceAwsKinesisStreamConsumer() *schema.Resource { + return &schema.Resource{ + Read: dataSourceAwsKinesisStreamConsumerRead, + + Schema: map[string]*schema.Schema{ + "arn": { + Type: schema.TypeString, + Optional: true, + Computed: true, + ValidateFunc: validateArn, + }, + + "creation_timestamp": { + Type: schema.TypeString, + Computed: true, + }, + + "name": { + Type: schema.TypeString, + Optional: true, + Computed: true, + }, + + "status": { + Type: schema.TypeString, + Computed: true, + }, + + "stream_arn": { + Type: schema.TypeString, + Required: true, + ValidateFunc: validateArn, + }, + }, + } +} + +func dataSourceAwsKinesisStreamConsumerRead(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kinesisconn + + streamArn := d.Get("stream_arn").(string) + + input := &kinesis.ListStreamConsumersInput{ + StreamARN: aws.String(streamArn), + } + + var results []*kinesis.Consumer + + err := conn.ListStreamConsumersPages(input, func(page *kinesis.ListStreamConsumersOutput, lastPage bool) bool { + if page == nil { + return !lastPage + } + + for _, consumer := range page.Consumers { + if consumer == nil { + continue + } + + if v, ok := d.GetOk("name"); ok && v.(string) != aws.StringValue(consumer.ConsumerName) { + continue + } + + if v, ok := d.GetOk("arn"); ok && v.(string) != aws.StringValue(consumer.ConsumerARN) { + continue + } + + results = append(results, consumer) + + } + + return !lastPage + }) + + if err != nil { + return fmt.Errorf("error listing Kinesis Stream Consumers: %w", err) + } + + if len(results) == 0 { + return fmt.Errorf("no Kinesis Stream Consumer found matching criteria; try different search") + } + + if len(results) > 1 { + return fmt.Errorf("multiple Kinesis Stream Consumers found matching criteria; try different search") + } + + consumer := results[0] + + d.SetId(aws.StringValue(consumer.ConsumerARN)) + d.Set("arn", consumer.ConsumerARN) + d.Set("name", consumer.ConsumerName) + d.Set("status", consumer.ConsumerStatus) + d.Set("stream_arn", streamArn) + d.Set("creation_timestamp", aws.TimeValue(consumer.ConsumerCreationTimestamp).Format(time.RFC3339)) + + return nil +} diff --git a/aws/data_source_aws_kinesis_stream_consumer_test.go b/aws/data_source_aws_kinesis_stream_consumer_test.go new file mode 100644 index 00000000000..20c07c136de --- /dev/null +++ b/aws/data_source_aws_kinesis_stream_consumer_test.go @@ -0,0 +1,140 @@ +package aws + +import ( + "fmt" + "testing" + + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" +) + +func TestAccAWSKinesisStreamConsumerDataSource_basic(t *testing.T) { + rName := acctest.RandomWithPrefix("tf-acc-test") + dataSourceName := "data.aws_kinesis_stream_consumer.test" + resourceName := "aws_kinesis_stream_consumer.test" + streamName := "aws_kinesis_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: nil, + Steps: []resource.TestStep{ + { + Config: testAccAWSKinesisStreamConsumerDataSourceConfig(rName), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"), + resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"), + resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"), + resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"), + resource.TestCheckResourceAttrSet(dataSourceName, "status"), + ), + }, + }, + }) +} + +func TestAccAWSKinesisStreamConsumerDataSource_Name(t *testing.T) { + rName := acctest.RandomWithPrefix("tf-acc-test") + dataSourceName := "data.aws_kinesis_stream_consumer.test" + resourceName := "aws_kinesis_stream_consumer.test" + streamName := "aws_kinesis_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: nil, + Steps: []resource.TestStep{ + { + Config: testAccAWSKinesisStreamConsumerDataSourceConfigName(rName), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"), + resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"), + resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"), + resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"), + resource.TestCheckResourceAttrSet(dataSourceName, "status"), + ), + }, + }, + }) +} + +func TestAccAWSKinesisStreamConsumerDataSource_Arn(t *testing.T) { + rName := acctest.RandomWithPrefix("tf-acc-test") + dataSourceName := "data.aws_kinesis_stream_consumer.test" + resourceName := "aws_kinesis_stream_consumer.test" + streamName := "aws_kinesis_stream.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: nil, + Steps: []resource.TestStep{ + { + Config: testAccAWSKinesisStreamConsumerDataSourceConfigArn(rName), + Check: resource.ComposeTestCheckFunc( + resource.TestCheckResourceAttrPair(dataSourceName, "arn", resourceName, "arn"), + resource.TestCheckResourceAttrPair(dataSourceName, "name", resourceName, "name"), + resource.TestCheckResourceAttrPair(dataSourceName, "stream_arn", streamName, "arn"), + resource.TestCheckResourceAttrSet(dataSourceName, "creation_timestamp"), + resource.TestCheckResourceAttrSet(dataSourceName, "status"), + ), + }, + }, + }) +} + +func testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName string) string { + return fmt.Sprintf(` +resource "aws_kinesis_stream" "test" { + name = %q + shard_count = 2 +} +`, rName) +} + +func testAccAWSKinesisStreamConsumerDataSourceConfig(rName string) string { + return composeConfig( + testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName), + fmt.Sprintf(` +data "aws_kinesis_stream_consumer" "test" { + stream_arn = aws_kinesis_stream_consumer.test.stream_arn +} + +resource "aws_kinesis_stream_consumer" "test" { + name = %q + stream_arn = aws_kinesis_stream.test.arn +} +`, rName)) +} + +func testAccAWSKinesisStreamConsumerDataSourceConfigName(rName string) string { + return composeConfig( + testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName), + fmt.Sprintf(` +data "aws_kinesis_stream_consumer" "test" { + name = aws_kinesis_stream_consumer.test.name + stream_arn = aws_kinesis_stream_consumer.test.stream_arn +} + +resource "aws_kinesis_stream_consumer" "test" { + name = %q + stream_arn = aws_kinesis_stream.test.arn +} +`, rName)) +} + +func testAccAWSKinesisStreamConsumerDataSourceConfigArn(rName string) string { + return composeConfig( + testAccAWSKinesisStreamConsumerDataSourceBaseConfig(rName), + fmt.Sprintf(` +data "aws_kinesis_stream_consumer" "test" { + arn = aws_kinesis_stream_consumer.test.arn + stream_arn = aws_kinesis_stream_consumer.test.stream_arn +} + +resource "aws_kinesis_stream_consumer" "test" { + name = %q + stream_arn = aws_kinesis_stream.test.arn +} +`, rName)) +} diff --git a/aws/internal/service/kinesis/finder/finder.go b/aws/internal/service/kinesis/finder/finder.go new file mode 100644 index 00000000000..ac13b1b0d05 --- /dev/null +++ b/aws/internal/service/kinesis/finder/finder.go @@ -0,0 +1,25 @@ +package finder + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kinesis" +) + +// StreamConsumerByARN returns the stream consumer corresponding to the specified ARN. +// Returns nil if no stream consumer is found. +func StreamConsumerByARN(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) { + input := &kinesis.DescribeStreamConsumerInput{ + ConsumerARN: aws.String(arn), + } + + output, err := conn.DescribeStreamConsumer(input) + if err != nil { + return nil, err + } + + if output == nil { + return nil, nil + } + + return output.ConsumerDescription, nil +} diff --git a/aws/internal/service/kinesis/waiter/status.go b/aws/internal/service/kinesis/waiter/status.go new file mode 100644 index 00000000000..b8403ac6e27 --- /dev/null +++ b/aws/internal/service/kinesis/waiter/status.go @@ -0,0 +1,30 @@ +package waiter + +import ( + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesis/finder" +) + +const ( + StreamConsumerStatusNotFound = "NotFound" + StreamConsumerStatusUnknown = "Unknown" +) + +// StreamConsumerStatus fetches the StreamConsumer and its Status +func StreamConsumerStatus(conn *kinesis.Kinesis, arn string) resource.StateRefreshFunc { + return func() (interface{}, string, error) { + consumer, err := finder.StreamConsumerByARN(conn, arn) + + if err != nil { + return nil, StreamConsumerStatusUnknown, err + } + + if consumer == nil { + return nil, StreamConsumerStatusNotFound, nil + } + + return consumer, aws.StringValue(consumer.ConsumerStatus), nil + } +} diff --git a/aws/internal/service/kinesis/waiter/waiter.go b/aws/internal/service/kinesis/waiter/waiter.go new file mode 100644 index 00000000000..1e3814bcd65 --- /dev/null +++ b/aws/internal/service/kinesis/waiter/waiter.go @@ -0,0 +1,49 @@ +package waiter + +import ( + "time" + + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" +) + +const ( + StreamConsumerCreatedTimeout = 5 * time.Minute + StreamConsumerDeletedTimeout = 5 * time.Minute +) + +// StreamConsumerCreated waits for an Stream Consumer to return Active +func StreamConsumerCreated(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{kinesis.ConsumerStatusCreating}, + Target: []string{kinesis.ConsumerStatusActive}, + Refresh: StreamConsumerStatus(conn, arn), + Timeout: StreamConsumerCreatedTimeout, + } + + outputRaw, err := stateConf.WaitForState() + + if v, ok := outputRaw.(*kinesis.ConsumerDescription); ok { + return v, err + } + + return nil, err +} + +// StreamConsumerDeleted waits for a Stream Consumer to be deleted +func StreamConsumerDeleted(conn *kinesis.Kinesis, arn string) (*kinesis.ConsumerDescription, error) { + stateConf := &resource.StateChangeConf{ + Pending: []string{kinesis.ConsumerStatusDeleting}, + Target: []string{}, + Refresh: StreamConsumerStatus(conn, arn), + Timeout: StreamConsumerDeletedTimeout, + } + + outputRaw, err := stateConf.WaitForState() + + if v, ok := outputRaw.(*kinesis.ConsumerDescription); ok { + return v, err + } + + return nil, err +} diff --git a/aws/provider.go b/aws/provider.go index c44cbf810ca..1b6517c583b 100644 --- a/aws/provider.go +++ b/aws/provider.go @@ -290,6 +290,7 @@ func Provider() *schema.Provider { "aws_iot_endpoint": dataSourceAwsIotEndpoint(), "aws_ip_ranges": dataSourceAwsIPRanges(), "aws_kinesis_stream": dataSourceAwsKinesisStream(), + "aws_kinesis_stream_consumer": dataSourceAwsKinesisStreamConsumer(), "aws_kms_alias": dataSourceAwsKmsAlias(), "aws_kms_ciphertext": dataSourceAwsKmsCiphertext(), "aws_kms_key": dataSourceAwsKmsKey(), @@ -770,6 +771,7 @@ func Provider() *schema.Provider { "aws_kinesisanalyticsv2_application": resourceAwsKinesisAnalyticsV2Application(), "aws_kinesis_firehose_delivery_stream": resourceAwsKinesisFirehoseDeliveryStream(), "aws_kinesis_stream": resourceAwsKinesisStream(), + "aws_kinesis_stream_consumer": resourceAwsKinesisStreamConsumer(), "aws_kinesis_video_stream": resourceAwsKinesisVideoStream(), "aws_kms_alias": resourceAwsKmsAlias(), "aws_kms_external_key": resourceAwsKmsExternalKey(), diff --git a/aws/resource_aws_kinesis_stream_consumer.go b/aws/resource_aws_kinesis_stream_consumer.go new file mode 100644 index 00000000000..cefb1202ac6 --- /dev/null +++ b/aws/resource_aws_kinesis_stream_consumer.go @@ -0,0 +1,138 @@ +package aws + +import ( + "fmt" + "log" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/hashicorp/aws-sdk-go-base/tfawserr" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesis/finder" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesis/waiter" +) + +func resourceAwsKinesisStreamConsumer() *schema.Resource { + return &schema.Resource{ + Create: resourceAwsKinesisStreamConsumerCreate, + Read: resourceAwsKinesisStreamConsumerRead, + Delete: resourceAwsKinesisStreamConsumerDelete, + + Importer: &schema.ResourceImporter{ + State: schema.ImportStatePassthrough, + }, + + Schema: map[string]*schema.Schema{ + "arn": { + Type: schema.TypeString, + Computed: true, + }, + + "creation_timestamp": { + Type: schema.TypeString, + Computed: true, + }, + + "name": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + }, + + "stream_arn": { + Type: schema.TypeString, + Required: true, + ForceNew: true, + ValidateFunc: validateArn, + }, + }, + } +} + +func resourceAwsKinesisStreamConsumerCreate(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kinesisconn + + name := d.Get("name").(string) + streamArn := d.Get("stream_arn").(string) + + input := &kinesis.RegisterStreamConsumerInput{ + ConsumerName: aws.String(name), + StreamARN: aws.String(streamArn), + } + + output, err := conn.RegisterStreamConsumer(input) + if err != nil { + return fmt.Errorf("error creating Kinesis Stream Consumer (%s): %w", name, err) + } + + if output == nil || output.Consumer == nil { + return fmt.Errorf("error creating Kinesis Stream Consumer (%s): empty output", name) + } + + d.SetId(aws.StringValue(output.Consumer.ConsumerARN)) + + if _, err := waiter.StreamConsumerCreated(conn, d.Id()); err != nil { + return fmt.Errorf("error waiting for Kinesis Stream Consumer (%s) creation: %w", d.Id(), err) + } + + return resourceAwsKinesisStreamConsumerRead(d, meta) +} + +func resourceAwsKinesisStreamConsumerRead(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kinesisconn + + consumer, err := finder.StreamConsumerByARN(conn, d.Id()) + + if !d.IsNewResource() && tfawserr.ErrCodeEquals(err, kinesis.ErrCodeResourceNotFoundException) { + log.Printf("[WARN] Kinesis Stream Consumer (%s) not found, removing from state", d.Id()) + d.SetId("") + return nil + } + + if err != nil { + return fmt.Errorf("error reading Kinesis Stream Consumer (%s): %w", d.Id(), err) + } + + if consumer == nil { + if d.IsNewResource() { + return fmt.Errorf("error reading Kinesis Stream Consumer (%s): empty output after creation", d.Id()) + } + log.Printf("[WARN] Kinesis Stream Consumer (%s) not found, removing from state", d.Id()) + d.SetId("") + return nil + } + + d.Set("arn", consumer.ConsumerARN) + d.Set("name", consumer.ConsumerName) + d.Set("creation_timestamp", aws.TimeValue(consumer.ConsumerCreationTimestamp).Format(time.RFC3339)) + d.Set("stream_arn", consumer.StreamARN) + + return nil +} + +func resourceAwsKinesisStreamConsumerDelete(d *schema.ResourceData, meta interface{}) error { + conn := meta.(*AWSClient).kinesisconn + + input := &kinesis.DeregisterStreamConsumerInput{ + ConsumerARN: aws.String(d.Id()), + } + + _, err := conn.DeregisterStreamConsumer(input) + + if err != nil { + if tfawserr.ErrCodeEquals(err, kinesis.ErrCodeResourceNotFoundException) { + return nil + } + return fmt.Errorf("error deleting Kinesis Stream Consumer (%s): %w", d.Id(), err) + } + + if _, err := waiter.StreamConsumerDeleted(conn, d.Id()); err != nil { + if tfawserr.ErrCodeEquals(err, kinesis.ErrCodeResourceNotFoundException) { + return nil + } + return fmt.Errorf("error waiting for Kinesis Stream Consumer (%s) deletion: %w", d.Id(), err) + } + + return nil +} diff --git a/aws/resource_aws_kinesis_stream_consumer_test.go b/aws/resource_aws_kinesis_stream_consumer_test.go new file mode 100644 index 00000000000..16818dba9ed --- /dev/null +++ b/aws/resource_aws_kinesis_stream_consumer_test.go @@ -0,0 +1,203 @@ +package aws + +import ( + "fmt" + "regexp" + "testing" + + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/hashicorp/aws-sdk-go-base/tfawserr" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/acctest" + "github.com/hashicorp/terraform-plugin-sdk/v2/helper/resource" + "github.com/hashicorp/terraform-plugin-sdk/v2/terraform" + "github.com/terraform-providers/terraform-provider-aws/aws/internal/service/kinesis/finder" +) + +func TestAccAWSKinesisStreamConsumer_basic(t *testing.T) { + resourceName := "aws_kinesis_stream_consumer.test" + streamName := "aws_kinesis_stream.test" + rName := acctest.RandomWithPrefix("tf-acc-test") + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckAWSKinesisStreamConsumerDestroy, + Steps: []resource.TestStep{ + { + Config: testAccAWSKinesisStreamConsumerConfig_basic(rName), + Check: resource.ComposeTestCheckFunc( + testAccAWSKinesisStreamConsumerExists(resourceName), + testAccMatchResourceAttrRegionalARN(resourceName, "arn", "kinesis", regexp.MustCompile(fmt.Sprintf("stream/%[1]s/consumer/%[1]s", rName))), + resource.TestCheckResourceAttr(resourceName, "name", rName), + resource.TestCheckResourceAttrPair(resourceName, "stream_arn", streamName, "arn"), + resource.TestCheckResourceAttrSet(resourceName, "creation_timestamp"), + ), + }, + { + ResourceName: resourceName, + ImportState: true, + ImportStateVerify: true, + }, + }, + }) +} + +func TestAccAWSKinesisStreamConsumer_disappears(t *testing.T) { + resourceName := "aws_kinesis_stream_consumer.test" + rName := acctest.RandomWithPrefix("tf-acc-test") + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: nil, + Steps: []resource.TestStep{ + { + Config: testAccAWSKinesisStreamConsumerConfig_basic(rName), + Check: resource.ComposeTestCheckFunc( + testAccAWSKinesisStreamConsumerExists(resourceName), + testAccCheckResourceDisappears(testAccProvider, resourceAwsKinesisStreamConsumer(), resourceName), + ), + ExpectNonEmptyPlan: true, + }, + }, + }) +} + +func TestAccAWSKinesisStreamConsumer_MaxConcurrentConsumers(t *testing.T) { + resourceName := "aws_kinesis_stream_consumer.test" + rName := acctest.RandomWithPrefix("tf-acc-test") + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckAWSKinesisStreamConsumerDestroy, + Steps: []resource.TestStep{ + { + // Test creation of max number (5 according to AWS API docs) of concurrent consumers for a single stream + Config: testAccAWSKinesisStreamConsumerConfig_multiple(rName, 5), + Check: resource.ComposeTestCheckFunc( + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.0", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.1", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.2", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.3", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.4", resourceName)), + ), + }, + }, + }) +} + +func TestAccAWSKinesisStreamConsumer_ExceedMaxConcurrentConsumers(t *testing.T) { + resourceName := "aws_kinesis_stream_consumer.test" + rName := acctest.RandomWithPrefix("tf-acc-test") + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { testAccPreCheck(t) }, + Providers: testAccProviders, + CheckDestroy: testAccCheckAWSKinesisStreamConsumerDestroy, + Steps: []resource.TestStep{ + { + // Test creation of more than the max number (5 according to AWS API docs) of concurrent consumers for a single stream + Config: testAccAWSKinesisStreamConsumerConfig_multiple(rName, 10), + Check: resource.ComposeTestCheckFunc( + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.0", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.1", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.2", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.3", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.4", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.5", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.6", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.7", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.8", resourceName)), + testAccAWSKinesisStreamConsumerExists(fmt.Sprintf("%s.9", resourceName)), + ), + }, + }, + }) +} + +func testAccCheckAWSKinesisStreamConsumerDestroy(s *terraform.State) error { + conn := testAccProvider.Meta().(*AWSClient).kinesisconn + + for _, rs := range s.RootModule().Resources { + if rs.Type != "aws_kinesis_stream_consumer" { + continue + } + + consumer, err := finder.StreamConsumerByARN(conn, rs.Primary.ID) + + if tfawserr.ErrCodeEquals(err, kinesis.ErrCodeResourceNotFoundException) { + continue + } + + if err != nil { + return fmt.Errorf("error reading Kinesis Stream Consumer (%s): %w", rs.Primary.ID, err) + } + + if consumer != nil { + return fmt.Errorf("Kinesis Stream Consumer (%s) still exists", rs.Primary.ID) + } + } + + return nil +} + +func testAccAWSKinesisStreamConsumerExists(resourceName string) resource.TestCheckFunc { + return func(s *terraform.State) error { + rs, ok := s.RootModule().Resources[resourceName] + + if !ok { + return fmt.Errorf("resource %s not found", resourceName) + } + + if rs.Primary.ID == "" { + return fmt.Errorf("resource %s has not set its id", resourceName) + } + + conn := testAccProvider.Meta().(*AWSClient).kinesisconn + + consumer, err := finder.StreamConsumerByARN(conn, rs.Primary.ID) + + if err != nil { + return fmt.Errorf("error reading Kinesis Stream Consumer (%s): %w", rs.Primary.ID, err) + } + + if consumer == nil { + return fmt.Errorf("Kinesis Stream Consumer (%s) not found", rs.Primary.ID) + } + + return nil + } +} + +func testAccAWSKinesisStreamConsumerBaseConfig(rName string) string { + return fmt.Sprintf(` +resource "aws_kinesis_stream" "test" { + name = %q + shard_count = 2 +} +`, rName) +} + +func testAccAWSKinesisStreamConsumerConfig_basic(rName string) string { + return composeConfig( + testAccAWSKinesisStreamConsumerBaseConfig(rName), + fmt.Sprintf(` +resource "aws_kinesis_stream_consumer" "test" { + name = %q + stream_arn = aws_kinesis_stream.test.arn +} +`, rName)) +} + +func testAccAWSKinesisStreamConsumerConfig_multiple(rName string, count int) string { + return composeConfig( + testAccAWSKinesisStreamConsumerBaseConfig(rName), + fmt.Sprintf(` +resource "aws_kinesis_stream_consumer" "test" { + count = %d + name = "%s-${count.index}" + stream_arn = aws_kinesis_stream.test.arn +} +`, count, rName)) +} diff --git a/aws/resource_aws_kinesis_stream_test.go b/aws/resource_aws_kinesis_stream_test.go index f080ad624ae..16623fc4c51 100644 --- a/aws/resource_aws_kinesis_stream_test.go +++ b/aws/resource_aws_kinesis_stream_test.go @@ -33,23 +33,20 @@ func testSweepKinesisStreams(region string) error { var sweeperErrs *multierror.Error err = conn.ListStreamsPages(input, func(page *kinesis.ListStreamsOutput, lastPage bool) bool { - for _, streamNamePtr := range page.StreamNames { - if streamNamePtr == nil { + for _, streamName := range page.StreamNames { + if streamName == nil { continue } - streamName := aws.StringValue(streamNamePtr) - input := &kinesis.DeleteStreamInput{ - EnforceConsumerDeletion: aws.Bool(false), - StreamName: streamNamePtr, - } - - log.Printf("[INFO] Deleting Kinesis Stream: %s", streamName) + r := resourceAwsKinesisStream() + d := r.Data(nil) + d.Set("name", streamName) + d.Set("enforce_consumer_deletion", true) - _, err := conn.DeleteStream(input) + err := r.Delete(d, client) if err != nil { - sweeperErr := fmt.Errorf("error deleting Kinesis Stream (%s): %w", streamName, err) + sweeperErr := fmt.Errorf("error deleting Kinesis Stream (%s): %w", aws.StringValue(streamName), err) log.Printf("[ERROR] %s", sweeperErr) sweeperErrs = multierror.Append(sweeperErrs, sweeperErr) } diff --git a/website/docs/d/kinesis_stream_consumer.html.markdown b/website/docs/d/kinesis_stream_consumer.html.markdown new file mode 100644 index 00000000000..eec1bd5acf6 --- /dev/null +++ b/website/docs/d/kinesis_stream_consumer.html.markdown @@ -0,0 +1,38 @@ +--- +subcategory: "Kinesis" +layout: "aws" +page_title: "AWS: aws_kinesis_stream_consumer" +description: |- + Provides details about a Kinesis Stream Consumer. +--- + +# Data Source: aws_kinesis_stream_consumer + +Provides details about a Kinesis Stream Consumer. + +For more details, see the [Amazon Kinesis Stream Consumer Documentation][1]. + +## Example Usage + +```hcl +data "aws_kinesis_stream_consumer" "example" { + name = "example-consumer" + stream_arn = aws_kinesis_stream.example.arn +} +``` + +## Argument Reference + +* `arn` - (Optional) Amazon Resource Name (ARN) of the stream consumer. +* `name` - (Optional) Name of the stream consumer. +* `stream_arn` - (Required) Amazon Resource Name (ARN) of the data stream the consumer is registered with. + +## Attributes Reference + +In addition to all arguments above, the following attributes are exported: + +* `creation_timestamp` - Approximate timestamp in [RFC3339 format](https://tools.ietf.org/html/rfc3339#section-5.8) of when the stream consumer was created. +* `id` - Amazon Resource Name (ARN) of the stream consumer. +* `status` - The current status of the stream consumer. + +[1]: https://docs.aws.amazon.com/streams/latest/dev/amazon-kinesis-consumers.html diff --git a/website/docs/r/kinesis_stream_consumer.html.markdown b/website/docs/r/kinesis_stream_consumer.html.markdown new file mode 100644 index 00000000000..dd204b1e6bb --- /dev/null +++ b/website/docs/r/kinesis_stream_consumer.html.markdown @@ -0,0 +1,54 @@ +--- +subcategory: "Kinesis" +layout: "aws" +page_title: "AWS: aws_kinesis_stream_consumer" +description: |- + Manages a Kinesis Stream Consumer. +--- + +# Resource: aws_kinesis_stream_consumer + +Provides a resource to manage a Kinesis Stream Consumer. + +-> **Note:** You can register up to 20 consumers per stream. A given consumer can only be registered with one stream at a time. + +For more details, see the [Amazon Kinesis Stream Consumer Documentation][1]. + +## Example Usage + +```hcl +resource "aws_kinesis_stream" "example" { + name = "example-stream" + shard_count = 1 +} + +resource "aws_kinesis_stream_consumer" "example" { + name = "example-consumer" + stream_arn = aws_kinesis_stream.example.arn +} +``` + +## Argument Reference + +The following arguments are supported: + +* `name` - (Required, Forces new resource) Name of the stream consumer. +* `stream_arn` – (Required, Forces new resource) Amazon Resource Name (ARN) of the data stream the consumer is registered with. + +## Attributes Reference + +In addition to all arguments above, the following attributes are exported: + +* `arn` - Amazon Resource Name (ARN) of the stream consumer. +* `creation_timestamp` - Approximate timestamp in [RFC3339 format](https://tools.ietf.org/html/rfc3339#section-5.8) of when the stream consumer was created. +* `id` - Amazon Resource Name (ARN) of the stream consumer. + +## Import + +Kinesis Stream Consumers can be imported using the Amazon Resource Name (ARN) e.g. + +``` +$ terraform import aws_kinesis_stream_consumer.example arn:aws:kinesis:us-west-2:123456789012:stream/example/consumer/example:1616044553 +``` + +[1]: https://docs.aws.amazon.com/streams/latest/dev/amazon-kinesis-consumers.html \ No newline at end of file